Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/02 10:48:57 UTC

[GitHub] [iceberg] AHuio opened a new issue, #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

AHuio opened a new issue, #6104:
URL: https://github.com/apache/iceberg/issues/6104

   ### Apache Iceberg version
   
   0.14.1
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   > flink: 1.13.5
   > iceberg: 0.13.2 / 0.14.1
   
   When using the rewrite files action API **rewriteDataFiles()**, the new compacted file is generated but no corresponding manifest file is produced. I tried Iceberg versions 0.13.2 and 0.14.1, and both show the same problem under the Hive and Hadoop Iceberg catalogs.
   
   > The Iceberg Maven dependency, the table definition, and the Java API code used to compact the files are as follows:
   ```
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime-1.13</artifactId>
        <!-- <version>0.13.2</version> -->
        <version>0.14.1</version>
    </dependency>
   ```
   ```
   name: iceberg_hive_catalog
   type: iceberg
   catalog-type: hive
   uri: thrift://xxxxx:9083
   clients: 5
   property-version: 1
   warehouse: hdfs://nameservice1/user/hive/warehouse/
   
   create table iceberg_hive_catalog.dhome_db.ods_d_base_inf_229_iceberg (
   `did` string,
   `name` string,
   `address` string,
   `did_seq`  string,
   PRIMARY KEY (did_seq) NOT ENFORCED
   ) with (
    'format-version'='2',
    'write.upsert.enabled'='true',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.metadata.previous-versions-max'='5',
    'flink.rewrite.enable' = 'true',
    'flink.rewrite.parallelism' = '5',
    'flink.rewrite.target-file-size-bytes' = '536870912',
    'flink.rewrite.max-files-count' = '5'
   );
   ```
   ```
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   TableIdentifier identifier = TableIdentifier.of("dhome_db", "ods_d_base_inf_229_iceberg");
   TableLoader tableLoader = TableLoader.fromCatalog(hive_iceberg, identifier);
   tableLoader.open();
   Table table_iceberg = tableLoader.loadTable();
   
   Actions.forTable(env, table_iceberg)
   		.rewriteDataFiles()
   		.maxParallelism(5)
   		.targetSizeInBytes(128*1024*1024)
   		.execute();
   ```
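   
   For diagnosis, here is a minimal sketch (not part of the original report; the class and method names are illustrative, and it assumes the same Flink environment and loaded table as above) that checks whether the rewrite actually committed a new snapshot:
   ```
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.flink.actions.Actions;
   
   public class CompactAndVerify {
       // Hedged sketch: run the rewrite, then check whether a new snapshot was
       // actually committed to the table metadata.
       public static void rewriteAndCheck(StreamExecutionEnvironment env, Table table) {
           Snapshot before = table.currentSnapshot();
   
           Actions.forTable(env, table)
                   .rewriteDataFiles()
                   .maxParallelism(5)
                   .targetSizeInBytes(128 * 1024 * 1024)
                   .execute();
   
           table.refresh();  // reload metadata the action may have committed
           Snapshot after = table.currentSnapshot();
   
           if (after == null || (before != null && before.snapshotId() == after.snapshotId())) {
               // Data files were rewritten but no REPLACE snapshot was committed:
               // the commit likely failed or was rolled back, so check the job logs.
               System.out.println("rewriteDataFiles finished but no new snapshot was committed");
           } else {
               System.out.println("committed snapshot " + after.snapshotId()
                       + " (operation=" + after.operation() + ")");
           }
       }
   }
   ```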
   
   > The results:
   <img width="1068" alt="result" src="https://user-images.githubusercontent.com/20868410/199470742-abe8e600-0bd5-4c6d-838b-6a96745cd55e.PNG">
   
   
   If there is anything wrong with this report, please correct me. Thank you.
   




[GitHub] [iceberg] SHuixo commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
SHuixo commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1313404740

   > > the repeated compression process of full data
   > 
   > Do you mean each compaction will compact all data? The compaction won't be repeated. I mean that during one compaction, some files are compacted into one file; the next compaction won't compact those files again. I don't think it's a repeated compaction process. From my side, it's still an incremental process.
   > 
   > Back to your problem: actually, there's a PR #2680 that uses disk to avoid OOM, and I think it will also fix your problem. I think you can try to 1: increase the memory of the TM; 2: do compaction more frequently, so that each compaction doesn't compact much data, which may well relieve the OOM.
   
   I could not find the configuration items for the **RocksDB** feature mentioned in PR #2680 in the latest 1.0.0 release. Do we have to wait for a subsequent version for it to be added?
   
   When compacting data, it is indeed possible to mitigate the **OOM** problem by increasing the TaskManager memory and compacting the data more frequently.
   
   However, when processing a Flink CDC real-time stream written to an Iceberg table, with the checkpoint interval set to 5 minutes and the compaction interval to 30 minutes, a **commit exception** is encountered when the data compaction task runs.
   The subsequently restarted compaction task then stays in the Map phase until it hits a **timeout exception** and exits.
   
   > commit exception:
   
   ``` 
   2022-11-11 17:06:57,583 WARN  org.apache.iceberg.actions.BaseRewriteDataFilesAction        [] - Failed to commit rewrite, cleaning up rewritten files
   org.apache.iceberg.exceptions.ValidationException: Cannot commit, found new delete for replaced data file: GenericDataFile{content=data, file_path=hdfs://nameservice1/user/hive/warehouse/dhome_db.db/ods_d_base_inf_229_iceberg/data/00001-0-70f034b7-9725-4d90-b1ad-95907d30ed19-00001.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{}, record_count=951380, file_size_in_bytes=38224976, column_sizes=null, value_counts=org.apache.iceberg.util.SerializableMap@187f476, null_value_counts=org.apache.iceberg.util.SerializableMap@17a, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f05dccea, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@ffcfbef8, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=null}
   	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:50) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:418) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:367) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.BaseRewriteFiles.validate(BaseRewriteFiles.java:108) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:175) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:296) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.actions.BaseSnapshotUpdateAction.commit(BaseSnapshotUpdateAction.java:41) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.actions.BaseRewriteDataFilesAction.doReplace(BaseRewriteDataFilesAction.java:298) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.actions.BaseRewriteDataFilesAction.replaceDataFiles(BaseRewriteDataFilesAction.java:277) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:252) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at org.XXXX.FlinkCompactIcebergData.main(FlinkCompactIcebergData.java:73) ~[DevXXXX-0.14.1-1.13.5.jar:?]
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_74]
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_74]
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_74]
   	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
   	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_74]
   	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.11-1.13.5.jar:1.13.5]
   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.11-1.13.5.jar:1.13.5]
   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.13.5.jar:1.13.5]
   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.5.jar:1.13.5]
   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.5.jar:1.13.5]
   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.5.jar:1.13.5]
   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.5.jar:1.13.5]
   ```
   
   [compact-iceberg-file-commit-exception.log](https://github.com/apache/iceberg/files/10001421/compact-iceberg-file-commit-exception.log)
   




[GitHub] [iceberg] SHuixo commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
SHuixo commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1309843130

   Thank you @luoyuxia for your reply.
   
   Later, I tried a few more times and found that when the accumulated volume of small Iceberg data files was relatively small, the Flink 1.13.5 compaction job ran normally and generated snapshot files.
   
   However, when a large volume of files accumulates, each rewrite of the data takes a long time and easily causes **OOM** exceptions. Here are my attempts with **Flink 1.13.5 / 1.15.2 and Iceberg 0.14.1**, along with the logs generated by the task.
   
   > The following figure shows the compaction task stuck in the Map stage:
   <img width="872" alt="dag-13-1" src="https://user-images.githubusercontent.com/20868410/201018299-e64b3a02-3ff2-4d49-b1cc-e7bdf703f3aa.PNG">
   
   > OOM exception information produced when the compaction task runs:
   
   **flink 1.13.5:**
   [error-flink-1.13.5.log](https://github.com/apache/iceberg/files/9978008/error-flink-1.13.5.log)
   
   
   **flink 1.15.2:**
   [error-flink-1.15.2.log](https://github.com/apache/iceberg/files/9978009/error-flink-1.15.2.log)
   
   
   Here I want to ask: if data is continuously written to Iceberg, is the compaction OOM problem inevitable, with the compaction time growing longer and longer?
   
   I see that there are API methods **appendsBetween() / appendsAfter()** related to incremental scans in the source code. Does this mean that incremental compaction can replace the repeated compaction of the full data set in the future?
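   
   For reference, a minimal sketch of how those incremental-scan entry points are invoked (my own illustration rather than an official example; the class name and snapshot IDs are assumed, and this only plans the appended data files, it does not handle delete files):
   ```
   import java.io.IOException;
   import java.io.UncheckedIOException;
   
   import org.apache.iceberg.CombinedScanTask;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.io.CloseableIterable;
   
   public class IncrementalScanSketch {
       // Hedged sketch: plan only the data files appended between two snapshots.
       public static void listAppendedFiles(Table table, long fromSnapshotId, long toSnapshotId) {
           try (CloseableIterable<CombinedScanTask> tasks =
                    table.newScan()
                         .appendsBetween(fromSnapshotId, toSnapshotId)  // or .appendsAfter(fromSnapshotId)
                         .planTasks()) {
               for (CombinedScanTask task : tasks) {
                   for (FileScanTask fileTask : task.files()) {
                       System.out.println(fileTask.file().path());
                   }
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```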
   
   thanks.




[GitHub] [iceberg] luoyuxia commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1322899747

   > Does this mean that for CDC data streamed into Iceberg, there is currently no viable compaction scheme for streams that contain delete operations?
   
   Yes, I'm afraid so.
   
   > Here's a question: is it possible to pause the writer, run one round of compaction, and when the compaction completes, resume writing from the checkpoint again, handling the above commit exception by cyclically suspending, compacting, and writing?
   
   I think it's possible. From the code, IIUC, the exception happens when a compaction starts but new delete files appear before it finishes. As long as no delete files are produced between the start and the finish of a compaction, the exception shouldn't be thrown.
   But I'm not sure; you can give it a try and see whether it works.
   
   
   
   




[GitHub] [iceberg] SHuixo closed issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
SHuixo closed issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)
URL: https://github.com/apache/iceberg/issues/6104




[GitHub] [iceberg] luoyuxia commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1310193327

   > the repeated compression process of full data
   
   The compaction won't be repeated. I mean that during one compaction, some files are compacted into one file; the next compaction won't compact those files again. I don't think it's a repeated compaction process. From my side, it's still an incremental process.
   
   Back to your problem: actually, there's a PR #2680 that uses disk to avoid OOM, and I think it will also fix your problem.
   In the meantime you can try to:
   1: increase the memory of the TM;
   2: do compaction more frequently, so that each compaction doesn't compact much data, which may well relieve the OOM (see the sketch below).
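   
   A minimal sketch of the second suggestion (illustrative only; the class name and the filter column/value are made up, and any Iceberg expression predicate could be used) that restricts a single rewrite run to a slice of the table so each compaction touches fewer files:
   ```
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.expressions.Expressions;
   import org.apache.iceberg.flink.actions.Actions;
   
   public class ScopedCompaction {
       // Hedged sketch: compact only a slice of the table per run so a single
       // compaction touches fewer files and uses less memory.
       public static void compactSlice(StreamExecutionEnvironment env, Table table) {
           Actions.forTable(env, table)
                   .rewriteDataFiles()
                   .filter(Expressions.greaterThanOrEqual("did_seq", "20221101"))  // illustrative predicate
                   .maxParallelism(5)
                   .targetSizeInBytes(512L * 1024 * 1024)  // 512 MB target files
                   .execute();
       }
   }
   ```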
   
   




[GitHub] [iceberg] luoyuxia commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1305422261

   It seems the files have been compacted, but generating the snapshot failed. Maybe some exception happened during the commit.
   Is there any error or warning in the log?




[GitHub] [iceberg] SHuixo commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
SHuixo commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1322093088

   > Yes, we have to wait for it to be merged.
   
   Good, looking forward to this new RocksDB feature being merged.
   
   > I had a look at your exception log. The reason is that the CDC stream contains delete rows, but compaction for the case where the table contains delete files hasn't been supported yet.
   
   Does this mean that for CDC data streamed into Iceberg, there is currently no viable compaction scheme for streams that contain delete operations at this stage?
   
   To verify whether the **commit exception** seen in the streaming scenario also occurs in a batch scenario, we made the following attempts:
   
   1. Enable Flink CDC streaming writes to Iceberg, with the checkpoint interval set to 5 minutes.
   2. While the writer is running, start the compaction program until a **commit exception** occurs.
   3. When the **commit exception** occurs, stop both the CDC writing program and the compaction program.
   4. Start the data compaction program again until it is up and running.
   
   > The following figure shows the start and end times when Flink CDC writes data to Iceberg:
   
   <img width="941" alt="stream-write-iceberg" src="https://user-images.githubusercontent.com/20868410/203070483-d39c3107-ca61-4d31-bb30-7d63cf821697.PNG">
   
   > Some of the logs are as follows:
   
   [compact-data-when-stream-write.log](https://github.com/apache/iceberg/files/10056982/compact-data-when-stream-write.log)
   [compact-data-when-write-finish.log](https://github.com/apache/iceberg/files/10056985/compact-data-when-write-finish.log)
   
   
   Here's a question: is it possible to pause the writer, run one round of compaction, and when the compaction completes, resume writing from the checkpoint again, handling the above commit exception by cyclically suspending, compacting, and writing?
   




[GitHub] [iceberg] SHuixo commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
SHuixo commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1322917998

   Yes, we tried it and found it worked. 
   The following two log messages are the results of the compaction test.
    
   > [compact-data-when-stream-write.log](https://github.com/apache/iceberg/files/10057144/compact-data-when-stream-write.log)
   > [compact-data-when-write-finished.log](https://github.com/apache/iceberg/files/10057146/compact-data-when-write-finish.log)
   
   




[GitHub] [iceberg] luoyuxia commented on issue #6104: Rewrite iceberg small files with flink succeeds but no snapshot is generated (V2 - upsert model)

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1314686886

   > I could not find the configuration items for the rocksdb feature mentioned in PR https://github.com/apache/iceberg/pull/2680 in the latest 1.0.0 release. Do we have to wait for a subsequent version for it to be added?
   
   Yes, we have to wait for it to be merged.
   
   > When compacting data, it is indeed possible to mitigate the OOM problem by increasing the TaskManager memory and compacting the data more frequently.
   
   I agree with you, so I think using RocksDB as the backend is a better solution.
   
   > ... and a commit exception is encountered when the data compaction task runs.
   
   I had a look at your exception log. The reason is that the CDC stream contains delete rows, but compaction for the case where the table contains delete files hasn't been supported yet.

