You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/31 13:45:52 UTC

[GitHub] [iceberg] openinx opened a new pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

openinx opened a new pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404


   This patch will address the comment from the [discussion](https://github.com/apache/iceberg/pull/1185/files#r479545979).  FYI @stevenzwu @rdblue @JingsongLi . 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480646124



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,26 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,
+          "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+      long oldMaxCommittedCheckpointId = getMaxCommittedCheckpointId(table, oldFlinkJobId);

Review comment:
       The `tailMap` will use the oldMaxCommittedCheckpointId to filter all committed data files, so even if we move the line inside if-else,  it still need to access the `oldMaxCommittedCheckpointId` outside it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r486116044



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,25 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,

Review comment:
       OK, you are right.  the `Strings.isNotEmpty` is from another dependency,   should be OK to use `!Strings.isNullOrEmpty` in guava.  Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu edited a comment on pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#issuecomment-689977302


   @openinx LGTM for the scope. Added a few minor comments.
   
   Since this is still early stage, I would image we will have incompatible state change in the future, e.g. if we decide to go with the manifest file checkpoint approach.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480230265



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -88,6 +88,13 @@
   private transient Table table;
   private transient long maxCommittedCheckpointId;
 
+  // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
+  // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
+  // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
+  // traversing iceberg table's snapshots.
+  private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       I am wondering if it is better to consolidate all committer checkpoint states into a single structure, e.g. Pojo class or Avro record which are Flink supported state types for schema evolution.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480656851



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -151,13 +169,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      commitUpToCheckpoint(checkpointId);
+      commitUpToCheckpoint(table, dataFilesPerCheckpoint, flinkJobId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     }
   }
 
