Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/28 15:48:24 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

stevenzwu commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982577657


##########
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -149,7 +149,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto-generates an operator uid based on the job topology. "
+                + "With that, the operator uid is subject to change upon topology change.");
+        return;

Review Comment:
   `allowNonRestoredState` is not exposed in the operator context, so the committer cannot check it directly. I think skipping the restore with a warning is still safe. This was not checked before because we assumed that a restored checkpoint/savepoint always contains the jobId and the other committer state. That assumption does not hold when the operator uid changes and `allowNonRestoredState` is enabled. If `allowNonRestoredState` is not enabled, Flink fails the restore with an explicit error.
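   The guard in the diff boils down to checking the restored `Iterable` for null or emptiness before calling `next()`, since `next()` on an empty iterator throws `NoSuchElementException`. A minimal sketch, assuming plain `java.util` types stand in for Flink's `ListState`; `JobIdRestoreSketch` and `restoreJobId` are hypothetical names, not Iceberg or Flink API:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch of the defensive restore check. Plain java.util types
 * are used in place of Flink's ListState; this is not the committer's code.
 */
public class JobIdRestoreSketch {

  /**
   * Returns the restored job id, or Optional.empty() when there is no state,
   * e.g. the operator uid changed and allowNonRestoredState was enabled.
   */
  public static Optional<String> restoreJobId(Iterable<String> jobIdIterable) {
    // Same guard as the diff: treat a null iterable as "nothing restored".
    if (jobIdIterable == null) {
      return Optional.empty();
    }
    Iterator<String> it = jobIdIterable.iterator();
    // Without this check, it.next() would throw NoSuchElementException.
    if (!it.hasNext()) {
      return Optional.empty();
    }
    return Optional.of(it.next());
  }

  public static void main(String[] args) {
    // State written by the same committer operator: job id is found.
    System.out.println(restoreJobId(List.of("job-1"))); // Optional[job-1]
    // No committer state matched the (changed) operator uid.
    System.out.println(restoreJobId(List.of()));        // Optional.empty
    System.out.println(restoreJobId(null));             // Optional.empty
  }
}
```

   Returning an empty result lets the caller log the warning and return early, as the diff does, instead of failing `initializeState` with an unchecked exception.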



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org