You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "gustavoatt (via GitHub)" <gi...@apache.org> on 2023/04/13 16:33:21 UTC

[GitHub] [iceberg] gustavoatt opened a new pull request, #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

gustavoatt opened a new pull request, #7120:
URL: https://github.com/apache/iceberg/pull/7120

   ## Summary
   
   Allow passing `output-spec-id` to the Spark writer, so that we can customize which partition spec to write. This is useful for when a table has more than one active spec (e.g. daily and hourly partition spec).
   
   Fixes https://github.com/apache/iceberg/issues/6932
   
   ## Testing
   
   - [x] Unit tests for Spark 3.1, 3.2 & 3.3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1163431621


##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);

Review Comment:
   Is this necessary?  It looks like table is dropped after every test?



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();

Review Comment:
   I think this is not necessary?



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);

Review Comment:
   Can we do this by dataframe, like below?  That way we may not need to refresh in SQL, and also test can be more consistent, as now its some inserts by DF, others by SQL



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);

Review Comment:
   May be able to drop following refresh as well then, if we can remove that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho merged pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho merged PR #7120:
URL: https://github.com/apache/iceberg/pull/7120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1164885305


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -586,8 +607,8 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
-      PartitionSpec spec = table.spec();
       FileIO io = table.io();
+      PartitionSpec outputSpec = table.specs().get(outputSpecId);

Review Comment:
   nit: Shall we still call it `spec` to reduce the amount of changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt closed pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt closed pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec
URL: https://github.com/apache/iceberg/pull/7120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1167057077


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,16 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    if (writeConf.outputSpecId() == null) {

Review Comment:
   Follow up PR https://github.com/apache/iceberg/pull/7348



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1507473736

   Merged, thanks @gustavoatt , and also @aokolnychyi for additional review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1164113783


##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);

Review Comment:
   Unfortunately the `createTable` always inserts three records https://github.com/apache/iceberg/blob/master/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java#L46
   
   I did not want this test to also have to keep track about the partition spec of those records so I thought it was easier to just delete all records.



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);

Review Comment:
   Yes, I modified all the code to do inserts only with the dataframe API. I still left the reads using `sql` because its simpler, but happy to change it if you think that is preferable. 



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,87 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1149501204


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -566,12 +578,14 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
         FileFormat format,
+        PartitionSpec outputSpec,

Review Comment:
   Done. Changed it on all Spark versions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1164445500


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java:
##########
@@ -51,5 +51,8 @@ private SparkWriteOptions() {}
   public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
       "handle-timestamp-without-timezone";
 
+  // Output partition spec ID where writes should go.

