Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/13 06:02:42 UTC

[GitHub] [iceberg] jxwnhj0717 commented on issue #4550: the snapshot file is lost when write iceberg using flink Failed to open input stream for file File does not exist

jxwnhj0717 commented on issue #4550:
URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1381352493

   I added logging for lock acquisition and release in LockManagers$InMemoryLockManager and reproduced the corrupted-table issue:
   ```
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquiring lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquiring lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquired lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquired lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - add lock heartbeat. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - add lock heartbeat. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO  org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO  org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,679 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO  org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,684 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2 (5dec3e28251735766bb3eb423ca5a45c) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
   	at org.apache.iceberg.util.LockManagers$InMemoryLockManager.release(LockManagers.java:235)
   	at org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:377)
   	at org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:159)
   	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:317)
   	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
   	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
   	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:312)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:276)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:218)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:188)
   	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
   	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:334)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1171)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:1136)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:1159)
   	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   The code is as follows:
   ```java
       @VisibleForTesting
       void acquireOnce(String entityId, String ownerId) {
         InMemoryLockContent content = LOCKS.get(entityId);
         if (content != null && content.expireMs() > System.currentTimeMillis()) {
           throw new IllegalStateException(String.format("Lock for %s currently held by %s, expiration: %s",
                   entityId, content.ownerId(), content.expireMs()));
         }
   
         LOG.info("acquiring lock. entityId:{} ownerId:{}", entityId, ownerId);
         long expiration = System.currentTimeMillis() + heartbeatTimeoutMs();
         boolean succeed;
         if (content == null) {
           InMemoryLockContent previous = LOCKS.putIfAbsent(
                   entityId, new InMemoryLockContent(ownerId, expiration));
           succeed = previous == null;
         } else {
           succeed = LOCKS.replace(entityId, content, new InMemoryLockContent(ownerId, expiration));
         }
   
         if (succeed) {
           LOG.info("acquired lock. entityId:{} ownerId:{}", entityId, ownerId);
           // cleanup old heartbeat
           if (HEARTBEATS.containsKey(entityId)) {
             HEARTBEATS.remove(entityId).cancel(false);
             LOG.info("remove lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
           }
   
           HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> {
             InMemoryLockContent lastContent = LOCKS.get(entityId);
             try {
               long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs();
               LOCKS.replace(entityId, lastContent, new InMemoryLockContent(ownerId, newExpiration));
             } catch (NullPointerException e) {
               throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e);
             }
   
           }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
           LOG.info("add lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
   
         } else {
           throw new IllegalStateException("Unable to acquire lock " + entityId);
         }
    }

    @Override
       public boolean release(String entityId, String ownerId) {
         LOG.info("releasing lock. entityId:{} ownerId:{}", entityId, ownerId);
         InMemoryLockContent currentContent = LOCKS.get(entityId);
         if (currentContent == null) {
           LOG.error("Cannot find lock for entity {}", entityId);
           return false;
         }
   
         if (!currentContent.ownerId().equals(ownerId)) {
           LOG.error("Cannot unlock {} by {}, current owner: {}", entityId, ownerId, currentContent.ownerId());
           return false;
         }
   
      // NullPointerException here (LockManagers.java:235) when remove(entityId) returns null
      HEARTBEATS.remove(entityId).cancel(false);
         LOCKS.remove(entityId);
         LOG.info("released lock. entityId:{} ownerId:{}", entityId, ownerId);
         return true;
       }
   ```
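   The failing call chain can be reproduced in isolation: `ConcurrentHashMap.remove` returns `null` for an absent key, so chaining `.cancel(false)` onto it throws. A minimal, self-contained sketch (plain JDK; the entityId string and class name are made up for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class HeartbeatNpeDemo {
  private static final ConcurrentHashMap<String, ScheduledFuture<?>> HEARTBEATS =
      new ConcurrentHashMap<>();

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    String entityId = "metadata/v749.metadata.json";

    // Register a heartbeat, then remove it once -- simulating whatever cleared
    // the HEARTBEATS entry before release() ran in the report above.
    HEARTBEATS.put(entityId,
        scheduler.scheduleAtFixedRate(() -> { }, 0, 100, TimeUnit.MILLISECONDS));
    HEARTBEATS.remove(entityId).cancel(false);  // first remove returns the future, ok

    try {
      // Second remove returns null, so the chained cancel(false) throws NPE,
      // matching the stack trace at LockManagers.java:235.
      HEARTBEATS.remove(entityId).cancel(false);
    } catch (NullPointerException e) {
      System.out.println("NPE on missing heartbeat entry");
    }
    scheduler.shutdownNow();
  }
}
```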
   
   HEARTBEATS.remove(entityId) returns null when the lock is released. Strangely, the heartbeat for this entityId was added at 2023-01-11 18:46:03,672, yet at 18:46:03,684 the corresponding heartbeat can no longer be found, even though the whole process is single-threaded.
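   Whatever the root cause of the missing entry turns out to be, a null check around the heartbeat removal would at least keep the Flink task from failing with a bare NPE. A hypothetical null-safe variant of that step (names follow the snippet above; this is a sketch, not the upstream fix):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

// Hypothetical null-safe wrapper for the heartbeat-cancel step in release():
// a missing entry is reported and tolerated instead of throwing NPE.
class SafeHeartbeats {
  private final ConcurrentHashMap<String, ScheduledFuture<?>> heartbeats =
      new ConcurrentHashMap<>();

  void register(String entityId, ScheduledFuture<?> heartbeat) {
    heartbeats.put(entityId, heartbeat);
  }

  boolean cancelHeartbeat(String entityId) {
    ScheduledFuture<?> heartbeat = heartbeats.remove(entityId);
    if (heartbeat == null) {
      // Previously: heartbeats.remove(entityId).cancel(false) -> NPE
      System.err.println("No heartbeat found for entity " + entityId);
      return false;
    }
    heartbeat.cancel(false);
    return true;
  }
}
```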


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

