You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/28 07:26:22 UTC

[GitHub] [iceberg] pvary commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

pvary commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982040578


##########
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -149,7 +149,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto generate an operator uid based on job topology."
+                + "With that, operator uid is subjective to change upon topology change.");
+        return;

Review Comment:
   Would this mask other errors? Like if somehow the savepoint is corrupted?
   
   Shall we check if the `allowNonRestoredState` is true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org