You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/02 08:54:21 UTC

[GitHub] [iceberg] moon-fall opened a new pull request #3647: retain flinkCheckpointInfo when rewrite datafiles to ensure restore flink job from checkpoint success

moon-fall opened a new pull request #3647:
URL: https://github.com/apache/iceberg/pull/3647


   when flink job restore from checkpoint,IcebergFilesCommitter read the maxCommittedCheckpointId from summary in metadata file, and only commit the uncommitted DataFiles from maxCommittedCheckpointId +1,get maxCommittedCheckpointId will traverse iceberg table's snapshots until get the maxCommittedCheckpointId which is not null,but if after rewrite data files ,expire snapshots and only retain the latest snapshot after rewrite data files, will lost the maxCommittedCheckpointId info and the flink job restore operation will repeatedly commit the datafile that has committed,then an error will be reported like this because manifest file has been deleted:
   
   > org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: file:/tmp/iceberg10/metadata/41b2614f36cb4fff57efbecd063827ba-00000-0-3-00003.avro
   	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:177)
   	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
   	at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
   	at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
   	at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:87)
   	at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:71)
   	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:59)
   	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:105)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:207)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:153)
   	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
   	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
   	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:432)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
   	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: File file:/tmp/iceberg10/metadata/41b2614f36cb4fff57efbecd063827ba-00000-0-3-00003.avro does not exist
   	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
   	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
   	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
   	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
   	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
   	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
   	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
   	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:175)
   	... 19 more
   
   i think a way to ensure restore flink job from checkpoint success is to retain flinkCheckpointInfo when rewrite datafiles.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3647: retain flinkCheckpointInfo when rewrite datafiles to ensure restore flink job from checkpoint success

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3647:
URL: https://github.com/apache/iceberg/pull/3647#discussion_r774705689



##########
File path: core/src/main/java/org/apache/iceberg/SnapshotProducer.java
##########
@@ -232,6 +232,9 @@ public Snapshot apply() {
 
     ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
 
+    // merge flinkCheckpointInfo from previousSummary into currentSummary when rewrite datafiles
+    mergeFlinkCheckpointInfo(builder, previousSummary, summary);

Review comment:
       I don't think that we want to modify core to fix this. Instead of carrying metadata forward, I think we should instead look at older snapshots to find the max committed ID.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #3647: retain flinkCheckpointInfo when rewrite datafiles to ensure restore flink job from checkpoint success

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3647:
URL: https://github.com/apache/iceberg/pull/3647#issuecomment-1000443538


   @stevenzwu can you make a recommendation on the best way to fix this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #3647: retain flinkCheckpointInfo when rewrite datafiles to ensure restore flink job from checkpoint success

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3647:
URL: https://github.com/apache/iceberg/pull/3647#discussion_r762170768



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -381,4 +383,31 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteDataFilesMergeCheckpointInfo() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    Map<String, String> summary = icebergTableUnPartitioned.currentSnapshot().summary();
+    String flinkJobIdBeforeRewrite = summary.get(SnapshotSummary.FLINK_JOB_ID);
+    String flinkMaxCommitedCheckpointIDBeforeRewrite = summary.get(SnapshotSummary.FLINK_MAX_COMMITTED_CHECKPOINT_ID);
+
+    Actions.forTable(icebergTableUnPartitioned)
+            .rewriteDataFiles()
+            .execute();

Review comment:
       Nit: both of these lines are over-indented. They should be indented 4 spaces (just after the i in Actions).

##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -381,4 +383,31 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteDataFilesMergeCheckpointInfo() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    Map<String, String> summary = icebergTableUnPartitioned.currentSnapshot().summary();
+    String flinkJobIdBeforeRewrite = summary.get(SnapshotSummary.FLINK_JOB_ID);
+    String flinkMaxCommitedCheckpointIDBeforeRewrite = summary.get(SnapshotSummary.FLINK_MAX_COMMITTED_CHECKPOINT_ID);
+
+    Actions.forTable(icebergTableUnPartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    icebergTableUnPartitioned.refresh();
+
+    Map<String, String> summaryAfterRewrite = icebergTableUnPartitioned.currentSnapshot().summary();
+
+    // Assert the flink job info retain after rewrite.
+    Assert.assertEquals("Should retain flinkJobId after rewrite",
+            flinkJobIdBeforeRewrite, summaryAfterRewrite.get(SnapshotSummary.FLINK_JOB_ID));
+    Assert.assertEquals("Should retain flinkMaxCommitedCheckpointID after rewrite",
+            flinkMaxCommitedCheckpointIDBeforeRewrite,
+            summaryAfterRewrite.get(SnapshotSummary.FLINK_MAX_COMMITTED_CHECKPOINT_ID));

Review comment:
       Nit: same note here about indentation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org