Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/04/10 15:47:36 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7171: Flink: IcebergFilesCommitter should use same PartitionSpec as the IcebergStreamWriter

stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1161386084


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -145,6 +151,7 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.tableLoader.open();
     this.table = tableLoader.loadTable();
     this.committerMetrics = new IcebergFilesCommitterMetrics(super.metrics, table.name());
+    this.spec = table.specs().get(specId);

Review Comment:
   Why don't we pass the `spec` in directly, instead of looking it up from `specId`?
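   A minimal sketch of the two options, using hypothetical stand-in types (`SpecSketch`, `TableSketch`, `CommitterById`, `CommitterBySpec`) rather than the real Iceberg/Flink classes. If only the id is carried, the committer has to resolve it from the table's `specs()` map in `initializeState`. If the spec itself is passed in, that lookup (and its possible `null` result) goes away.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a partition spec: an id plus partition field names.
record SpecSketch(int specId, List<String> fields) implements Serializable {}

// Hypothetical stand-in for a table whose current spec can evolve.
final class TableSketch {
  private final Map<Integer, SpecSketch> specsById = new HashMap<>();
  private SpecSketch current;

  TableSketch(SpecSketch initial) {
    updateSpec(initial);
  }

  SpecSketch spec() {
    return current;
  }

  Map<Integer, SpecSketch> specs() {
    return specsById;
  }

  // Registers a new spec and makes it the table's current spec.
  void updateSpec(SpecSketch next) {
    specsById.put(next.specId(), next);
    current = next;
  }
}

// Shape used in the diff: only the id is passed in; the spec is resolved later.
final class CommitterById {
  private final int specId;
  private SpecSketch spec;

  CommitterById(int specId) {
    this.specId = specId;
  }

  void initializeState(TableSketch table) {
    // Returns null if the loaded table does not know this id.
    this.spec = table.specs().get(specId);
  }

  SpecSketch spec() {
    return spec;
  }
}

// Suggested shape: the spec itself is passed in, so no lookup is needed.
final class CommitterBySpec {
  private final SpecSketch spec;

  CommitterBySpec(SpecSketch spec) {
    this.spec = spec;
  }

  SpecSketch spec() {
    return spec;
  }
}
```

   In this model both shapes end up with the spec from operator creation even after the table's spec changes; the second just has fewer moving parts.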



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();

Review Comment:
   I think we should verify that restoring from a checkpoint works after the partition spec change.
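   One way to picture what such a test should cover, as a hypothetical model (`StagedManifest`, `snapshotState`, `restore`, `resolveSpec` are illustrative names, not the committer's real state format): a staged manifest is checkpointed together with the id of the spec it was written with, and after restore the spec is resolved by that recorded id from the table's spec map rather than from the table's current spec, which may have moved on.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical checkpointed entry: the spec id a staged manifest was written with.
record StagedManifest(long checkpointId, int specId) {

  // Serializes the entry into checkpoint state (a long followed by an int).
  byte[] snapshotState() {
    return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
        .putLong(checkpointId)
        .putInt(specId)
        .array();
  }

  // Rebuilds the entry from checkpoint state.
  static StagedManifest restore(byte[] state) {
    ByteBuffer buf = ByteBuffer.wrap(state);
    long checkpointId = buf.getLong();
    int specId = buf.getInt();
    return new StagedManifest(checkpointId, specId);
  }

  // Resolves the spec by the recorded id, so a later spec change does not matter.
  String resolveSpec(Map<Integer, String> specsById) {
    String spec = specsById.get(specId);
    if (spec == null) {
      throw new IllegalStateException("Unknown spec id: " + specId);
    }
    return spec;
  }
}
```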



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec again

Review Comment:
   I'm not sure it is necessary to test a second partition spec change.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -443,7 +451,7 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+            result, () -> manifestOutputFileFactory.create(checkpointId), spec);

Review Comment:
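   Looks like this is the key fix: the manifest is now written with the committer's `spec` instead of `table.spec()`, the table's current spec.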
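   A hypothetical model of why this line matters (`DataFileSketch`, `ManifestSketch`, `ManifestWriterSketch.writeManifest` are illustrative, not Iceberg APIs). The writers produced their files under the spec they started with, so a manifest written with the table's current spec after an evolution no longer matches its files, while one written with the spec captured for the committer does. A check like `consistent` is the kind of property a test could assert on the staged manifest.

```java
import java.util.List;

// Hypothetical: a data file remembers the spec id its writer used.
record DataFileSketch(String path, int specId) {}

// Hypothetical: a staged manifest records the spec id it was written with.
record ManifestSketch(int specId, List<DataFileSketch> files) {

  // True if every file was written under the manifest's spec.
  boolean consistent() {
    return files.stream().allMatch(f -> f.specId() == specId);
  }
}

final class ManifestWriterSketch {
  private ManifestWriterSketch() {}

  // Models writing a manifest for the given files with the given spec id.
  static ManifestSketch writeManifest(List<DataFileSketch> files, int specId) {
    return new ManifestSketch(specId, List.copyOf(files));
  }
}
```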



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);

Review Comment:
   We should verify that the staging manifest file is still written with the old partition spec, i.e., the spec in effect when the committer operator was created.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: issues-help@iceberg.apache.org