Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/12 07:09:33 UTC

[GitHub] [iceberg] coolderli opened a new issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

coolderli opened a new issue #2808:
URL: https://github.com/apache/iceberg/issues/2808


   In my Flink job (which writes binlog data to Iceberg), the same transaction is committed twice when a `CommitStateUnknownException` occurs. The exception is as follows:
   ```
   2021-07-12 13:11:47.859 WARN  org.apache.flink.runtime.taskmanager.Task                    - IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.dw_business.dwd_ord_ord_df (1/1)#21 (bd443bf5439bb8de4083b56499a8d346) switched from RUNNING to FAILED.
   org.apache.iceberg.exceptions.CommitStateUnknownException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
   Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
   Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
   At this time no files will be deleted including possibly unused manifest lists.
   	at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:271)
   	at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:124)
   	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:307)
   	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
   	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
   	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:289)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:308)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:295)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:219)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:189)
   	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
   	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1089)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1054)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1077)
   	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
   	at org.apache.iceberg.relocated.com.google.common.base.Throwables.propagate(Throwables.java:241)
   	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:80)
   	at org.apache.iceberg.hive.HiveTableOperations.lambda$persistTable$3(HiveTableOperations.java:308)
   	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
   	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:77)
   	at org.apache.iceberg.hive.HiveTableOperations.persistTable(HiveTableOperations.java:304)
   	at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:259)
   	... 25 more
   Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
   	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
   	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
   	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:376)
   	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:453)
   	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435)
   	at org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
   	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
   	at org.apache.hadoop.hive.thrift.TFilterTransport.readAll(TFilterTransport.java:62)
   	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
   	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
   	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
   	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
   	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1375)
   	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1359)
   	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table(HiveMetaStoreClient.java:370)
   	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:65)
   	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:77)
   	... 30 more
   Caused by: java.net.SocketTimeoutException: Read timed out
   	at java.net.SocketInputStream.socketRead0(Native Method)
   	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
   	at java.net.SocketInputStream.read(SocketInputStream.java:171)
   	at java.net.SocketInputStream.read(SocketInputStream.java:141)
   	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
   	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
   	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
   	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
   	... 49 more
   2021-07-12 13:11:47.860 INFO  org.apache.flink.runtime.taskmanager.Task                    - Freeing task resources for IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.dw_business.dwd_ord_ord_df (1/1)#21 (bd443bf5439bb8de4083b56499a8d346).
   ```
   
   And I found duplicate snapshots:
   ![image](https://user-images.githubusercontent.com/38486782/125245060-add8ec80-e322-11eb-8e9b-269e458b85f9.png)
   
   I think we should check whether the last snapshot was committed successfully before committing it again in [initializeState](https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L154).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-913751666


   We need to come up with a good hypothesis. Right now, none of them makes sense.
   
   Let me recap the two observations from the thread:
   1. Both transactions have the same `max-committed-checkpointid`.
   2. The two duplicate transactions have a linear relationship, as we can see from the parent snapshot ID.
   
   > Just quit the Flink streaming job when encountering CommitStateUnknownException and let people check whether it's OK to restart the Flink job.
   
   I thought this was already the case, as @coolderli configured `execution.checkpointing.tolerable-failed-checkpoints=0`.
   
   > Catch the CommitStateUnknownException in commitOperation, and retry checking the Iceberg table to see whether the stale txn has been committed.
   
   This is also the solution we used when we encountered this problem. Note that it is not bulletproof, as the re-check can also fail, but it should greatly reduce the chance of this happening. However, it still doesn't explain why @coolderli was seeing the duplicate transactions.
   
   




[GitHub] [iceberg] coolderli commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
coolderli commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-913491550


   I haven't found a solution to this scenario. Maybe the Hive Metastore should ignore requests whose socket has timed out.




[GitHub] [iceberg] wg1026688210 commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
wg1026688210 commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-895952463


   [initializeState gets the max-committed-checkpointid from the Iceberg table](https://github.com/apache/iceberg/blob/7a3bfedf25cb74624ed898cec81e3bcadffb2373/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L143-L146) and [notifyCheckpointComplete also checks whether the checkpoint ID is greater than max-committed-checkpointid](https://github.com/apache/iceberg/blob/7a3bfedf25cb74624ed898cec81e3bcadffb2373/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L188). I am confused about why the commits have the same max-committed-checkpointid.
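
To make the two checks concrete, here is a minimal, self-contained Java sketch of the idea, not the actual `IcebergFilesCommitter` code. `SnapshotStub`, `CommitGuardSketch`, and the summary key names `job-id` and `max-committed-checkpointid` are stand-ins chosen for illustration; the real Iceberg classes and property names may differ.

```java
import java.util.Map;

/** Hypothetical stand-in for a table snapshot: only the fields this sketch needs. */
final class SnapshotStub {
    final long id;
    final Long parentId;               // null for the first snapshot in the table
    final Map<String, String> summary; // commit metadata written by the committer

    SnapshotStub(long id, Long parentId, Map<String, String> summary) {
        this.id = id;
        this.parentId = parentId;
        this.summary = summary;
    }
}

final class CommitGuardSketch {
    // Placeholder key names (assumptions, not the real summary properties).
    static final String JOB_ID = "job-id";
    static final String MAX_COMMITTED = "max-committed-checkpointid";
    static final long NOTHING_COMMITTED = -1L;

    /**
     * Restore-time lookup: walk from the current snapshot back through its
     * parents and return the checkpoint id recorded by the newest snapshot
     * that was written by the given job.
     */
    static long maxCommittedCheckpointId(
            Map<Long, SnapshotStub> snapshots, Long currentId, String jobId) {
        Long cursor = currentId;
        while (cursor != null) {
            SnapshotStub snapshot = snapshots.get(cursor);
            if (snapshot == null) {
                break; // ancestor expired or unknown: stop walking
            }
            String value = snapshot.summary.get(MAX_COMMITTED);
            if (jobId.equals(snapshot.summary.get(JOB_ID)) && value != null) {
                return Long.parseLong(value);
            }
            cursor = snapshot.parentId;
        }
        return NOTHING_COMMITTED;
    }

    /** Notification-time guard: only commit checkpoints newer than the last committed one. */
    static boolean shouldCommit(long completedCheckpointId, long maxCommittedCheckpointId) {
        return completedCheckpointId > maxCommittedCheckpointId;
    }
}
```

With both checks in place, a checkpoint whose id is already recorded in the table is skipped, which is why two snapshots carrying the same id are surprising. Note that the lookup is only as fresh as the table state at the moment it runs.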




[GitHub] [iceberg] stevenzwu commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-893979874


   This is an interesting scenario.
   
   `IcebergFilesCommitter` [does the check upon restore/restart](https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L148-L155).
   
   Maybe in this case the job didn't restart? The checkpoint should fail. Did you set this [Flink config](https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints)?




[GitHub] [iceberg] coolderli commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
coolderli commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-878030626


   @openinx Can you please help take a look?




[GitHub] [iceberg] coolderli commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
coolderli commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-913489936


   @stevenzwu @wg1026688210 I have already set that config. I've had the problem in #2632 before, but this one is different.
   I think Txn1, whose commit hit the socket timeout, was eventually committed successfully, but the job was restored before Txn1 was committed, and the restored job then committed normally. That leaves two snapshots with the same `max-committed-checkpointid`.
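
The sequence hypothesized here can be replayed in a toy model. The Java sketch below is only an illustration of the suspected ordering under simplifying assumptions: `ToyTable`, `Snap`, and the checkpoint ids 9 and 10 are made up, and nothing in it touches Iceberg, Flink, or the Hive Metastore.

```java
import java.util.ArrayList;
import java.util.List;

final class DuplicateCommitRaceSketch {
    /** One committed snapshot in the toy table (hypothetical type). */
    static final class Snap {
        final int id;
        final int parentId;        // 0 means "no parent"
        final long checkpointId;   // plays the role of max-committed-checkpointid

        Snap(int id, int parentId, long checkpointId) {
            this.id = id;
            this.parentId = parentId;
            this.checkpointId = checkpointId;
        }
    }

    /** Toy table: an append-only snapshot log where each commit builds on the latest snapshot. */
    static final class ToyTable {
        final List<Snap> log = new ArrayList<>();

        long maxCommittedCheckpointId() {
            return log.isEmpty() ? -1L : log.get(log.size() - 1).checkpointId;
        }

        void append(long checkpointId) {
            int parent = log.isEmpty() ? 0 : log.get(log.size() - 1).id;
            log.add(new Snap(log.size() + 1, parent, checkpointId));
        }
    }

    /** Replays the suspected interleaving and returns the resulting snapshot log. */
    static List<Snap> replayHypothesis() {
        ToyTable table = new ToyTable();
        table.append(9L); // an earlier commit that succeeded without ambiguity

        // Txn1 for checkpoint 10 is sent. The client sees a read timeout,
        // but the server side is still processing the request.
        long txn1Checkpoint = 10L;

        // The job fails over and restores before Txn1 lands, so the
        // restore-time lookup sees a stale value.
        long seenOnRestore = table.maxCommittedCheckpointId(); // still 9

        // Txn1 finally lands.
        table.append(txn1Checkpoint);

        // The restored job commits checkpoint 10 again, because 10 > 9 held
        // when it looked. Its snapshot is stacked on top of Txn1's snapshot.
        if (txn1Checkpoint > seenOnRestore) {
            table.append(txn1Checkpoint);
        }
        return table.log;
    }
}
```

In this model the last two snapshots carry the same checkpoint id and the second has the first as its parent, which would be consistent with both observations recapped in the thread. It does not prove that this is what happened in the real job.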




[GitHub] [iceberg] openinx commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-913602308


   Sorry for missing this issue, @coolderli! That's an interesting case. Let me think about what the cause is...




[GitHub] [iceberg] stevenzwu commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-898209231


   I didn't notice that the `max-committed-checkpointid` is the same in those two commits. In that case, the scenario described in my earlier comment is probably not what happened.




[GitHub] [iceberg] wg1026688210 commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
wg1026688210 commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-895956035


   Did you set `execution.checkpointing.tolerable-failed-checkpoints` as @stevenzwu said?




[GitHub] [iceberg] openinx commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-913629145


   > I think Txn1, whose commit hit the socket timeout, was eventually committed successfully, but the job was restored before Txn1 was committed, and the restored job then committed normally. That leaves two snapshots with the same max-committed-checkpointid.
   
   This could explain why there are two identical txn commits in the metadata. I think the candidate ways to resolve this consistency issue are:
   
   1. Just quit the Flink streaming job when encountering CommitStateUnknownException and let people check whether it's OK to restart the Flink job.
   2. Catch the CommitStateUnknownException in [commitOperation](https://github.com/apache/iceberg/blob/e20088449daec9ed431754044b520b3ac5fa3eaa/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L308), and repeatedly check the Iceberg table to see whether the stale txn has been committed. If the retries are exhausted and the check times out (I mean the txn was ultimately not committed successfully), then we start the failover. This way we will need an empirically chosen timeout to decide when it's OK to stop checking the hive-metastore and start the Flink job failover...
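
Option 2 could look roughly like the following self-contained Java sketch. It is not a patch against `IcebergFilesCommitter`: `UnknownCommitState` is a local stand-in for `CommitStateUnknownException`, and `commitWithRecheck`, `alreadyCommitted`, `maxChecks`, and `pauseMillis` are hypothetical names introduced only for illustration.

```java
import java.util.function.BooleanSupplier;

final class UnknownStateRecheckSketch {
    /** Local stand-in for the "commit state unknown" failure (hypothetical type). */
    static final class UnknownCommitState extends RuntimeException {
        UnknownCommitState(String message) {
            super(message);
        }
    }

    enum Outcome { COMMITTED, CONFIRMED_AFTER_RECHECK, GIVE_UP_AND_FAIL_OVER }

    /**
     * Runs the commit. If its outcome is unknown, polls the table up to
     * maxChecks times to see whether the txn landed before giving up.
     */
    static Outcome commitWithRecheck(
            Runnable commit, BooleanSupplier alreadyCommitted, int maxChecks, long pauseMillis) {
        try {
            commit.run();
            return Outcome.COMMITTED;
        } catch (UnknownCommitState unknown) {
            for (int attempt = 0; attempt < maxChecks; attempt++) {
                try {
                    if (alreadyCommitted.getAsBoolean()) {
                        return Outcome.CONFIRMED_AFTER_RECHECK;
                    }
                } catch (RuntimeException recheckFailure) {
                    // The re-check itself can fail, for example while the
                    // metastore is still unreachable. Treat it as "not yet known".
                }
                if (attempt + 1 < maxChecks) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        return Outcome.GIVE_UP_AND_FAIL_OVER;
                    }
                }
            }
            // Checks exhausted: the caller would now trigger the job failover.
            return Outcome.GIVE_UP_AND_FAIL_OVER;
        }
    }
}
```

A caller would pass the real commit as `commit` and a table refresh plus checkpoint-id lookup as `alreadyCommitted`. Giving up after `maxChecks` does not prove the txn failed, since a slow request could still land afterwards, so the bound only narrows the window rather than closing it.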




[GitHub] [iceberg] stevenzwu edited a comment on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-893979874


   This is an interesting scenario.
   
   `IcebergFilesCommitter` [does the check upon restore/restart](https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L148-L155).
   
   Maybe in this case the job didn't restart? The checkpoint should fail, but a checkpoint failure doesn't necessarily cause a job restart. Did you set this [Flink config](https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints)?
   
   In the no-restart scenario, the next commit will commit a new set of files along with the old set of files (left over because of the CommitStateUnknownException). Maybe when the next checkpoint and commit came, there were no new files. That could explain what you saw. This is the one possible scenario I can think of.
   




[GitHub] [iceberg] coolderli commented on issue #2808: Flink write cdc to iceberg may have duplicate records when an CommitStateUnknownException occurs.

Posted by GitBox <gi...@apache.org>.
coolderli commented on issue #2808:
URL: https://github.com/apache/iceberg/issues/2808#issuecomment-913521847


   Maybe we should close the socket when there is a socket timeout.

