Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/28 04:27:27 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request, #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

stevenzwu opened a new pull request, #5873:
URL: https://github.com/apache/iceberg/pull/5873

   Add a defensive check in IcebergFilesCommitter for restoring state. This supports Flink's `allowNonRestoredState` in case the operator uid has changed.
   
   Otherwise, we can get an exception like this:
   --------------------------
   java.util.NoSuchElementException
   	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1000)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:175)
   
   See more about `allowNonRestoredState` in Flink doc: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints
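
The failure mode in the stack trace above can be reproduced without Flink. The following is a minimal sketch, assuming the restored list state behaves like an empty `Iterable<String>`; the class name `RestoredJobIdExample` and both method names are hypothetical and are not part of Iceberg or Flink.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical stand-alone illustration; not the IcebergFilesCommitter code.
class RestoredJobIdExample {
  // Unguarded read, same shape as the removed line: throws if nothing was restored.
  static String unguarded(Iterable<String> jobIdState) {
    return jobIdState.iterator().next();
  }

  // Guarded read: returns an empty Optional when the state is null or has no elements.
  static Optional<String> guarded(Iterable<String> jobIdState) {
    if (jobIdState == null) {
      return Optional.empty();
    }
    Iterator<String> it = jobIdState.iterator();
    return it.hasNext() ? Optional.ofNullable(it.next()) : Optional.empty();
  }

  public static void main(String[] args) {
    List<String> nothingRestored = Collections.emptyList();
    try {
      unguarded(nothingRestored);
    } catch (NoSuchElementException e) {
      System.out.println("unguarded read failed: " + e);
    }
    System.out.println("guarded read present: " + guarded(nothingRestored).isPresent());
    System.out.println("guarded read value: " + guarded(List.of("job-1")).orElse(null));
  }
}
```

The guarded variant only shows the shape of the check; the PR itself logs a warning and returns early instead of returning an `Optional`.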


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982040578


##########
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -149,7 +149,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto generate an operator uid based on job topology."
+                + "With that, operator uid is subjective to change upon topology change.");
+        return;

Review Comment:
   Would this mask other errors, for example if the savepoint is somehow corrupted?
   
   Should we check whether `allowNonRestoredState` is true?





[GitHub] [iceberg] pvary merged pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring state

Posted by GitBox <gi...@apache.org>.
pvary merged PR #5873:
URL: https://github.com/apache/iceberg/pull/5873




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982577657


##########
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -149,7 +149,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto generate an operator uid based on job topology."
+                + "With that, operator uid is subjective to change upon topology change.");
+        return;

Review Comment:
   `allowNonRestoredState` is not exposed in the operator context. I think it should be safe. This was not checked before because we assumed that a restored checkpoint/savepoint would always contain the jobId and other committer state. That assumption does not hold when the operator uid has changed and `allowNonRestoredState` is enabled. If `allowNonRestoredState` is not enabled, Flink fails the restore with an explicit error when the uid has changed.
   
   `allowNonRestoredState` is disabled by default in Flink. Users should only enable it when they are aware of the implications that some non-matched operator state will be discarded.
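
The behavior described above can be illustrated with a toy model. This is only a sketch under stated assumptions: the savepoint is modeled as a map from operator uid to state, and `NonRestoredStateModel` and `restore` are hypothetical names, not Flink APIs. It does not reproduce Flink's actual restore logic or error messages.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of savepoint restore; hypothetical, not Flink code.
class NonRestoredStateModel {
  // Returns the state seen by the operator identified by uid after a restore.
  static List<String> restore(
      Map<String, List<String>> savepoint,
      Set<String> jobOperatorUids,
      String uid,
      boolean allowNonRestoredState) {
    for (String savedUid : savepoint.keySet()) {
      // State in the savepoint that matches no operator in the new job is either
      // rejected (default) or silently dropped (allowNonRestoredState enabled).
      if (!jobOperatorUids.contains(savedUid) && !allowNonRestoredState) {
        throw new IllegalStateException(
            "Savepoint state for uid " + savedUid + " cannot be mapped to any operator");
      }
    }
    // An operator whose uid is not in the savepoint starts with empty state.
    return savepoint.getOrDefault(uid, Collections.emptyList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> savepoint = Map.of("old-uid", List.of("job-1"));
    // The uid changed between runs, so the committer sees no job id.
    System.out.println(restore(savepoint, Set.of("new-uid"), "new-uid", true));
  }
}
```

In this model, a stable uid means the lookup always finds the saved state, which is the reason the warning message recommends setting `uidPrefix()` explicitly.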





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982638333


##########
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -149,7 +149,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto generate an operator uid based on job topology."
+                + "With that, operator uid is subjective to change upon topology change.");
+        return;

Review Comment:
   Normally, when we talk about a corrupted Flink checkpoint or savepoint, we mean a corrupted checkpoint metadata file or corrupted RocksDB SST files. Those shouldn't be affected by this change, as they typically show up as parsing errors.
   
   An operator uid change with `allowNonRestoredState` enabled is what causes the issue that this PR tries to fix.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982724320


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -156,7 +156,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto generate an operator uid based on job topology."
+                + "With that, operator uid is subjective to change upon topology change.");
+        return;
+      }
+
+      String restoredFlinkJobId = jobIdIterable.iterator().next();
       Preconditions.checkState(

Review Comment:
   Thanks, I would keep this Preconditions check. It should never fail unless there is a code bug or some other unforeseen problem.
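
A minimal sketch of the two layers discussed here, assuming the retained precondition guards against a null or empty job id (the diff above truncates its arguments, so that condition is an assumption). `LayeredRestoreChecks`, `checkState`, and `restoredJobId` are hypothetical stand-ins, not the Iceberg classes.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of an expected-case check followed by an invariant check.
class LayeredRestoreChecks {
  // Stand-in for a Preconditions.checkState-style helper.
  static void checkState(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  // Returns null when no state was restored, otherwise the validated job id.
  static String restoredJobId(Iterable<String> jobIdState) {
    // Layer 1: an expected situation (for example a changed uid), handled by returning early.
    if (jobIdState == null || !jobIdState.iterator().hasNext()) {
      return null;
    }
    String jobId = jobIdState.iterator().next();
    // Layer 2: an invariant that should only break because of a bug, so fail loudly.
    checkState(jobId != null && !jobId.isEmpty(), "Restored job id is null or empty");
    return jobId;
  }

  public static void main(String[] args) {
    System.out.println(restoredJobId(Collections.<String>emptyList()));
    System.out.println(restoredJobId(List.of("job-1")));
  }
}
```

The two checks cover different cases: the first handles state that is absent, the second handles state that is present but invalid, so keeping both is not redundant in this model.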





[GitHub] [iceberg] pvary commented on pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring state

Posted by GitBox <gi...@apache.org>.
pvary commented on PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#issuecomment-1261763050

   Thanks @stevenzwu for the fix and @abmo-x for the review!




[GitHub] [iceberg] abmo-x commented on a diff in pull request #5873: Flink: add defensive check in IcebergFilesCommitter for restoring sta…

Posted by GitBox <gi...@apache.org>.
abmo-x commented on code in PR #5873:
URL: https://github.com/apache/iceberg/pull/5873#discussion_r982674737


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -156,7 +156,18 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      String restoredFlinkJobId = jobIdState.get().iterator().next();
+      Iterable<String> jobIdIterable = jobIdState.get();
+      if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
+        LOG.warn(
+            "Failed to restore committer state. This can happen when operator uid changed and Flink "
+                + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id "
+                + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. "
+                + "Otherwise, Flink auto generate an operator uid based on job topology."
+                + "With that, operator uid is subjective to change upon topology change.");
+        return;
+      }
+
+      String restoredFlinkJobId = jobIdIterable.iterator().next();
       Preconditions.checkState(

Review Comment:
   Can this check be removed now that we have added a new check on the iterator above?
   Or do you expect the iterable to contain a null or empty id?




