Posted to issues@iceberg.apache.org by "ConeyLiu (via GitHub)" <gi...@apache.org> on 2023/03/22 11:05:26 UTC

[GitHub] [iceberg] ConeyLiu opened a new pull request, #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

ConeyLiu opened a new pull request, #7171:
URL: https://github.com/apache/iceberg/pull/7171

   We use the current PartitionSpec for the IcebergStreamWriter, which is fixed and will not change after the job starts. Meanwhile, the PartitionSpec for the IcebergFilesCommitter is refreshed as the table snapshot changes. This can fail the Flink sink job when the partition spec is updated, because we use the wrong partition spec to write those DataFiles/DeleteFiles to the ManifestFile.
   
   For example, we got the following error when updating the partition spec:
   
   (Screenshot of the error: https://user-images.githubusercontent.com/12733256/226884127-cc8d6732-7cec-4a31-bfc1-0b2ccebc3780.png)
   
   In this patch, we keep the same PartitionSpec between `IcebergStreamWriter` and `IcebergFilesCommitter`.
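   
   A minimal sketch of the idea (not the exact patch; `createWriter` and `createCommitter` are hypothetical helpers standing in for the builder code in `FlinkSink`):
   
   ```java
   // Resolve the spec once when the job graph is built and hand the same
   // instance to both operators, so the committer never writes manifests
   // with a newer spec than the one the writer produced the files with.
   PartitionSpec spec = table.spec();
   IcebergStreamWriter<RowData> writer = createWriter(table, spec);       // hypothetical helper
   IcebergFilesCommitter committer = createCommitter(tableLoader, spec);  // hypothetical helper
   ```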
   




[GitHub] [iceberg] ConeyLiu commented on pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1538224575

   Thanks @stevenzwu for the review, and sorry for the late response; I had some other things to take care of. I have updated the fix implementation. Please take another look when you are free, thanks a lot.




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1193129812


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -440,10 +442,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
       return EMPTY_MANIFEST_DATA;
     }
 
+    // Refresh table to get the latest specs map
+    table.refresh();

Review Comment:
   It should be reverted. Will update it.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189468808


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;
 
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      int specId = result.dataFiles()[0].specId();
+      spec = specsById.get(specId);
+      Preconditions.checkState(
+          Arrays.stream(result.dataFiles()).allMatch(file -> file.specId() == specId),

Review Comment:
   I just think this is more concise and easier to understand; we don't need to take the first one out separately.
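   
   A hedged sketch of that more concise variant (assuming Iceberg's relocated Guava helpers and `ContentFile#specId()`), collecting the distinct spec IDs instead of singling out the first file:
   
   ```java
   // Gather the spec IDs of all data files; a manifest can only be written
   // with a single spec, so there must be exactly one distinct ID.
   Set<Integer> specIds =
       Arrays.stream(result.dataFiles()).map(ContentFile::specId).collect(Collectors.toSet());
   Preconditions.checkState(
       specIds.size() == 1, "Expected one partition spec for all data files, found spec IDs: %s", specIds);
   PartitionSpec spec = specsById.get(Iterables.getOnlyElement(specIds));
   ```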





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189469467


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -134,6 +135,19 @@ public static DataFile writeFile(
       String filename,
       List<RowData> rows)
       throws IOException {
+    return writeFile(table, schema, spec, conf, location, filename, rows, null);
+  }
+
+  public static DataFile writeFile(

Review Comment:
   Yes, it's not necessary. Just wondering if we could explain it briefly. 





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1196359776


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              table.spec().specId(), specId),
+          table.spec().specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      PartitionSpec oldSpec = table.spec();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId++;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // write data with old partition spec
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              oldSpec.specId(), specId),
+          oldSpec.specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+      Assert.assertEquals(

Review Comment:
   Removed



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              table.spec().specId(), specId),
+          table.spec().specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      PartitionSpec oldSpec = table.spec();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId++;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // write data with old partition spec
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              oldSpec.specId(), specId),
+          oldSpec.specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+      Assert.assertEquals(
+          TestIcebergFilesCommitter.class.getName(),
+          SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
+      harness.setup();
+      harness.initializeState(snapshot);
+      harness.open();
+
+      SimpleDataUtil.assertTableRows(table, rows, branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+
+      checkpointId++;
+      RowData row = SimpleDataUtil.createRowData(checkpointId, "world" + checkpointId);
+      StructLike partition = new PartitionData(table.spec().partitionType());
+      partition.set(0, checkpointId);
+      dataFile =
+          writeDataFile("data-" + checkpointId, ImmutableList.of(row), table.spec(), partition);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(row);
+      harness.snapshot(checkpointId, ++timestamp);
+      assertFlinkManifests(1);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(

Review Comment:
   updated





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1194990767


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   @stevenzwu Thanks for the explanation. I kept the original implementation and pass the same partition spec to `IcebergFilesCommitter`.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1195914753


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(

Review Comment:
   nit: use assertj instead. 
   https://iceberg.apache.org/contribute/#assertj
   
   then we can use a format string for the error msg:
   ```
     public SELF as(String description, Object... args) {
       return super.as(description, args);
     }
   ```
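   
   For instance, the first assertion in the test above could become something like this (a sketch, assuming `import static org.assertj.core.api.Assertions.assertThat;`):
   
   ```java
   assertThat(specId)
       .as("Expected the staging manifest's partition spec ID to be %s", table.spec().specId())
       .isEqualTo(table.spec().specId());
   ```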
   
   



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              table.spec().specId(), specId),
+          table.spec().specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      PartitionSpec oldSpec = table.spec();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId++;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // write data with old partition spec
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              oldSpec.specId(), specId),
+          oldSpec.specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+      Assert.assertEquals(
+          TestIcebergFilesCommitter.class.getName(),
+          SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
+      harness.setup();
+      harness.initializeState(snapshot);
+      harness.open();
+
+      SimpleDataUtil.assertTableRows(table, rows, branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+
+      checkpointId++;
+      RowData row = SimpleDataUtil.createRowData(checkpointId, "world" + checkpointId);
+      StructLike partition = new PartitionData(table.spec().partitionType());
+      partition.set(0, checkpointId);
+      dataFile =
+          writeDataFile("data-" + checkpointId, ImmutableList.of(row), table.spec(), partition);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(row);
+      harness.snapshot(checkpointId, ++timestamp);
+      assertFlinkManifests(1);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(

Review Comment:
   nit: same comment for assertj style



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              table.spec().specId(), specId),
+          table.spec().specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      PartitionSpec oldSpec = table.spec();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId++;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // write data with old partition spec
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              oldSpec.specId(), specId),
+          oldSpec.specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+      Assert.assertEquals(

Review Comment:
   nit: not sure if this assertion is needed in this test



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(

Review Comment:
   the error msg might be unnecessary with the assertj style





[GitHub] [iceberg] ConeyLiu commented on pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1479378399

   cc @chenjunjiedada @stevenzwu @rdblue Please take a look when you are free. Thanks a lot.




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1193053642


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -440,10 +442,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
       return EMPTY_MANIFEST_DATA;
     }
 
+    // Refresh table to get the latest specs map
+    table.refresh();

Review Comment:
   Is this step necessary?
   
   It is definitely not needed if we go with the read-only table approach for the partition spec.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   > We use a SerializableTable instance to create IcebergStreamWriter. The PartitionSpec of SerializableTable is fixed and will not change after the job starts. Meanwhile, the PartitionSpec for IcebergFilesCommitter is refreshed as the table snapshot changes. This could fail the Flink sink job when updating the partition spec in another job, because we use the wrong partition spec to write those DataFiles/DeleteFiles to ManifestFile.
   
   Your original description analyzed the problem very well. I am wondering if we should just pass the same read-only `SerializableTable` to `IcebergFilesCommitter` so that it also uses the same table spec as the `IcebergStreamWriter`/`RowDataTaskWriterFactory`.
   
   @hililiwei @Reo-LEI what do you think?
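   
   A rough sketch of that read-only approach (hypothetical wiring; the real construction lives in the `FlinkSink` builder):
   
   ```java
   // SerializableTable.copyOf takes an immutable snapshot of the table's
   // metadata, so both operators built from it resolve the same spec even
   // if the catalog table evolves while the job runs.
   Table frozen = SerializableTable.copyOf(table);
   IcebergStreamWriter<RowData> writer = createWriter(frozen);   // hypothetical helper
   IcebergFilesCommitter committer = createCommitter(frozen);    // hypothetical helper
   ```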





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1193189896


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   I am only talking about the write path.
   
   For the read path, you are passing in the `specsById`, which should work.





[GitHub] [iceberg] stevenzwu commented on pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1550643780

   > @stevenzwu, We also have a requirement to migrate the table without restarting the Flink job, since users may have thousands of production streaming jobs online. Right now, I don't have a full solution in mind; my early thinking is to notify the task manager to update the writer after a checkpoint. Do you have such a requirement as well? Any idea?
   
   @chenjunjiedada we probably can take this discussion to a separate issue. I remember some previous asks in this area about handling table schema evolution without manual intervention, but I couldn't seem to find the PR or issue. There are two slightly different asks:
   
   1. The table schema is already updated/synced via an external mechanism (like a control plane). We just need the writer and committer to pick up the latest schema (or partition spec) without a job restart.
   2. We need the writer to detect that the table schema is out of sync with the record schema, automatically update the table schema, and write with the latest schema.
   
   Case 1 can be implemented by resolving the write schema (or partition spec) not during job initialization but during task initialization. Writers periodically check (e.g. every checkpoint cycle) whether the table schema or partition spec changed; if so, writers can fail the job, and restart plus task initialization will load the latest schema and spec. However, this does bring a scalability concern, because every writer task (hundreds or more) needs to load an Iceberg table from the catalog to retrieve the schema and partition spec.
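   
   A rough sketch of the case-1 check (hypothetical operator code; `initialSpecId` would be captured during task initialization):
   
   ```java
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) {
     // Once per checkpoint cycle: reload the table metadata and fail fast on
     // a spec change, so restarted tasks re-resolve the latest schema/spec.
     table.refresh();
     if (table.spec().specId() != initialSpecId) {
       throw new IllegalStateException(
           "Partition spec changed from " + initialSpecId + " to " + table.spec().specId());
     }
   }
   ```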
   
   Case 2 can be implemented similarly, but it is riskier: bad records (schemas) could cause unintended changes to the Iceberg table schema.
   




[GitHub] [iceberg] stevenzwu merged pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #7171:
URL: https://github.com/apache/iceberg/pull/7171




[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1156945498


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,40 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {

Review Comment:
   On second thought, I don't think this UT is stable enough to test your case. My own testing confirmed this: I ran the UT many times, and sometimes it passed, sometimes it didn't.
   
   I think we should use `TestIcebergFilesCommitter` to test it. Specifically, we can change the table's spec between commits to validate your code.
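   
   For instance (a sketch of the suggested pattern, mirroring the spec-evolution calls that the final test in this thread uses):
   
   ```java
   // Commit one checkpoint, evolve the spec, then drive the next checkpoint,
   // so the committer must handle files written under the old spec.
   harness.notifyOfCompletedCheckpoint(checkpointId);
   table.refresh();
   table.updateSpec().addField("id").commit();  // change the spec between commits
   ```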







[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1161386084


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -145,6 +151,7 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.tableLoader.open();
     this.table = tableLoader.loadTable();
     this.committerMetrics = new IcebergFilesCommitterMetrics(super.metrics, table.name());
+    this.spec = table.specs().get(specId);

Review Comment:
   why don't we pass the `spec` in directly?



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();

Review Comment:
   I think we should verify the restore works after partition spec change. 



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec again

Review Comment:
   I don't know if it is necessary to test partition spec change again



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -443,7 +451,7 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+            result, () -> manifestOutputFileFactory.create(checkpointId), spec);

Review Comment:
   looks like this is the key fix



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);

Review Comment:
   we should verify that the staging manifest file is still written with the old partition spec that was in effect when the committer operator was created.





[GitHub] [iceberg] ConeyLiu commented on pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1501440604

   @stevenzwu I'm sorry for the mistake. Just updated the descriptions.




[GitHub] [iceberg] zinking commented on pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "zinking (via GitHub)" <gi...@apache.org>.
zinking commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1484982586

   @szehon-ho @stevenzwu can we get this merged?




[GitHub] [iceberg] stevenzwu commented on pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1553404140

   thanks @ConeyLiu for the fix and @Reo-LEI and @hililiwei for the review




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1187351709


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   Updated the implementation. Now we get the actual partition spec from the data files/delete files.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1193129607


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   There is a problem: the job's partition spec may not be the same as the one in the state when we restore the state from a restarted job.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1153093254


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,45 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {
+    List<List<Row>> rows =
+        IntStream.range(0, 10)
+            .mapToObj(i -> createRows(String.valueOf(i)))
+            .collect(Collectors.toList());
+
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    Thread thread = null;
+    if (partitioned) {

Review Comment:
   We can use Assume to test only partitioned tables, as it seems pointless to do so for non-partitioned tables. Alternatively, we can only add partition fields instead of deleting them, so that we can test them in any case.
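   
   For example, with JUnit 4's `Assume` (a sketch, assuming the test class's existing `partitioned` parameter):
   
   ```java
   @Test
   public void testPartitionEvolution() throws Exception {
     // Skip, rather than fail, the unpartitioned parameterization.
     Assume.assumeTrue("Partition evolution only applies to partitioned tables", partitioned);
     // ... rest of the test ...
   }
   ```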





[GitHub] [iceberg] ConeyLiu commented on pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1491531379

   Thanks @hililiwei, the comment has been addressed. Please take another look.
   Also cc @jackye1995 @nastra @Fokko, could you take a look when you are free? Thanks in advance.




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1154185477


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,45 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {
+    List<List<Row>> rows =
+        IntStream.range(0, 10)
+            .mapToObj(i -> createRows(String.valueOf(i)))
+            .collect(Collectors.toList());
+
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    Thread thread = null;
+    if (partitioned) {

Review Comment:
   Good idea, updated.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189284015


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,40 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {

Review Comment:
   Sorry for the late reply. The main problem is the multi-threading: sometimes it doesn't work as expected, but I can't remember the details.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1187353142


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();

Review Comment:
   Added the test.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);

Review Comment:
   Added the test.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1193735416


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   Sorry, I got mixed up with something. I'll keep the original implementation.
   
   >  I am wondering if we should just pass the same read-only SerializableTable to IcebergFilesCommitter so that it also use the same table spec as the IcebergStreamingWriter/RowDataTaskWriterFactory.
   
   I don't think we should use the `SerializableTable`: it cannot commit the new file changes. Or maybe I missed something.
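
   A tiny sketch of the concern, based on my understanding of the API (not code from this PR): a `SerializableTable` copy is read-only, so the committer could not commit through it.

   import org.apache.iceberg.SerializableTable;
   import org.apache.iceberg.Table;

   class ReadOnlyTableSketch {
     static void illustrate(Table table) {
       Table readOnly = SerializableTable.copyOf(table);
       // Mutation entry points on the copy are expected to throw
       // UnsupportedOperationException, because its state is frozen at copy time.
       readOnly.newAppend();
     }
   }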
   





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1196358788


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(

Review Comment:
   OK, updated to AssertJ style.
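
   For reference, the AssertJ form of the assertion above would look roughly like this (sketch only):

   import static org.assertj.core.api.Assertions.assertThat;

   assertThat(specId)
       .as("Partition spec ID of the staging manifest should match the table spec")
       .isEqualTo(table.spec().specId());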





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1148894713


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,45 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {
+    List<List<Row>> rows =
+        IntStream.range(0, 10)
+            .mapToObj(i -> createRows(String.valueOf(i)))
+            .collect(Collectors.toList());
+
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    Thread thread = null;
+    if (partitioned) {
+      thread =
+          new Thread(
+              () -> {
+                try {
+                  Thread.sleep(120);

Review Comment:
   `TestIcebergFilesCommitter` only tests `IcebergFilesCommitter` itself; the new UT aims to simulate partition evolution while the Flink job is running.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1193727904


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -440,10 +442,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
       return EMPTY_MANIFEST_DATA;
     }
 
+    // Refresh table to get the latest specs map
+    table.refresh();

Review Comment:
   Reverted





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1187356833


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec again

Review Comment:
   Updated.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189404369


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;
 
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      int specId = result.dataFiles()[0].specId();
+      spec = specsById.get(specId);
+      Preconditions.checkState(
+          Arrays.stream(result.dataFiles()).allMatch(file -> file.specId() == specId),

Review Comment:
   Just curious, why do you prefer this? Is there any performance improvement?





[GitHub] [iceberg] ConeyLiu commented on pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1553995909

   Thanks all. Will submit backport PRs.




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189405422


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -134,6 +135,19 @@ public static DataFile writeFile(
       String filename,
       List<RowData> rows)
       throws IOException {
+    return writeFile(table, schema, spec, conf, location, filename, rows, null);
+  }
+
+  public static DataFile writeFile(

Review Comment:
   I could add it, but this is a util for testing. Is it needed?
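
   If it helps, a short Javadoc along these lines would cover it (wording is only a suggestion):

   /**
    * Writes the given rows into a new data file for testing purposes.
    *
    * @param spec the partition spec to write the data file with
    * @param partition the partition key for the file, or null for unpartitioned tables
    * @return the {@link DataFile} describing the written file
    */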





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1194005805


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;

Review Comment:
   Sorry for the confusion. For commits, we still need the TableLoader to load a regular table at committer operator initialization. I was saying that we could pass a read-only table or partition spec to the committer for writing the manifest file; this partition spec is always the same as the writer operator's.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1156945498


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,40 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {

Review Comment:
   On second thought, I don't think this UT is stable enough to test your case. My own runs confirmed this: I ran the UT many times, and sometimes it passed, sometimes it didn't.
   
   I think we should use `TestIcebergFilesCommitter` to test it. Specifically, we can change the table's partition spec between commits to validate your code, as sketched below.
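
   A rough sketch of that flow, reusing the harness helpers that already appear in this PR's test diffs, so no extra thread is involved:

   // Checkpoint 1: commit a file written with the current spec.
   harness.processElement(of(dataFile1), ++timestamp);
   harness.snapshot(1, ++timestamp);
   harness.notifyOfCompletedCheckpoint(1);

   // Evolve the partition spec deterministically between commits.
   table.refresh();
   table.updateSpec().addField("id").commit();

   // Checkpoint 2: the committer must still handle files written with the old spec.
   harness.processElement(of(dataFile2), ++timestamp);
   harness.snapshot(2, ++timestamp);
   harness.notifyOfCompletedCheckpoint(2);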





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1160502198


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,40 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {

Review Comment:
   Updated to `TestIcebergFilesCommitter`. @hililiwei, which errors did you hit? I have run it several times and they all passed.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1187355584


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +887,130 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(
+          String.format(
+              "Expected the partition spec ID of staging manifest is %s, but the ID is: %s",
+              table.spec().specId(), specId),
+          table.spec().specId(),
+          specId);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      PartitionSpec oldSpec = table.spec();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId++;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // write data with old partition spec
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      Assert.assertEquals(

Review Comment:
   This checks that the staging manifest files are written with the old partition spec.
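
   For context, the check boils down to reading the spec ID off the staged manifest; `ManifestFile` exposes it directly. A sketch, assuming the staged `DeltaManifests` has already been deserialized from the operator state:

   import static org.assertj.core.api.Assertions.assertThat;

   // The manifest records the partition spec it was written with.
   int stagedSpecId = deltaManifests.dataManifest().partSpecId();
   assertThat(stagedSpecId).isEqualTo(oldSpec.specId());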





[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "Reo-LEI (via GitHub)" <gi...@apache.org>.
Reo-LEI commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1144681193


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,45 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {
+    List<List<Row>> rows =
+        IntStream.range(0, 10)
+            .mapToObj(i -> createRows(String.valueOf(i)))
+            .collect(Collectors.toList());
+
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    Thread thread = null;
+    if (partitioned) {
+      thread =
+          new Thread(
+              () -> {
+                try {
+                  Thread.sleep(120);

Review Comment:
   I think we can test spec changes in `TestIcebergFilesCommitter` instead of testing here by starting another thread.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189406015


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,40 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {

Review Comment:
   Thanks for the reply; the UTs have been updated to `TestIcebergFilesCommitter`.





[GitHub] [iceberg] chenjunjiedada commented on pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1548779483

    > I am wondering if we should just pass the same read-only SerializableTable to IcebergFilesCommitter so that it also use the same table spec as the IcebergStreamingWriter/RowDataTaskWriterFactory.
   
   @stevenzwu, we also have a requirement to migrate the table without restarting the Flink job, since users may have thousands of production streaming jobs online. Right now I don't have a full solution in mind; the early thinking is to notify the task manager to update the writer after a checkpoint. Do you have such a requirement as well? Any ideas?




[GitHub] [iceberg] hililiwei commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1189289880


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;
 
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      int specId = result.dataFiles()[0].specId();
+      spec = specsById.get(specId);
+      Preconditions.checkState(
+          Arrays.stream(result.dataFiles()).allMatch(file -> file.specId() == specId),
+          "All data files should have same partition spec");
       dataManifest =
           writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
     }
 
     // Write the completed delete files into a newly created delete manifest file.
     if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+      if (spec == null) {
+        spec = specsById.get(result.deleteFiles()[0].specId());
+      }
+      int specId = spec.specId();

Review Comment:
   nit: add an empty line here.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java:
##########
@@ -73,21 +78,43 @@ static ManifestOutputFileFactory createOutputFileFactory(
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
   static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      Map<Integer, PartitionSpec> specsById)
       throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
+    PartitionSpec spec = null;
 
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      int specId = result.dataFiles()[0].specId();
+      spec = specsById.get(specId);
+      Preconditions.checkState(
+          Arrays.stream(result.dataFiles()).allMatch(file -> file.specId() == specId),

Review Comment:
   Use `Arrays.stream(result.dataFiles()).map(ContentFile::specId).distinct().count() > 1`?
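
   For what it's worth, the two variants differ slightly (an illustrative comparison, not code from the PR):

   import java.util.Arrays;
   import org.apache.iceberg.ContentFile;
   import org.apache.iceberg.io.WriteResult;

   class SpecIdCheckSketch {
     // allMatch short-circuits on the first mismatching spec ID.
     static boolean allSameSpec(WriteResult result, int specId) {
       return Arrays.stream(result.dataFiles()).allMatch(f -> f.specId() == specId);
     }

     // distinct().count() traverses every file before it can answer.
     static boolean hasMixedSpecs(WriteResult result) {
       return Arrays.stream(result.dataFiles()).map(ContentFile::specId).distinct().count() > 1;
     }
   }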



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -134,6 +135,19 @@ public static DataFile writeFile(
       String filename,
       List<RowData> rows)
       throws IOException {
+    return writeFile(table, schema, spec, conf, location, filename, rows, null);
+  }
+
+  public static DataFile writeFile(

Review Comment:
   nit: add doc?





[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "Reo-LEI (via GitHub)" <gi...@apache.org>.
Reo-LEI commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1156976982


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -390,4 +393,40 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
           return null;
         });
   }
+
+  @Test
+  public void testPartitionEvolution() throws Exception {

Review Comment:
   +1, using multiple threads in a unit test will cause unstable results.





[GitHub] [iceberg] stevenzwu commented on pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#issuecomment-1501308106

   > We use the current PartitionSpec for IcebergStreamWriter, which is fixed and will not change after the job started. While the PartitionSpec for IcebergStreamWriter is refreshed with the table snapshot changing.
   
   @ConeyLiu, is there a typo in the above description? The two sentences seem to contradict each other.




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7171: Flink: Fixes flink sink failed due to updating partition spec

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1187352673


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -443,7 +451,7 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+            result, () -> manifestOutputFileFactory.create(checkpointId), spec);

Review Comment:
   The code has been updated; please see the following comments.