Review Comment:
   Nit: comment seems to just repeat information of the field name, doesnt seem worth it as is.  Maybe remove, or improve it, maybe 'partition spec to apply on newly written files'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1507252375

   Thanks for the review @aokolnychyi and @szehon-ho.
   
   There is a check failure on this PR but looks unrelated to my changes:
   
   ```
   Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.
   * What went wrong:
   
   Execution failed for task ':iceberg-spark:iceberg-spark-3.2_2.12:checkstyleMain'.
   You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.
   > A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
   
      > An unexpected error occurred configuring and executing Checkstyle.
   See https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings
         > java.lang.Error: Error was thrown while processing /home/runner/work/iceberg/iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
   499 actionable tasks: 499 executed
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1160342252


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java:
##########
@@ -183,4 +184,91 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Verify that the actual partitions are written with the correct spec ID.
+    // Two of the partitions should have the original spec ID and one should have the new one.
+
+    if (!this.getClass().equals(TestPartitionedWritesToWapBranch.class)) {

Review Comment:
   Is there any issue to track this?  (can you file if not).  And also, is there an easier way to do this?  (Maybe override the test method there and don't run?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1160930338


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java:
##########
@@ -183,4 +184,91 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Verify that the actual partitions are written with the correct spec ID.
+    // Two of the partitions should have the original spec ID and one should have the new one.
+
+    if (!this.getClass().equals(TestPartitionedWritesToWapBranch.class)) {

Review Comment:
   Just filed one at https://github.com/apache/iceberg/issues/7297. Basically I'm not able to read from the partitions table the newly written data into the WAP branch. It does work for non WAP branches though.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -118,6 +119,10 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.applicationId = applicationId;
     this.wapEnabled = writeConf.wapEnabled();
     this.wapId = writeConf.wapId();
+    this.outputSpecId =

Review Comment:
   Done, added a check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1146566963


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -566,12 +578,14 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
         FileFormat format,
+        PartitionSpec outputSpec,

Review Comment:
   How about passing spec id and looking up PartitionSpec on the last part?  It will be less to serailize (I think WriterFactory is serialized and sent to executors)



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,64 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));

Review Comment:
   Can we check partitions table to be sure right partitions are added?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1165837298


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -115,6 +121,20 @@ public String wapId() {
     return sessionConf.get("spark.wap.id", null);
   }
 
+  public int outputSpecId() {
+    final int outputSpecId =

Review Comment:
   Done. I usually just keep the final as a way of having something like `const` to avoid accidentally modifying something I did not intend to. But I think it is not necessary in this case and would prefer to do it to keep consistency in the rpo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1163426968


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,15 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    Preconditions.checkArgument(

Review Comment:
   I agree, this is much simpler. The ternary operator became too unwieldy in this case. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1507484483

   Thanks for the review and merging @szehon-ho @aokolnychyi!
   
   Appreciate the effort spent reviewing!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1470941540

   @edgarRd I looked into it, but I'm not sure whether it is worthwhile to add it to Spark 2.4:
   
   1. It is a much larger change since the `SparkWriter` significantly changed from Spark 2 to Spark 3.
   2. I hit some issues with `org.apache.iceberg.io.PartitionedWriter` when trying to write to a non-default partition spec which is only used for Spark 2.4.
   
   For our use-case, adding only on Spark 3 should be enough.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "edgarRd (via GitHub)" <gi...@apache.org>.
edgarRd commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1470887354

   Should this be in Spark 2.4 as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1470870363

   @rdblue could you take a look when you have a chance? I wrote about the use-case for this feature on https://github.com/apache/iceberg/issues/6932.
   
   Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1507258452

   Yea I think its appeared a few times, tracked by https://github.com/apache/iceberg/pull/7321, let's just retrigger (close and re-open for example)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1164882606


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,16 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    if (writeConf.outputSpecId() == null) {

Review Comment:
   Why not have this inside `SparkWriteConf` and make `outputSpecId()` return `int`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1165618038


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,16 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    if (writeConf.outputSpecId() == null) {

Review Comment:
   Done, I moved this logic to `SparkWriteConf`. The main reason why I initially did not do it there was because I did not want to store the specs and current spec there, but I think that should be ok.



##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -586,8 +607,8 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
-      PartitionSpec spec = table.spec();
       FileIO io = table.io();
+      PartitionSpec outputSpec = table.specs().get(outputSpecId);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1506221636

   The change looks good to me, I had a few minor comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1166312659


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,16 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    if (writeConf.outputSpecId() == null) {

Review Comment:
   We could keep a reference to `Table`, just like we do in `SparkReadConf`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#issuecomment-1505669489

   Thanks @gustavoatt for all the changes, let me wait a little bit to see if any other concerns, will merge if not


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1160352335


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -118,6 +119,10 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.applicationId = applicationId;
     this.wapEnabled = writeConf.wapEnabled();
     this.wapId = writeConf.wapId();
+    this.outputSpecId =

Review Comment:
   Should we add some validation, that given specid is contained inside table.specs?



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java:
##########
@@ -183,4 +184,91 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Verify that the actual partitions are written with the correct spec ID.
+    // Two of the partitions should have the original spec ID and one should have the new one.
+

Review Comment:
   Nit: can remove extra newline here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1148058840


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -566,12 +578,14 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
         FileFormat format,
+        PartitionSpec outputSpec,

Review Comment:
   Got it, I'll add that shortly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1163389284


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,15 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    Preconditions.checkArgument(
+        writeConf.outputSpecId() == null || table.specs().containsKey(writeConf.outputSpecId()),
+        "Cannot write to unknown spec: %s",
+        writeConf.outputSpecId());
+    this.outputSpecId =
+        writeConf.outputSpecId() != null
+            ? table.specs().get(writeConf.outputSpecId()).specId()

Review Comment:
   I think this is a bit redundant, but if we go with previous suggestion it will be not necessary?



##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -127,6 +129,15 @@ class SparkWrite {
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+
+    Preconditions.checkArgument(

Review Comment:
   Nit:  This is a bit hard to read, would this be better?
   ```
   if (writeConf.outputSpecId == null) {
     this.outputSpecId = table.spec().specId();
   } else {
     this.outputSpecId = writeConf.outputSpecId();
     Preconditions.checkArgument(table.specs().containsKey(outputSpecId), ...);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1149502297


##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java:
##########
@@ -157,4 +158,64 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));

Review Comment:
   Done. Added it.
   
   I had to skip that check on Spark 3.3. for Wap branch. There is a bug reading the `.partitions` table on a WAP branch so I just skipped that check (we still check that each row has the correct spec_id).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1164884600


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -115,6 +115,10 @@ public String wapId() {
     return sessionConf.get("spark.wap.id", null);
   }
 
+  public Integer outputSpecId() {

Review Comment:
   Initially we were not sure whether to make spec IDs public but I also don't see a good alternative.
   I am OK with the overall idea of exposing this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1165819216


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -115,6 +121,20 @@ public String wapId() {
     return sessionConf.get("spark.wap.id", null);
   }
 
+  public int outputSpecId() {
+    final int outputSpecId =

Review Comment:
   Nit: Would you mind remove 'final' here?  I know from  @aokolnychyi (when he reviewed me change), he prefers not to have extra finals except in class fields, as modern compiler usually adds it anyway.  ex: https://github.com/apache/iceberg/pull/4293#discussion_r823011381



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a diff in pull request #7120: Spark - Accept an `output-spec-id` that allows writing to a desired partition spec

Posted by "gustavoatt (via GitHub)" <gi...@apache.org>.
gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1164486623


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java:
##########
@@ -51,5 +51,8 @@ private SparkWriteOptions() {}
   public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
       "handle-timestamp-without-timezone";
 
+  // Output partition spec ID where writes should go.

Review Comment:
   Agreed. Removed it as it is self-explanatory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org