-  private void commitUpToCheckpoint(long checkpointId) {
-    NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.headMap(checkpointId, true);
+  private static void commitUpToCheckpoint(Table table,

Review comment:
       Because I want the user to explicitly write the parameters on the method,  to avoid unexpected bugs caused by the usage of the internal variables of the class.  I encountered the bug before because  I used the current `flinkJobId` [here](https://github.com/apache/iceberg/pull/1404/files#diff-768ddf11822e9ddb5fd460fb4978d79dR191) unintentionally when `commitUpToCheckpoint` in the restoring [path](https://github.com/apache/iceberg/pull/1404/files#diff-768ddf11822e9ddb5fd460fb4978d79dR136),   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#issuecomment-689977302


   @openinx LGTM for the scope.
   
   Since this is still early stage, I would image we will have incompatible state change in the future, e.g. if we decide to go with the manifest file checkpoint approach.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r486117107



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,25 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,
+          "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+      long oldMaxCommittedCheckpointId = flinkJobId.equals(oldFlinkJobId) ?

Review comment:
       Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even if it's restored from a snapshot created by another different flink job, so it's safe to assign the max committed
   checkpoint id from restored flink job to the current flink job.
   Yeah, we could make it more simpler now. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#issuecomment-693638294


   This looks good to me so I merged it. 
   
   I agree with @stevenzwu that the serialized state is a compatibility concern. If we eventually plan to move to writing files from a checkpoint as a manifest and that would be a breaking change, then I think it makes sense to get that change done as soon as possible. That would solve the serialization problem, save a later breaking change, and make checkpoints faster. I'd prefer to do this before the 0.10.0 release, if possible.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#issuecomment-693200249


   Ping @rdblue @stevenzwu , Other concerns ?   I think we'd better to include this patch in the next release, thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#issuecomment-689441843


   Ping @rdblue ,  @JingsongLi @stevenzwu  ,   any other concern ? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r486106317



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,25 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,

Review comment:
       I think you mean `Strings.isNotEmpty` ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r486059813



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,25 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,
+          "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+      long oldMaxCommittedCheckpointId = flinkJobId.equals(oldFlinkJobId) ?

Review comment:
       nit: maybe a simpler version? we can also remove the line 120 above
   ```
   long maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r486058384



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,25 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+
+      String oldFlinkJobId = jobIdState.get().iterator().next();

Review comment:
       nit: oldFlinkJobId -> restoredFlinkJobId, since it may be the same job id for failure recovery case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480219376



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,26 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,
+          "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+      long oldMaxCommittedCheckpointId = getMaxCommittedCheckpointId(table, oldFlinkJobId);

Review comment:
       nit: move this inside the if section to make the if-else more symmetric.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r481254536



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -88,6 +88,13 @@
   private transient Table table;
   private transient long maxCommittedCheckpointId;
 
+  // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
+  // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
+  // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
+  // traversing iceberg table's snapshots.
+  private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       fair enough. agree that we shouldn't expose the GenericDataFile. 
   
   On the other hand, this can be simpler if we decided to just checkpoint the manifest file. Then we only need to track the manifest file path along with other metadata. Maybe we can follow up on the issue: https://github.com/apache/iceberg/issues/1403
   
   Here is our Avro schema file.
   ```
   {
       "type": "record",
       "name": "ManifestFileState",
       "namespace": "com.netflix.spaas.nfflink.connector.iceberg.model",
       "fields": [
           {"name":"path", "type":"string"},
           {"name":"length", "type":"long"},
           {"name":"specId", "type":"int"},
           { "name":"checkpointId", "type": "long"},
           { "name":"checkpointTimestamp", "type": "long"},
           { "name":"dataFileCount", "type": "long"},
           { "name":"recordCount", "type": "long"},
           { "name":"byteCount", "type": "long"},
           { "name":"lowWatermark", "type": ["null", "long"], "default": null},
           { "name":"highWatermark", "type": ["null", "long"], "default": null}
       ]
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#issuecomment-693757280


   Agreed, I would prepare patch for that today.   Thanks for the merging. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480150633



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,26 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
-      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,

Review comment:
       Assume the case:   the flink job bootstrap first and write few records,  after the first `snapshotState(1)` finished, its job crashed, then it started to restore from the first snapshot.  Because we don't commit any iceberg transaction, so the maxCommittedCheckpointId will be -1,  finally the job won't recover successfully. 
   
   So in theory, this `Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID)` should be incorrect here.   For the snapshot expiration case,  we may need to find other ways to deal with. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480686600



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -88,6 +88,13 @@
   private transient Table table;
   private transient long maxCommittedCheckpointId;
 
+  // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
+  // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
+  // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
+  // traversing iceberg table's snapshots.
+  private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       We've considered to use avro or pojo to consolidate the job-id and data files into a single structure. It could be considered as two separate issue: 
   1.  use avro or pojo to serialize/deserialize.  If use avro, then we need the detailed schema for the whole structure,  but the `DataFile`  hides its schema inside a non-public implementation `GenericDataFile`, that was designed intentionally because we don't want to expose the detail schema to upper users for iceberg-core.  POJO need all fields provide getter/setter, while the DataFile don't support setter now.  More discussion could be found. 
   2. Making them into a single structure.  I think we could do but I'm not sure what's the benefit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r486058202



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -108,18 +115,25 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
     if (context.isRestored()) {
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
-      // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed
-      // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
-      // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
-      // the iceberg table.
-      Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
-          "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
-
-      SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
-      // Only keep the uncommitted data files in the cache.
-      this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+
+      String oldFlinkJobId = jobIdState.get().iterator().next();
+      Preconditions.checkState(oldFlinkJobId != null && oldFlinkJobId.length() > 0,

Review comment:
       nit: since we are using guava, we can use `Strings.isNullOrEmpty`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r481254536



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -88,6 +88,13 @@
   private transient Table table;
   private transient long maxCommittedCheckpointId;
 
+  // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
+  // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
+  // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
+  // traversing iceberg table's snapshots.
+  private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       fair enough. agree that we should expose the GenericDataFile. 
   
   On the other hand, this can be simpler if we decided to just checkpoint the manifest file. Then we only need to track the manifest file path along with other metadata. Maybe we can follow up on the issue: https://github.com/apache/iceberg/issues/1403
   
   Here is our Avro schema file.
   ```
   {
       "type": "record",
       "name": "ManifestFileState",
       "namespace": "com.netflix.spaas.nfflink.connector.iceberg.model",
       "fields": [
           {"name":"path", "type":"string"},
           {"name":"length", "type":"long"},
           {"name":"specId", "type":"int"},
           { "name":"checkpointId", "type": "long"},
           { "name":"checkpointTimestamp", "type": "long"},
           { "name":"dataFileCount", "type": "long"},
           { "name":"recordCount", "type": "long"},
           { "name":"byteCount", "type": "long"},
           { "name":"lowWatermark", "type": ["null", "long"], "default": null},
           { "name":"highWatermark", "type": ["null", "long"], "default": null}
       ]
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480248357



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -151,13 +169,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      commitUpToCheckpoint(checkpointId);
+      commitUpToCheckpoint(table, dataFilesPerCheckpoint, flinkJobId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     }
   }
 
-  private void commitUpToCheckpoint(long checkpointId) {
-    NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.headMap(checkpointId, true);
+  private static void commitUpToCheckpoint(Table table,

Review comment:
       what is the reason making this a static method now?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r480686600



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -88,6 +88,13 @@
   private transient Table table;
   private transient long maxCommittedCheckpointId;
 
+  // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
+  // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
+  // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
+  // traversing iceberg table's snapshots.
+  private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       We've considered to use avro or pojo to consolidate the job-id and data files into a single structure. It could be considered as two separate issue: 
   1.  use avro or pojo to serialize/deserialize.  If use avro, then we need the detailed schema for the whole structure,  but the `DataFile`  hides its schema inside a non-public implementation `GenericDataFile`, that was designed intentionally because we don't want to expose the detail schema to upper users for iceberg-core.  POJO need all fields provide getter/setter, while the DataFile don't support setter now. 
   2. Making them into a single structure.  I think we could do but I'm not sure what's the benefit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #1404: Flink: Add flink job id to state backend for handling flink job redeployment

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #1404:
URL: https://github.com/apache/iceberg/pull/1404#discussion_r481254536



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -88,6 +88,13 @@
   private transient Table table;
   private transient long maxCommittedCheckpointId;
 
+  // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
+  // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
+  // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
+  // traversing iceberg table's snapshots.
+  private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       fair enough. agree that we should expose the GenericDataFile.
   
   On the other hand, this can be simpler if we decided to just checkpoint the manifest file. Then we only need to track the manifest file path along with other metadata. Here is our Avro schema file.
   
   ```
   {
       "type": "record",
       "name": "ManifestFileState",
       "namespace": "com.netflix.spaas.nfflink.connector.iceberg.model",
       "fields": [
           {"name":"path", "type":"string"},
           {"name":"length", "type":"long"},
           {"name":"specId", "type":"int"},
           { "name":"checkpointId", "type": "long"},
           { "name":"checkpointTimestamp", "type": "long"},
           { "name":"dataFileCount", "type": "long"},
           { "name":"recordCount", "type": "long"},
           { "name":"byteCount", "type": "long"},
           { "name":"lowWatermark", "type": ["null", "long"], "default": null},
           { "name":"highWatermark", "type": ["null", "long"], "default": null}
       ]
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org