Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/10 06:48:16 UTC

[GitHub] [iceberg] openinx opened a new issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

openinx opened a new issue #2575:
URL: https://github.com/apache/iceberg/issues/2575


   I have encountered this flaky unit test several times:
   
   ```
   org.apache.iceberg.flink.TestFlinkTableSink > testHashDistributeMode[catalogName=testhive, baseNamespace=, format=AVRO, isStreaming=true] FAILED
       java.lang.AssertionError: There should be only 1 data file in partition 'aaa' expected:<1> but was:<2>
           at org.junit.Assert.fail(Assert.java:88)
           at org.junit.Assert.failNotEquals(Assert.java:834)
           at org.junit.Assert.assertEquals(Assert.java:645)
           at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:274)
   
       org.apache.flink.table.api.ValidationException: Could not execute DROP DATABASE IF EXISTS  testhive.db RESTRICT
           at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:989)
           at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
           at org.apache.iceberg.flink.FlinkTestBase.exec(FlinkTestBase.java:91)
           at org.apache.iceberg.flink.FlinkTestBase.exec(FlinkTestBase.java:95)
           at org.apache.iceberg.flink.FlinkTestBase.sql(FlinkTestBase.java:99)
           at org.apache.iceberg.flink.TestFlinkTableSink.clean(TestFlinkTableSink.java:126)
   
           Caused by:
           org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException: Database db in catalog testhive is not empty.
               at org.apache.iceberg.flink.FlinkCatalog.dropDatabase(FlinkCatalog.java:240)
               at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:983)
               ... 5 more
   
               Caused by:
               org.apache.iceberg.exceptions.NamespaceNotEmptyException: Namespace db is not empty. One or more tables exist.
                   at org.apache.iceberg.hive.HiveCatalog.dropNamespace(HiveCatalog.java:307)
                   at org.apache.iceberg.flink.FlinkCatalog.dropDatabase(FlinkCatalog.java:231)
                   ... 6 more
   
                   Caused by:
                   InvalidOperationException(message:Database db is not empty. One or more tables exist.)
                       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$drop_database_result$drop_database_resultStandardScheme.read(ThriftHiveMetastore.java:28714)
                       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$drop_database_result$drop_database_resultStandardScheme.read(ThriftHiveMetastore.java:28691)
                       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$drop_database_result.read(ThriftHiveMetastore.java:28625)
                       at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
                       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_drop_database(ThriftHiveMetastore.java:813)
                       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.drop_database(ThriftHiveMetastore.java:798)
                       at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.dropDatabase(HiveMetaStoreClient.java:868)
                       at org.apache.iceberg.hive.HiveCatalog.lambda$dropNamespace$9(HiveCatalog.java:296)
                       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
                       at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:77)
                       at org.apache.iceberg.hive.HiveCatalog.dropNamespace(HiveCatalog.java:295)
                       ... 7 more
   ```
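
   For context, the assertion that fails is essentially a per-partition file count check: with `write.distribution-mode=hash`, all rows for a given partition value should be shuffled to a single writer, so the test expects exactly one data file per partition. Below is a minimal sketch of such a check against the Iceberg table API; the class and helper names are illustrative, not the actual test code.

   ```
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.io.CloseableIterable;
   import org.junit.Assert;

   public class PartitionFileCountCheck {
     // Count data files per partition by planning a full table scan,
     // then assert there is exactly one file for each partition value.
     static void assertOneFilePerPartition(Table table) throws java.io.IOException {
       Map<String, Integer> filesPerPartition = new HashMap<>();
       try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
         for (FileScanTask task : tasks) {
           String partition = task.file().partition().toString();
           filesPerPartition.merge(partition, 1, Integer::sum);
         }
       }
       filesPerPartition.forEach((partition, count) ->
           Assert.assertEquals(
               "There should be only 1 data file in partition '" + partition + "'",
               1, (int) count));
     }
   }
   ```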



[GitHub] [iceberg] openinx commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1047520875


   @yittg I think #4189 & #4187 can still fix all the cases you described. Would you mind checking & verifying this again after applying those two PRs?



[GitHub] [iceberg] yittg commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
yittg commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1046609038


   I encountered this failing test case twice, and I think I have reproduced it locally.
   
   Before diving into it deeply, I think it's better to share the log here;
   at first glance it looks different from [the conclusion](https://github.com/apache/iceberg/pull/4117#issuecomment-1042701849).
   
   The following are the details:
   
   ```
   [Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0 (c3d03556514594e8aff0175bbd12d35e) switched from INITIALIZING to RUNNING.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1) (c3d03556514594e8aff0175bbd12d35e) switched from INITIALIZING to RUNNING.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/metadata/00000-e773c6cb-c67a-422d-9c56-8c68d3d2d64b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: testhive.db.test_hash_distribution_mode
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergStreamWriter (1/1)#0 (e863df4f3c93498a6d45488a9898774b) switched from INITIALIZING to RUNNING.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (e863df4f3c93498a6d45488a9898774b) switched from INITIALIZING to RUNNING.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (4154094a26a2ffc60623d0ec10172143) switched from INITIALIZING to RUNNING.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (4154094a26a2ffc60623d0ec10172143) switched from INITIALIZING to RUNNING.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1645431273362 for job 32b28d9a2d686b0cb1ed6efb940781b5.
   [Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0 (c3d03556514594e8aff0175bbd12d35e) switched from RUNNING to FINISHED.
   [Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0 (c3d03556514594e8aff0175bbd12d35e).
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1)#0 c3d03556514594e8aff0175bbd12d35e.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Values(tuples=[[{ 1, _UTF-16LE'aaa' }, { 1, _UTF-16LE'bbb' }, { 1, _UTF-16LE'ccc' }, { 2, _UTF-16LE'aaa' }, { 2, _UTF-16LE'bbb' }, { 2, _UTF-16LE'ccc' }, { 3, _UTF-16LE'aaa' }, { 3, _UTF-16LE'bbb' }, { 3, _UTF-16LE'ccc' }]]) (1/1) (c3d03556514594e8aff0175bbd12d35e) switched from RUNNING to FINISHED.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.HadoopShimsPre2_7 - Can't get KeyProvider for ORC encryption from hadoop.security.key.provider.path.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00001.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00001.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=bbb/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00002.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=bbb/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00002.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=ccc/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00003.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=ccc/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00003.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00004.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Start to flush snapshot state to state backend, table: testhive.db.test_hash_distribution_mode, checkpointId: 1
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00004.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergStreamWriter (1/1)#0 (e863df4f3c93498a6d45488a9898774b) switched from RUNNING to FINISHED.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IcebergStreamWriter (1/1)#0 (e863df4f3c93498a6d45488a9898774b).
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task IcebergStreamWriter (1/1)#0 e863df4f3c93498a6d45488a9898774b.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (e863df4f3c93498a6d45488a9898774b) switched from RUNNING to FINISHED.
   [jobmanager-io-thread-5] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 32b28d9a2d686b0cb1ed6efb940781b5 (3397 bytes, checkpointDuration=908 ms, finalizationTime=4 ms).
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Committing append with 4 data files and 0 delete files to table testhive.db.test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 alter_table: db=db tbl=test_hash_distribution_mode newtbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 alter_table: db=db tbl=test_hash_distribution_mode newtbl=test_hash_distribution_mode	
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 32b28d9a2d686b0cb1ed6efb940781b5 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Successfully committed to table testhive.db.test_hash_distribution_mode in 105 ms
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.SnapshotProducer - Committed snapshot 1987697929173874507 (MergeAppend)
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/metadata/00001-6d7501d3-28cb-4ec2-bdb8-51c3b948589e.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Committed in 225 ms
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (4154094a26a2ffc60623d0ec10172143) switched from RUNNING to FINISHED.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (4154094a26a2ffc60623d0ec10172143).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 4154094a26a2ffc60623d0ec10172143.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (4154094a26a2ffc60623d0ec10172143) switched from RUNNING to FINISHED.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 32b28d9a2d686b0cb1ed6efb940781b5
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job insert-into_testhive.db.test_hash_distribution_mode (32b28d9a2d686b0cb1ed6efb940781b5) switched from state RUNNING to FINISHED.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 32b28d9a2d686b0cb1ed6efb940781b5.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 32b28d9a2d686b0cb1ed6efb940781b5 reached terminal state FINISHED.
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job 'insert-into_testhive.db.test_hash_distribution_mode' (32b28d9a2d686b0cb1ed6efb940781b5).
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
   ```
   
   The resulting failure, as expected:
   ```
   There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
   Expected :1
   Actual   :2
   <Click to see difference>
   
   java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
   ```
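
   (Side note on the repeated "checkpointing with finished tasks is not enabled" lines above: in Flink 1.14 that behaviour is gated by the `execution.checkpointing.checkpoints-after-tasks-finish.enabled` option. The snippet below only shows where that flag lives, for reference; enabling it is not something proposed in this thread, and whether it would change the outcome here is untested.)

   ```
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class CheckpointsAfterTasksFinish {
     public static void main(String[] args) {
       // For reference only: the Flink 1.14 option behind the
       // "checkpointing with finished tasks is not enabled" log message.
       Configuration conf = new Configuration();
       conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
     }
   }
   ```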



[GitHub] [iceberg] yittg commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
yittg commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1047492144


   > 1. Committing with more than one files in one snapshot;
   
   For part 1, I think the log below is more detailed because I added some marker logging (the diff of the added log statements follows the timeline).
   It is most likely because `endInput` comes before `notifyCheckpointComplete`, with the following timeline
   (a simplified sketch of this ordering follows the list):
   _W stands for IcebergStreamWriter, C for IcebergFilesCommitter_
   * [W] file-1 created;
   * [W] checkpoint-1 prepared;
   * [W] result with file-1 emitted;
   * [W] file-2 created;
   * **[C] checkpoint-1 notified, snapshot-1 with file-1 committed;**
   * [W] checkpoint-2 prepared;
   * [W] result with file-2 emitted;
   * [W] file-3 created;
   * [W] endInput;
   * [W] result with file-3 emitted;
   * **[C] endInput, snapshot-2 with file-2, file-3 committed;**
   * [C] checkpoint-2 notified;
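
   Here is a heavily simplified, hypothetical sketch of the committer side of this ordering. It reuses the `dataFilesPerCheckpoint` / `commitUpToCheckpoint` / `Long.MAX_VALUE` idea visible in the diff below, but the buffering semantics are deliberately reduced, only to show why the `endInput` path can bundle file-2 and file-3 into one snapshot:

   ```
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical, heavily simplified model of the committer's buffering,
   // only to illustrate the ordering in the timeline above.
   public class CommitOrderingSketch {
     private final NavigableMap<Long, List<String>> dataFilesPerCheckpoint = new TreeMap<>();
     private final List<String> pendingFiles = new ArrayList<>();
     private int snapshotCount = 0;

     void collect(String dataFile) {                 // a writer result arrives
       pendingFiles.add(dataFile);
     }

     void snapshotState(long checkpointId) {         // committer checkpoint barrier
       dataFilesPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
       pendingFiles.clear();
     }

     void notifyCheckpointComplete(long checkpointId) {
       commitUpTo(checkpointId);
     }

     void endInput() {                               // bounded input finished
       dataFilesPerCheckpoint.put(Long.MAX_VALUE, new ArrayList<>(pendingFiles));
       pendingFiles.clear();
       commitUpTo(Long.MAX_VALUE);
     }

     private void commitUpTo(long checkpointId) {
       List<String> files = new ArrayList<>();
       dataFilesPerCheckpoint.headMap(checkpointId, true).values().forEach(files::addAll);
       dataFilesPerCheckpoint.headMap(checkpointId, true).clear();
       if (!files.isEmpty()) {
         System.out.println("snapshot-" + (++snapshotCount) + " committed: " + files);
       }
     }

     public static void main(String[] args) {
       CommitOrderingSketch c = new CommitOrderingSketch();
       c.collect("file-1");               // [W] result with file-1 emitted
       c.snapshotState(1);                // checkpoint-1 barrier at the committer
       c.collect("file-2");               // [W] result with file-2 emitted
       c.notifyCheckpointComplete(1);     // [C] snapshot-1 committed: [file-1]
       c.collect("file-3");               // [W] result with file-3 emitted on endInput
       c.endInput();                      // [C] snapshot-2 committed: [file-2, file-3]
       c.notifyCheckpointComplete(2);     // [C] nothing left to commit
     }
   }
   ```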
   
   ```
   --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
   +    LOG.info("Checkpoint notified, #{}", checkpointId);
        if (checkpointId > maxCommittedCheckpointId) {
          commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
          this.maxCommittedCheckpointId = checkpointId;
   ...
      public void endInput() throws IOException {
   +    LOG.info("End input reached");
        // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
        long currentCheckpointId = Long.MAX_VALUE;
        dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
   --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
   +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
      @Override
      public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
   +    LOG.info("Before checkpoint #{}", checkpointId);
        // close all open files and emit files to downstream committer operator
        emit(writer.complete());
   ...
      @Override
      public void endInput() throws IOException {
   +    LOG.info("End input reached");
        // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining
        // completed files to downstream before closing the writer so that we won't miss any of them.
        emit(writer.complete());
   ```
   
   ```
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_table	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 create_table: Table(tableName:test_table, dbName:db, owner:tangyi, createTime:540425, lastAccessTime:540425, retention:2147483647, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:null), FieldSchema(name:data, type:string, comment:null)], location:file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table, inputFormat:org.apache.hadoop.mapred.FileInputFormat, outputFormat:org.apache.hadoop.mapred.FileOutputFormat, compressed:false, numBuckets:0, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{}), bucketCols:null, sortCols:null, parameters:null), partitionKeys:[], parameters:{EXTERNAL=TRUE, write.format.default=ORC, metadata_location=file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table/metadata/00000-b94688ab-1fd9-465a-bed0-e4b1a93c98fb.metadata.json, uuid=ebd3ca64-66b1-40b1-8a6c-6b356dcd1e39, table_type=ICEBERG}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 create_table: Table(tableName:test_table, dbName:db, owner:tangyi, createTime:540425, lastAccessTime:540425, retention:2147483647, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:null), FieldSchema(name:data, type:string, comment:null)], location:file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table, inputFormat:org.apache.hadoop.mapred.FileInputFormat, outputFormat:org.apache.hadoop.mapred.FileOutputFormat, compressed:false, numBuckets:0, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{}), bucketCols:null, sortCols:null, parameters:null), partitionKeys:[], parameters:{EXTERNAL=TRUE, write.format.default=ORC, metadata_location=file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table/metadata/00000-b94688ab-1fd9-465a-bed0-e4b1a93c98fb.metadata.json, uuid=ebd3ca64-66b1-40b1-8a6c-6b356dcd1e39, table_type=ICEBERG}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)	
   [pool-11-thread-1] INFO hive.log - Updating table stats fast for test_table
   [pool-11-thread-1] INFO hive.log - Updated size of table test_table to 1245
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Successfully committed to table testhive.db.test_table in 809 ms
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_table	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table/metadata/00000-b94688ab-1fd9-465a-bed0-e4b1a93c98fb.metadata.json
   [Test worker] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: hive.db.test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_database: db
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_database: db	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 create_table: Table(tableName:test_hash_distribution_mode, dbName:db, owner:tangyi, createTime:540425, lastAccessTime:540425, retention:2147483647, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:null), FieldSchema(name:data, type:string, comment:null)], location:file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode, inputFormat:org.apache.hadoop.mapred.FileInputFormat, outputFormat:org.apache.hadoop.mapred.FileOutputFormat, compressed:false, numBuckets:0, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{}), bucketCols:null, sortCols:null, parameters:null), partitionKeys:[], parameters:{EXTERNAL=TRUE, write.format.default=ORC, metadata_location=file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00000-12b9361e-35a0-4862-94ec-9bae7deb0fe1.metadata.json, write.distribution-mode=hash, uuid=f2be590f-787b-4bf9-a4fa-4e8339d4e1a6, table_type=ICEBERG}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 create_table: Table(tableName:test_hash_distribution_mode, dbName:db, owner:tangyi, createTime:540425, lastAccessTime:540425, retention:2147483647, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:null), FieldSchema(name:data, type:string, comment:null)], location:file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode, inputFormat:org.apache.hadoop.mapred.FileInputFormat, outputFormat:org.apache.hadoop.mapred.FileOutputFormat, compressed:false, numBuckets:0, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{}), bucketCols:null, sortCols:null, parameters:null), partitionKeys:[], parameters:{EXTERNAL=TRUE, write.format.default=ORC, metadata_location=file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00000-12b9361e-35a0-4862-94ec-9bae7deb0fe1.metadata.json, write.distribution-mode=hash, uuid=f2be590f-787b-4bf9-a4fa-4e8339d4e1a6, table_type=ICEBERG}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)	
   [pool-11-thread-1] INFO hive.log - Updating table stats fast for test_hash_distribution_mode
   [pool-11-thread-1] INFO hive.log - Updated size of table test_hash_distribution_mode to 1513
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Successfully committed to table testhive.db.test_hash_distribution_mode in 93 ms
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_database: db
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_database: db	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=default_catalog
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=default_catalog	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_database: default_catalog
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_database: default_catalog	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_database: db
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_database: db	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_database: default_catalog
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_database: default_catalog	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00000-12b9361e-35a0-4862-94ec-9bae7deb0fe1.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00000-12b9361e-35a0-4862-94ec-9bae7deb0fe1.metadata.json
   [Test worker] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: testhive.db.test_hash_distribution_mode
   [Test worker] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.iceberg.io.WriteResult does not contain a setter for field dataFiles
   [Test worker] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.iceberg.io.WriteResult cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 'insert-into_testhive.db.test_hash_distribution_mode' (0462ddc2e6a8276f79b3914f9b4fcd19).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 'insert-into_testhive.db.test_hash_distribution_mode' (0462ddc2e6a8276f79b3914f9b4fcd19).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender LeaderContender: JobMasterServiceLeadershipRunner
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_6 .
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Initializing job 'insert-into_testhive.db.test_hash_distribution_mode' (0462ddc2e6a8276f79b3914f9b4fcd19).
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for insert-into_testhive.db.test_hash_distribution_mode (0462ddc2e6a8276f79b3914f9b4fcd19).
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job insert-into_testhive.db.test_hash_distribution_mode (0462ddc2e6a8276f79b3914f9b4fcd19).
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology - Built 1 pipelined regions in 0 ms
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@2df480e8
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Checkpoint storage is set to 'jobmanager'
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No checkpoint found during restore.
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@6a8e2389 for insert-into_testhive.db.test_hash_distribution_mode (0462ddc2e6a8276f79b3914f9b4fcd19).
   [jobmanager-io-thread-1] INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_6 , session=2cc8799f-155e-4f8f-a8b4-1b9177294dfe
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job 'insert-into_testhive.db.test_hash_distribution_mode' (0462ddc2e6a8276f79b3914f9b4fcd19) under job master id a8b41b9177294dfe2cc8799f155e4f8f.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job insert-into_testhive.db.test_hash_distribution_mode (0462ddc2e6a8276f79b3914f9b4fcd19) switched from state CREATED to RUNNING.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) (876164887f156d6a29a5e26a541b1d8b) switched from CREATED to SCHEDULED.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (c5292a7cfc1d527e13a5eeb9926a4a95) switched from CREATED to SCHEDULED.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (ca05ac4af8adb64ada4a789b93be3848) switched from CREATED to SCHEDULED.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_4(a3ea462904f2b99a88ed07a16e394a8b)
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
   [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager a8b41b9177294dfe2cc8799f155e4f8f@akka://flink/user/rpc/jobmanager_6 for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager a8b41b9177294dfe2cc8799f155e4f8f@akka://flink/user/rpc/jobmanager_6 for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: a3ea462904f2b99a88ed07a16e394a8b.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Received resource requirements from job 0462ddc2e6a8276f79b3914f9b4fcd19: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
   [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 55cac133873b326447786e98d4a8bf2b for job 0462ddc2e6a8276f79b3914f9b4fcd19 from resource manager with leader id a3ea462904f2b99a88ed07a16e394a8b.
   [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 55cac133873b326447786e98d4a8bf2b.
   [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Add job 0462ddc2e6a8276f79b3914f9b4fcd19 for job leader monitoring.
   [mini-cluster-io-thread-2] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Try to register at job manager akka://flink/user/rpc/jobmanager_6 with leader id 2cc8799f-155e-4f8f-a8b4-1b9177294dfe.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Resolved JobManager address, beginning registration
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Successful registration at job manager akka://flink/user/rpc/jobmanager_6 for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) (876164887f156d6a29a5e26a541b1d8b) switched from SCHEDULED to DEPLOYING.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) (attempt #0) with attempt id 876164887f156d6a29a5e26a541b1d8b to 1948ad9a-cb40-4a3d-9770-db7de5a7bae5 @ localhost (dataPort=-1) with allocation id 55cac133873b326447786e98d4a8bf2b
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (c5292a7cfc1d527e13a5eeb9926a4a95) switched from SCHEDULED to DEPLOYING.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying IcebergStreamWriter (1/1) (attempt #0) with attempt id c5292a7cfc1d527e13a5eeb9926a4a95 to 1948ad9a-cb40-4a3d-9770-db7de5a7bae5 @ localhost (dataPort=-1) with allocation id 55cac133873b326447786e98d4a8bf2b
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 55cac133873b326447786e98d4a8bf2b.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (ca05ac4af8adb64ada4a789b93be3848) switched from SCHEDULED to DEPLOYING.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (attempt #0) with attempt id ca05ac4af8adb64ada4a789b93be3848 to 1948ad9a-cb40-4a3d-9770-db7de5a7bae5 @ localhost (dataPort=-1) with allocation id 55cac133873b326447786e98d4a8bf2b
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader - StateChangelogStorageLoader initialized with shortcut names {memory}.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader - Creating a changelog storage with name 'memory'.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b), deploy into slot with allocation id 55cac133873b326447786e98d4a8bf2b.
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b) switched from CREATED to DEPLOYING.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 55cac133873b326447786e98d4a8bf2b.
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b) [DEPLOYING].
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95), deploy into slot with allocation id 55cac133873b326447786e98d4a8bf2b.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95) switched from CREATED to DEPLOYING.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95) [DEPLOYING].
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 55cac133873b326447786e98d4a8bf2b.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848), deploy into slot with allocation id 55cac133873b326447786e98d4a8bf2b.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848) switched from CREATED to DEPLOYING.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848) [DEPLOYING].
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 55cac133873b326447786e98d4a8bf2b.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@eb5eb98
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Checkpoint storage is set to 'jobmanager'
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@6873aa5c
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Checkpoint storage is set to 'jobmanager'
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@6e0bd0bd
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Checkpoint storage is set to 'jobmanager'
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 0462ddc2e6a8276f79b3914f9b4fcd19 because Checkpoint triggering task Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) of job 0462ddc2e6a8276f79b3914f9b4fcd19 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848) switched from DEPLOYING to INITIALIZING.
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b) switched from DEPLOYING to INITIALIZING.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95) switched from DEPLOYING to INITIALIZING.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (ca05ac4af8adb64ada4a789b93be3848) switched from DEPLOYING to INITIALIZING.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) (876164887f156d6a29a5e26a541b1d8b) switched from DEPLOYING to INITIALIZING.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (c5292a7cfc1d527e13a5eeb9926a4a95) switched from DEPLOYING to INITIALIZING.
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] WARN org.apache.flink.metrics.MetricGroup - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) exceeded the 80 characters length limit and was truncated.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder - Finished to build heap keyed state-backend.
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00000-12b9361e-35a0-4862-94ec-9bae7deb0fe1.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: testhive.db.test_hash_distribution_mode
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95) switched from INITIALIZING to RUNNING.
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b) switched from INITIALIZING to RUNNING.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (c5292a7cfc1d527e13a5eeb9926a4a95) switched from INITIALIZING to RUNNING.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) (876164887f156d6a29a5e26a541b1d8b) switched from INITIALIZING to RUNNING.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848) switched from INITIALIZING to RUNNING.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (ca05ac4af8adb64ada4a789b93be3848) switched from INITIALIZING to RUNNING.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.HadoopShimsPre2_7 - Can't get KeyProvider for ORC encryption from hadoop.security.key.provider.path.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-99c5c573-3f2b-46df-8a23-e359b36d2c6e-00001.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1645512904342 for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-99c5c573-3f2b-46df-8a23-e359b36d2c6e-00001.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergStreamWriter - Before checkpoint #1
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-99c5c573-3f2b-46df-8a23-e359b36d2c6e-00002.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-99c5c573-3f2b-46df-8a23-e359b36d2c6e-00002.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Start to flush snapshot state to state backend, table: testhive.db.test_hash_distribution_mode, checkpointId: 1
   [jobmanager-io-thread-5] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 0462ddc2e6a8276f79b3914f9b4fcd19 (724151 bytes, checkpointDuration=1418 ms, finalizationTime=6 ms).
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1645512905767 for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Checkpoint notified, #1
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource - generated 100000 rows
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b) switched from RUNNING to FINISHED.
   [Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 (876164887f156d6a29a5e26a541b1d8b).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1)#0 876164887f156d6a29a5e26a541b1d8b.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id]) -> Calc(select=[id, _UTF-16LE'aaa' AS data]) (1/1) (876164887f156d6a29a5e26a541b1d8b) switched from RUNNING to FINISHED.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergStreamWriter - Before checkpoint #2
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-99c5c573-3f2b-46df-8a23-e359b36d2c6e-00003.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.WriterImpl - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-99c5c573-3f2b-46df-8a23-e359b36d2c6e-00003.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergStreamWriter - End input reached
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95) switched from RUNNING to FINISHED.
   [IcebergStreamWriter (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IcebergStreamWriter (1/1)#0 (c5292a7cfc1d527e13a5eeb9926a4a95).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task IcebergStreamWriter (1/1)#0 c5292a7cfc1d527e13a5eeb9926a4a95.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergStreamWriter (1/1) (c5292a7cfc1d527e13a5eeb9926a4a95) switched from RUNNING to FINISHED.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Committing append with 1 data files and 0 delete files to table testhive.db.test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 alter_table: db=db tbl=test_hash_distribution_mode newtbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 alter_table: db=db tbl=test_hash_distribution_mode newtbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Successfully committed to table testhive.db.test_hash_distribution_mode in 193 ms
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.SnapshotProducer - Committed snapshot 5883620300433327646 (MergeAppend)
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00001-c3fbec69-b9e9-4734-ada1-46b3cf34af00.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Committed in 378 ms
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Start to flush snapshot state to state backend, table: testhive.db.test_hash_distribution_mode, checkpointId: 2
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - End input reached
   [jobmanager-io-thread-10] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 0462ddc2e6a8276f79b3914f9b4fcd19 (15055 bytes, checkpointDuration=586 ms, finalizationTime=3 ms).
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 0462ddc2e6a8276f79b3914f9b4fcd19 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 0462ddc2e6a8276f79b3914f9b4fcd19 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 0462ddc2e6a8276f79b3914f9b4fcd19 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 0462ddc2e6a8276f79b3914f9b4fcd19 because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Committing append with 2 data files and 0 delete files to table testhive.db.test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 alter_table: db=db tbl=test_hash_distribution_mode newtbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 alter_table: db=db tbl=test_hash_distribution_mode newtbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Successfully committed to table testhive.db.test_hash_distribution_mode in 104 ms
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.SnapshotProducer - Committed snapshot 8717152434606897052 (MergeAppend)
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00002-3bfc06f1-94da-4443-8dc4-c14e35da3fc0.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Committed in 258 ms
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter - Checkpoint notified, #2
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848) switched from RUNNING to FINISHED.
   [IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 (ca05ac4af8adb64ada4a789b93be3848).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1)#0 ca05ac4af8adb64ada4a789b93be3848.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink testhive.db.test_hash_distribution_mode (1/1) (ca05ac4af8adb64ada4a789b93be3848) switched from RUNNING to FINISHED.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job insert-into_testhive.db.test_hash_distribution_mode (0462ddc2e6a8276f79b3914f9b4fcd19) switched from state RUNNING to FINISHED.
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 0462ddc2e6a8276f79b3914f9b4fcd19
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 0462ddc2e6a8276f79b3914f9b4fcd19 reached terminal state FINISHED.
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job 'insert-into_testhive.db.test_hash_distribution_mode' (0462ddc2e6a8276f79b3914f9b4fcd19).
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool - Releasing slot [55cac133873b326447786e98d4a8bf2b].
   [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection c1d6e8672929f08ffdd6d908c456d65a: Stopping JobMaster for job 'insert-into_testhive.db.test_hash_distribution_mode' (0462ddc2e6a8276f79b3914f9b4fcd19).
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{taskHeapMemory=256.000gb (274877906944 bytes), taskOffHeapMemory=256.000gb (274877906944 bytes), managedMemory=20.000mb (20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationId: 55cac133873b326447786e98d4a8bf2b, jobId: 0462ddc2e6a8276f79b3914f9b4fcd19).
   [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager a8b41b9177294dfe2cc8799f155e4f8f@akka://flink/user/rpc/jobmanager_6 for job 0462ddc2e6a8276f79b3914f9b4fcd19 from the resource manager.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Remove job 0462ddc2e6a8276f79b3914f9b4fcd19 from job leader monitoring.
   [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 0462ddc2e6a8276f79b3914f9b4fcd19.
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00002-3bfc06f1-94da-4443-8dc4-c14e35da3fc0.metadata.json
   [Test worker] INFO org.apache.iceberg.BaseMetastoreCatalog - Table loaded by catalog: hive.db.test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [Test worker] INFO org.apache.iceberg.BaseTableScan - Scanning table hive.db.test_hash_distribution_mode snapshot 5883620300433327646 created at 2022-02-22 06:55:06.089 with filter true
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00002-3bfc06f1-94da-4443-8dc4-c14e35da3fc0.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/00002-3bfc06f1-94da-4443-8dc4-c14e35da3fc0.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_hash_distribution_mode	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 drop_table : db=db tbl=test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 drop_table : db=db tbl=test_hash_distribution_mode	
   [Test worker] INFO org.apache.iceberg.CatalogUtil - Manifests to delete: GenericManifestFile{path=file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/46ff29de-4b47-4980-b7c5-9f017f6e1722-m0.avro, length=6101, partition_spec_id=0, added_snapshot_id=8717152434606897052, added_data_files_count=2, added_rows_count=90069, existing_data_files_count=0, existing_rows_count=0, deleted_data_files_count=0, deleted_rows_count=0, partitions=[GenericPartitionFieldSummary{contains_null=false, contains_nan=false, lower_bound=[97, 97, 97], upper_bound=[97, 97, 97]}], key_metadata=null}, GenericManifestFile{path=file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_hash_distribution_mode/metadata/eb44b7e6-0d4f-4cd3-b8d9-709e4618e1e3-m0.avro, length=6055, partition_spec_id=0, added_snapshot_id=5883620300433327646, added_data_files_count=1, added_rows_count=9931, existing_data_files_count=0, existing_rows_count=0, deleted_data_files_count=0, deleted_rows_count=0, partitions=[GenericPartitionFieldSummary{contains_null=false, contains_nan=false, lower_bound=[97, 97, 97], upper_bound=[97, 97, 97]}], key_metadata=null}
   [Test worker] INFO org.apache.iceberg.hive.HiveCatalog - Dropped table: db.test_hash_distribution_mode
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_table	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table/metadata/00000-b94688ab-1fd9-465a-bed0-e4b1a93c98fb.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_table	
   [Test worker] INFO org.apache.iceberg.BaseMetastoreTableOperations - Refreshing table metadata from new version: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit13141381595252095998/db.db/test_table/metadata/00000-b94688ab-1fd9-465a-bed0-e4b1a93c98fb.metadata.json
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_table : db=db tbl=test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_table : db=db tbl=test_table	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 drop_table : db=db tbl=test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 drop_table : db=db tbl=test_table	
   [Test worker] INFO org.apache.iceberg.CatalogUtil - Manifests to delete: 
   [Test worker] INFO org.apache.iceberg.hive.HiveCatalog - Dropped table: db.test_table
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_database: db
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_database: db	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 drop_database: db
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 drop_database: db	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_all_tables: db=db
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_all_tables: db=db	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 1: source:127.0.0.1 get_functions: db=db pat=*
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=tangyi	ip=127.0.0.1	cmd=source:127.0.0.1 get_functions: db=db pat=*	
   [pool-11-thread-1] INFO org.apache.hadoop.hive.metastore.ObjectStore - Dropping database db along with all tables
   [Test worker] INFO org.apache.iceberg.hive.HiveCatalog - Dropped namespace: db
   [Test worker] ERROR org.apache.iceberg.flink.TestFlinkTableSink - 
   --------------------------------------------------------------------------------
   Test testHashDistributeMode[catalogName=testhive, baseNamespace=, format=ORC, isStreaming=true](org.apache.iceberg.flink.TestFlinkTableSink) failed with:
   java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
   	at org.junit.Assert.fail(Assert.java:89)
   	at org.junit.Assert.failNotEquals(Assert.java:835)
   	at org.junit.Assert.assertEquals(Assert.java:647)
   ```




[GitHub] [iceberg] openinx closed issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
openinx closed issue #2575:
URL: https://github.com/apache/iceberg/issues/2575


   




[GitHub] [iceberg] yittg commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
yittg commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1047543032


   @openinx Sure, I'll check it later.




[GitHub] [iceberg] openinx edited a comment on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
openinx edited a comment on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1046845868


   This unit test uses a parallelism of 1 to write records through the following stream flow:
   
   ```
   Source -> Shuffle -> IcebergStreamWriter -> IcebergFilesCommitter.
   ```
   
   The following records need to be written into the partitioned table (the partition field is `data`):
   
   ```
   (1, 'aaa'), (1, 'bbb'), (1, 'ccc')
   (2, 'aaa'), (2, 'bbb'), (2, 'ccc')
   (3, 'aaa'), (3, 'bbb'), (3, 'ccc')
   ```
   
   Then: 
   
   **Step#1**  We write the following records into ORC files:
   
   ```
   (1, 'aaa'), (1, 'bbb'), (1, 'ccc')
   (2, 'aaa'), (2, 'bbb'), (2, 'ccc')
   (3, 'bbb'), (3, 'ccc')
   
   # Notice: the record (3, 'aaa') was not yet emitted to an ORC file in checkpoint#1
   ```
   
   **Step#2**  A checkpoint barrier arrives, so the IcebergStreamWriter emits `1.orc`, `2.orc`, and `3.orc` to the IcebergFilesCommitter.
   **Step#3**  The IcebergFilesCommitter starts executing `snapshotState`.
   **Step#4**  The IcebergStreamWriter emits a `4.orc` that includes `(3, 'aaa')` on `endInput`, and then closes itself.
   **Step#5**  The IcebergFilesCommitter commits the transaction with all 4 ORC data files (`notifyCheckpointComplete`).
   
   The log suggests that the above steps happened and that all 4 ORC data files were finally committed in a single transaction. But in fact, the Flink IcebergFilesCommitter should not accept any new file (`4.orc`) into the pending transaction while it is executing `snapshotState` in **Step#3**, because Flink's `StreamTask` is a single thread consuming messages from a FIFO queue. So something else must be going wrong, but I cannot reproduce it. (A simplified sketch of the two flush points is included below.)
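
   To make this concrete, here is a minimal, hedged sketch (plain Java, not the real IcebergStreamWriter) of the two flush points described in the steps above: one flush when the checkpoint barrier arrives and a second one on end of input, which is how a late record such as `(3, 'aaa')` can end up in a second file for the same partition.
   
   ```
   import java.util.ArrayList;
   import java.util.List;
   
   // Simplified stand-in for the partitioned writer: every flush closes the files
   // that are currently open, so rows arriving after the barrier land in new files.
   class SketchPartitionedWriter {
     private final List<String> openRows = new ArrayList<>();
   
     void processElement(String row) {
       openRows.add(row);                 // buffer the row in the currently open file
     }
   
     List<String> onCheckpointBarrier() { // Step#2: flush completed files for the checkpoint
       return flush();
     }
   
     List<String> onEndInput() {          // Step#4: flush whatever arrived after the barrier
       return flush();
     }
   
     private List<String> flush() {
       List<String> completedFile = new ArrayList<>(openRows);
       openRows.clear();
       return completedFile;              // handed downstream to the committer
     }
   }
   ```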
   
   Anyway, the root cause of this failing unit test is that we don't precisely control which events fall into a single checkpoint: in this case, most records are accumulated within one checkpoint, but some remain for `endInput` to emit. So I think the solution I proposed in https://github.com/apache/iceberg/pull/4117#issuecomment-1042718844 should still fix it fundamentally.
   
   
   
   




[GitHub] [iceberg] yittg commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
yittg commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1046695750


   @openinx I cannot reproduce it stably for now.
   I think you can just repeat it many times, for example by generating thousands of `parameters`, or with a script.
   I also think decreasing the checkpoint interval, for example to 100 ms, can help to reproduce it (see the sketch below).
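
   For reference, one way to shrink the checkpoint interval in such a test looks roughly like the snippet below. This is only an illustrative sketch, not the actual test setup; the class name and the 100 ms value are made up, and the real test wires its environments through its own base class.
   
   ```
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
   
   public class CheckpointIntervalSketch {
     public static void main(String[] args) {
       // Checkpoint every 100 ms so that several checkpoints fire while the
       // bounded datagen source is still emitting rows.
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.enableCheckpointing(100);
       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
       // ... register the Iceberg catalog and run the INSERT INTO as the test does ...
     }
   }
   ```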




[GitHub] [iceberg] yittg commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
yittg commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1047432236


   The following test fails easily, and I think it is equivalent to the original one, IIUC. It leads to different kinds of errors:
   1. Committing more than one file in one snapshot;
   2. Committing one file in each snapshot, but failing on the final assert.
   
   Hope it can help, @openinx.
   
   ```
     @Test
     public void testHashDistributeMode() throws Exception {
       String tableName = "test_hash_distribution_mode";
       Map<String, String> tableProps = ImmutableMap.of(
           "write.format.default", format.name(),
           TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
       );
       sql("CREATE TABLE default_catalog.default_database.src (id INT) WITH %s",
           toWithClause(ImmutableMap.of(
               "connector","datagen",
               "number-of-rows", "100000",
               "rows-per-second", "100000",
               "fields.id.kind", "sequence",
               "fields.id.start", "1",
               "fields.id.end", "100000")));
       sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
           tableName, toWithClause(tableProps));
   
       try {
         // Insert data set.
   
         sql("INSERT INTO %s SELECT id, 'aaa' as data FROM default_catalog.default_database.src", tableName);
   
         Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
   
         // Sometimes there will be more than one checkpoint if the job runs past the auto checkpoint interval,
         // thus producing multiple snapshots. Here we assert that each snapshot has only 1 data file per partition.
         Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
         for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
           Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1,
               SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size());
         }
       } finally {
         sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
       }
     }
   ```
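
   For readers without the test utilities at hand, here is a rough, hedged sketch of what a `snapshotToDataFiles`-style helper could look like. This is not the actual `SimpleDataUtil` code (and `matchingPartitions` is not shown); it only relies on Iceberg's public `Table`/`Snapshot`/`TableScan` APIs and collects, for each snapshot, the data files that snapshot added.
   
   ```
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableScan;
   import org.apache.iceberg.io.CloseableIterable;
   
   class SnapshotFilesSketch {
     // Walk the snapshot chain from the current snapshot back to the oldest one and
     // record the data files added by each snapshot.
     static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
       Map<Long, List<DataFile>> result = new HashMap<>();
       Snapshot current = table.currentSnapshot();
       while (current != null) {
         TableScan scan = table.newScan();
         if (current.parentId() != null) {
           // Only the files appended between the parent and the current snapshot.
           scan = scan.appendsBetween(current.parentId(), current.snapshotId());
         } else {
           // Oldest snapshot: take everything it contains.
           scan = scan.useSnapshot(current.snapshotId());
         }
         List<DataFile> files = new ArrayList<>();
         try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
           tasks.forEach(task -> files.add(task.file()));
         }
         result.put(current.snapshotId(), files);
         current = current.parentId() == null ? null : table.snapshot(current.parentId());
       }
       return result;
     }
   }
   ```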






[GitHub] [iceberg] nastra commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
nastra commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-945773103


   I have been running into this quite frequently in the past, so I disabled it in https://github.com/apache/iceberg/pull/3307 until it's fixed




[GitHub] [iceberg] yittg commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
yittg commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1047447027


   
   > 2. Committing one file in each snapshot, but failing on the final assert.
   
   This can be fixed by the following change:
   
   ```
   --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
   +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
   @@ -275,10 +275,10 @@ public class SimpleDataUtil {
          TableScan tableScan = table.newScan();
          if (current.parentId() != null) {
            // Collect the data files that was added only in current snapshot.
   -        tableScan.appendsBetween(current.parentId(), current.snapshotId());
   +        tableScan = tableScan.appendsBetween(current.parentId(), current.snapshotId());
          } else {
            // Collect the data files that was added in the oldest snapshot.
   -        tableScan.useSnapshot(current.snapshotId());
   +        tableScan = tableScan.useSnapshot(current.snapshotId());
          }
   ```
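
   If I read this correctly, the underlying bug is that `TableScan` refinement methods return a new, refined scan instead of mutating the receiver, so discarding the return value silently leaves the scan unrefined and it plans all files rather than only those added by the target snapshot. A tiny illustrative example (the method and class names are made up):
   
   ```
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableScan;
   
   class ScanRefinementSketch {
     static TableScan filesAddedBy(Table table, long parentId, long snapshotId) {
       TableScan scan = table.newScan();
       // Wrong: scan.appendsBetween(parentId, snapshotId);  // returned scan discarded, 'scan' is unchanged
       scan = scan.appendsBetween(parentId, snapshotId);      // right: keep the refined scan
       return scan;
     }
   }
   ```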




[GitHub] [iceberg] openinx commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1046664018


   Thanks for the detailed log message. Did you find a way to reproduce this failure on your local host? (I still cannot reproduce it.)
   
   I read the log message carefully; it is quite strange that 4 data files were produced in a single checkpoint.
   
   ```
   ➜  ~ cat Untitled-2 | grep 'org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path'
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00001.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=bbb/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00002.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=ccc/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00003.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   [IcebergStreamWriter (1/1)#0] INFO org.apache.orc.impl.PhysicalFsWriter - ORC writer created for path: file:/var/folders/t6/8l83wbcd52g29cp78v66n7rc0000gn/T/junit9733325594032949849/db.db/test_hash_distribution_mode/data/data=aaa/00000-0-0e650bad-9e4f-4953-b2f6-3a99868aa38a-00004.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
   ```
   
   Only partition 'aaa' produced two ORC data files.




[GitHub] [iceberg] openinx commented on issue #2575: Flakey flink unit tests TestFlinkTableSink#testHashDistributeMode

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2575:
URL: https://github.com/apache/iceberg/issues/2575#issuecomment-1050477805


   I plan to close this issue now because we've merged a final fix. Feel free to reopen it if there is any other issue.

