Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/03 17:29:42 UTC

[GitHub] [iceberg] edgarRd opened a new pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

edgarRd opened a new pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293


   Currently, even though `BaseRewriteDataFilesAction` allows using an output spec to group the files to rewrite, the actual output partition spec is not used in the `RowDataRewriter`, producing an incorrect number of files.
   
   This PR sets the spec in the `RowDataRewriter`. If the spec is not modified and is the same as `table.spec()`, this PR has no effective change. However, if an output spec is specified, the number of output files should reflect that spec. I've added a unit test to validate setting the output spec.
   
   PTAL @jerryshao @rdblue @aokolnychyi 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r600001818



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +64,11 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteSpec(PartitionSpec newSpec) {
+    Preconditions.checkArgument(current().specsById().containsKey(newSpec.specId()),
+        "Invalid spec with id %d", newSpec.specId());
+    return new BaseRewriteFiles(tableName(), ops(), newSpec);

Review comment:
       Not returning `this` breaks method chaining because you can't do this:
   
   ```java
   RewriteFiles rewrite = table.newRewriteFiles();
   
   if (!spec.equals(table.spec())) {
     rewrite.rewriteSpec(spec);
   }
   
   rewrite.commit();
   ```
   
    I think as a result, we would need to support changing the spec in the underlying class, `MergingSnapshotProducer`. There are a couple of ways to do that. One is to validate that there is only one output spec, like you do above. Another is to support writing with multiple specs. By not adding a `rewriteSpec` method and instead detecting the spec from the data files, we can keep flexibility: enforce a single spec for now and relax that constraint later.
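
    For illustration, here's a rough sketch of the detection approach (names assumed, not code from this PR): the producer keeps a mutable spec field and sets it from the first file added:

    ```java
    // Hypothetical sketch: derive the write spec from each added file's specId
    // rather than requiring an explicit rewriteSpec() call.
    protected void add(DataFile file) {
      PartitionSpec fileSpec = ops.current().spec(file.specId());
      Preconditions.checkArgument(fileSpec != null,
          "Cannot find partition spec for file: %s", file.path());
      if (spec == null) {
        spec = fileSpec;  // the first file determines the output spec
      } else {
        // enforce a single output spec for now; this could be relaxed later
        Preconditions.checkArgument(spec.specId() == file.specId(),
            "All rewritten files must use the same spec: %s != %s",
            spec.specId(), file.specId());
      }
      // ... existing add logic ...
    }
    ```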




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#issuecomment-805313015


   @edgarRd, sorry for the late reply but I've had a chance to review this again. Thanks for working on it! I think we can simplify it a bit, but it's almost ready.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r586819589



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());

Review comment:
       I think it would be useful to also add an assertion about the partition spec ID associated with the data file and the partition values for each data file.
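
    For example, the extra assertions might look something like this (a sketch, assuming the `dataFiles2` list from the test and identity partitioning on `c1` in spec 0):

    ```java
    for (DataFile file : dataFiles2) {
      Assert.assertEquals("Data files should use the output spec", 0, file.specId());
      // spec 0 is identity("c1"), so the partition tuple has a single c1 value
      Integer c1 = file.partition().get(0, Integer.class);
      Assert.assertTrue("Partition value for c1 should be 1 or 2", c1 == 1 || c1 == 2);
    }
    ```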




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r602509033



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -60,7 +60,7 @@
 
   private final String tableName;
   private final TableOperations ops;
-  private final PartitionSpec spec;
+  private PartitionSpec spec;

Review comment:
       Can you move this into the "update data" section? These fields are all final.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r587701783



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());
+
+    // Should still have all the same data
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    List<ThreeColumnRecord> expectedFilteredRecords = Lists.newArrayList(records2);

Review comment:
       After some digging I noticed the Manifest being written was using the current partition spec rather than the one specified for the rewrite. I've added 0b3296d - PTAL, thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r602507888



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -188,11 +190,24 @@ protected void add(DataFile file) {
    * Add a delete file to the new snapshot.
    */
   protected void add(DeleteFile file) {
-    addedFilesSummary.addedFile(spec, file);
+    setWriteSpec(file);
+    addedFilesSummary.addedFile(writeSpec(), file);
     hasNewDeleteFiles = true;
     newDeleteFiles.add(file);
   }
 
+  private void setWriteSpec(ContentFile<?> file) {
+    PartitionSpec writeSpec = ops.current().spec(file.specId());
+    Preconditions.checkNotNull(file, "Invalid content file: null");

Review comment:
       This check should come before calling `specId` on the file above.
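
    That is, roughly (reordering the snippet above):

    ```java
    private void setWriteSpec(ContentFile<?> file) {
      Preconditions.checkNotNull(file, "Invalid content file: null");
      PartitionSpec writeSpec = ops.current().spec(file.specId());
      // ... null check on writeSpec and assignment follow ...
    }
    ```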




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r586820487



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());
+
+    // Should still have all the same data
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    List<ThreeColumnRecord> expectedFilteredRecords = Lists.newArrayList(records2);

Review comment:
       I don't think the rest of this test is very valuable for testing the feature here. All this does is test Spark's filter, which will actually read the entire table because filters that use a lambda can't be pushed down. The rest of the method from here could be removed.
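
    For example (illustrative only; the cast disambiguates the lambda overload):

    ```java
    // a lambda filter is opaque to Spark, so it cannot be pushed down to Iceberg
    resultDF.filter((FilterFunction<Row>) row -> row.getInt(0) == 1);
    // a string predicate can be parsed by Spark and pushed down
    resultDF.filter("c1 = 1");
    ```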




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r586882671



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());

Review comment:
       Done. I added the check on the result of the rewrite.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r602536940



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -60,7 +60,7 @@
 
   private final String tableName;
   private final TableOperations ops;
-  private final PartitionSpec spec;
+  private PartitionSpec spec;

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#issuecomment-809813597


   Looks great! Thanks for fixing this, @edgarRd!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r600001056



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -46,6 +51,8 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
         "Files to delete cannot be null or empty");
     Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),
         "Files to add can not be null or empty");
+    Preconditions.checkArgument(filesToAdd.stream().allMatch(df -> df.specId() == writeSpec().specId()),
+        "Files to add can not have a different spec than the rewrite files spec");

Review comment:
       Should we just detect the spec that is used? Why make the user set the spec if we have it for each new data file?
   
   If we didn't need to set the spec, then we could eventually support rewrites that produce multiple specs.
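
    A hypothetical sketch of what detection could look like in `rewriteFiles` (names assumed, not from this PR):

    ```java
    // group the new files by the spec they were written with; require exactly
    // one spec for now, and relax that constraint later
    Map<Integer, List<DataFile>> filesBySpec = filesToAdd.stream()
        .collect(Collectors.groupingBy(ContentFile::specId));
    Preconditions.checkArgument(filesBySpec.size() == 1,
        "Rewrites currently support a single output spec, found specs: %s",
        filesBySpec.keySet());
    ```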




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#issuecomment-800389307


   @rdblue @aokolnychyi PTAL when you have a chance, or feel free to redirect to someone else - Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r599999931



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -23,8 +23,13 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
+

Review comment:
       Nit: we try to avoid whitespace changes because they can easily cause unnecessary commit conflicts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r600712368



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -46,6 +51,8 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
         "Files to delete cannot be null or empty");
     Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),
         "Files to add can not be null or empty");
+    Preconditions.checkArgument(filesToAdd.stream().allMatch(df -> df.specId() == writeSpec().specId()),
+        "Files to add can not have a different spec than the rewrite files spec");

Review comment:
    I think this is a great idea. While what you mention is correct, there are many test cases where we add `DataFile`s and `DeleteFile`s whose spec has no specId relative to the table, e.g. it's `0` when initialized, and it only gets a specId once it is added via `TableMetadata#updatePartitionSpec` - examples of tests are: https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java#L579 where the PartitionSpec used in the `DataFile` is the one that was initialized with `0`, rather than `table.spec()`.
   
    Before proceeding, I'd like to know if this way of writing tests is what we want, or whether we should stay consistent and add `DataFile`s and `DeleteFile`s with the `PartitionSpec` that has the initialized specId, since they are immutable. Since from the `ContentFile` we don't have direct access to the `PartitionSpec` - only the specId - I don't think it's possible to validate by the actual spec object.
   
    This issue shows up in test cases that add `DataFile`s and `DeleteFile`s after a partition spec update and do not use the partition spec with the updated specId.
   
    I can work on fixing the tests, but that will increase the diff of this PR; if that makes it consistent with the behavior we want, I'm happy to do it.
   
   Thanks for your guidance!
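
    To illustrate the mismatch (hypothetical values, following the pattern of the linked test):

    ```java
    // built before any spec update; this spec object reports specId 0
    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();

    table.updateSpec().addField("c2").commit();  // table.spec() now has a new specId

    DataFile file = DataFiles.builder(initialSpec)  // file.specId() stays 0
        .withPath("/path/to/data.parquet")
        .withPartitionPath("c1=1")
        .withFileSizeInBytes(10)
        .withRecordCount(1)
        .build();
    // file.specId() != table.spec().specId(), so a check against
    // writeSpec().specId() fails for such files
    ```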
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r602325951



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -46,6 +51,8 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
         "Files to delete cannot be null or empty");
     Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),
         "Files to add can not be null or empty");
+    Preconditions.checkArgument(filesToAdd.stream().allMatch(df -> df.specId() == writeSpec().specId()),
+        "Files to add can not have a different spec than the rewrite files spec");

Review comment:
       Turns out it was manageable. I was able to fix the failing tests and make the changes suggested. PTAL and thanks for the help!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r586874413



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());
+
+    // Should still have all the same data
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    List<ThreeColumnRecord> expectedFilteredRecords = Lists.newArrayList(records2);

Review comment:
    Interestingly, after rewriting the filter as a string (`c1 = 1 AND c2 = 'BBBBBBBBBB'`) so it can be pushed down, the test fails. This block:
   ```java
   Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
   List<ThreeColumnRecord> actualFilteredRecords = resultDF.sort("c1", "c2", "c3")
       .filter("c1 = 1 AND c2 = 'BBBBBBBBBB'")
       .as(Encoders.bean(ThreeColumnRecord.class))
       .collectAsList();
   Assert.assertEquals("Rows must match", records2, actualFilteredRecords);
   ```
    works before the rewrite but fails after it, returning no results.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#issuecomment-796102912


   @rdblue @aokolnychyi I just wanted to follow up on this. Please let me know if there's any other comment. Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r601758199



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -46,6 +51,8 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
         "Files to delete cannot be null or empty");
     Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),
         "Files to add can not be null or empty");
+    Preconditions.checkArgument(filesToAdd.stream().allMatch(df -> df.specId() == writeSpec().specId()),
+        "Files to add can not have a different spec than the rewrite files spec");

Review comment:
       If it isn't a huge number of changes, I'd prefer to fix the tests instead of having a hacky work-around. If it is a huge number of changes we can look into what to do next. What do you think the effort is to fix them?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r586678757



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,95 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+            .identity("c1")
+            .build();

Review comment:
       Nit: continuing indentation in this method is off. It should be 2 indents / 4 spaces from the start of the previous line.
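
    That is:

    ```java
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("c1")
        .build();
    ```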




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#issuecomment-789913182


   After a quick look, this seems correct to me. I'll take a longer look at the test when I get a chance.
   
   @aokolnychyi, you may want to review this as well. Thanks @edgarRd!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r602507337



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -109,6 +109,7 @@ public ThisT set(String property, String value) {
   }
 
   protected PartitionSpec writeSpec() {
+    Preconditions.checkState(spec != null, "No data or delete files have been added.");

Review comment:
       We normally note the operation that is failing, like "Cannot determine partition spec: no data or delete files have been added". Also, no need for ending punctuation.
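
    With that phrasing, the check would read:

    ```java
    Preconditions.checkState(spec != null,
        "Cannot determine partition spec: no data or delete files have been added");
    ```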




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r602508565



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -188,11 +190,24 @@ protected void add(DataFile file) {
    * Add a delete file to the new snapshot.
    */
   protected void add(DeleteFile file) {
-    addedFilesSummary.addedFile(spec, file);
+    setWriteSpec(file);
+    addedFilesSummary.addedFile(writeSpec(), file);
     hasNewDeleteFiles = true;
     newDeleteFiles.add(file);
   }
 
+  private void setWriteSpec(ContentFile<?> file) {
+    PartitionSpec writeSpec = ops.current().spec(file.specId());
+    Preconditions.checkNotNull(file, "Invalid content file: null");
+    Preconditions.checkNotNull(writeSpec,
+        "Partition spec id should be defined in table, writing partition spec: null");

Review comment:
       Our error messages are usually of the form "Cannot <action>: <reason>". This would be "Cannot find partition spec for file: %s"
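
    Applied to the snippet above:

    ```java
    Preconditions.checkNotNull(writeSpec,
        "Cannot find partition spec for file: %s", file.path());
    ```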




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r587735828



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());
+
+    // Should still have all the same data
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    List<ThreeColumnRecord> expectedFilteredRecords = Lists.newArrayList(records2);

Review comment:
       Great, thanks for catching this! I was trying to find some time to look at it but I'm glad you were able to track it down.






[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r586911453



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -377,4 +378,101 @@ private void writeDF(Dataset<Row> df) {
         .mode("append")
         .save(tableLocation);
   }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partitions specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());
+
+    // Should still have all the same data
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    List<ThreeColumnRecord> expectedFilteredRecords = Lists.newArrayList(records2);

Review comment:
       I think we need to find out what's going on then.






[GitHub] [iceberg] rdblue merged pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293


   




[GitHub] [iceberg] edgarRd commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r600712368



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -46,6 +51,8 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
         "Files to delete cannot be null or empty");
     Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),
         "Files to add can not be null or empty");
+    Preconditions.checkArgument(filesToAdd.stream().allMatch(df -> df.specId() == writeSpec().specId()),
+        "Files to add can not have a different spec than the rewrite files spec");

Review comment:
       I think this is a great idea. While what you mention is correct, there are many test cases where we add `DataFile`s and `DeleteFile`s whose spec has no specId relative to the table, e.g. it is `0` when initialized and only receives a specId once it is added via `TableMetadata#updatePartitionSpec`. One example is https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java#L579, where the PartitionSpec used in the `DataFile` is the one initialized with `0` rather than `table.spec()`.
   
   Before proceeding, I'd like to know whether this way of writing tests is what we want, or whether we should stay consistent and add `DataFile`s and `DeleteFile`s with the PartitionSpec that carries the assigned specId, since specs are immutable. Because a `ContentFile` exposes only the specId, not the PartitionSpec itself, I don't think it's possible to validate against the actual spec object.
   
   This issue shows up in test cases that add `DataFile`s and `DeleteFile`s after a partition spec update without using the updated spec's specId.
   
   I can work on fixing the tests, but that will increase the diff of this PR; if that makes the behavior consistent with what we want, though, I'm happy to do it.
   
   Thanks for your guidance!
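   
   To make the mismatch concrete, here is a minimal sketch (a hypothetical snippet reusing the test's `SCHEMA` and `table`; it is not code from this PR):
   
   ```java
   // Illustration of the specId mismatch described above.
   PartitionSpec initial = PartitionSpec.builderFor(SCHEMA)
       .identity("c1")
       .build();                                // a freshly built spec carries the default specId 0
   
   table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
   int committedId = table.spec().specId();     // the committed spec is assigned a new id, e.g. 1
   
   // A DataFile built against `initial` reports specId 0, so a check like
   //   filesToAdd.stream().allMatch(df -> df.specId() == writeSpec().specId())
   // rejects it even though the partition layout is the one the test intended.
   ```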
   
   






[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r600004130



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -260,7 +264,7 @@ public RewriteDataFilesActionResult execute() {
 
   private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
     try {
-      RewriteFiles rewriteFiles = table.newRewrite();
+      RewriteFiles rewriteFiles = table.newRewrite().rewriteSpec(spec());

Review comment:
       With the changes I suggested above, I don't think we need to change this file.
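   
   For reference, a minimal sketch of the commit path under that detect-from-data-files approach (variable names follow the quoted diff; this is illustrative, not the final patch):
   
   ```java
   // No rewriteSpec() call: the output spec would be inferred from the
   // specIds of the added data files.
   RewriteFiles rewriteFiles = table.newRewrite();
   rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
   rewriteFiles.commit();
   ```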






[GitHub] [iceberg] rdblue commented on a change in pull request #2293: Spark: Fix RewriteDataFilesAction with output spec in writer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2293:
URL: https://github.com/apache/iceberg/pull/2293#discussion_r600003774



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -84,10 +84,14 @@
   private boolean hasNewDeleteFiles = false;
 
   MergingSnapshotProducer(String tableName, TableOperations ops) {
+    this(tableName, ops, ops.current().spec());
+  }
+
+  MergingSnapshotProducer(String tableName, TableOperations ops, PartitionSpec spec) {

Review comment:
       I don't think that `spec` should be passed in the constructor. See my comments on `BaseRewriteFiles`.
   
   Instead, I think we should initially set the spec ID to null and initialize it the first time a `DataFile` is added. Then validate that all other data files use the same ID. That way we don't need a new API method and maintain flexibility.
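   
   A minimal sketch of that lazy initialization (field and method names are illustrative, not the actual `MergingSnapshotProducer` internals):
   
   ```java
   private Integer dataSpecId = null;  // unset until the first data file is added
   
   protected void add(DataFile file) {
     if (dataSpecId == null) {
       this.dataSpecId = file.specId();  // the first file pins the output spec
     } else {
       Preconditions.checkArgument(dataSpecId == file.specId(),
           "Cannot add file with spec %s: operation already uses spec %s",
           file.specId(), dataSpecId);
     }
     // ... existing bookkeeping for newly added files ...
   }
   ```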



