Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/12 14:30:41 UTC

[GitHub] [iceberg] felixYyu opened a new issue #3885: [OOM] MERGE INTO table with Spark Structured Streaming

felixYyu opened a new issue #3885:
URL: https://github.com/apache/iceberg/issues/3885


   spark 3.0.1
   iceberg-spark3-runtime 0.12.1
   
   MySQL binlog is shipped to Kafka with the Maxwell tool:
   ```
   val df = spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", bootstrapServers)
         .option("subscribe", topic)
         .option("failOnDataLoss", false)
         .option("maxOffsetsPerTrigger", 1000000)
         .load()
   
       val stream = df
         .writeStream
         .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
         .option("fanout-enabled", "true")
         .option("checkpointLocation", checkpointPath)
         .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
           batchDF.persist()
           batchDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").toDF("key", "value")
           // insert/update/delete DML binlog
           batchDF.filter(batchDF("type") === "insert").createOrReplaceTempView("tmp_data_insert")
           batchDF.filter(batchDF("type") === "update").createOrReplaceTempView("tmp_data_update")
           batchDF.filter(batchDF("type") === "delete").createOrReplaceTempView("tmp_data_delete")
           spark.sql("MERGE INTO hadoop_prod.wms.wms_do t " +
                          " USING (SELECT * FROM tmp_data_insert order by partition_col) s" +
                          " ON t.id = s.id AND t.ts > '2022-01-11' " +
                          "WHEN NOT MATCHED THEN INSERT *")
           spark.sql("MERGE INTO hadoop_prod.wms.wms_do t" +
                          " USING (SELECT * FROM tmp_data_update order by partition_col) s" +
                          " ON t.id = s.id AND t.ts > '2022-01-01' " +
                          "WHEN MATCHED THEN UPDATE SET *")
           spark.sql("DELETE FROM hadoop_prod.wms.wms_do t" +
                          " WHERE EXISTS (SELECT * FROM tmp_data_delete WHERE t.id = id)")
           batchDF.unpersist()
           empty()
         }.start()
   
       stream.awaitTermination()
       stream.stop()
       spark.stop()
   ```
   
   After the job has been running for several hours, an OOM error occurs and the job fails.
   
   ```
   Application application_1641894951363_2568 failed 2 times due to AM Container for appattempt_1641894951363_2568_000002 exited with exitCode: -104
   For more detailed output, check application tracking page:http://hadoopmanager136:18088/proxy/application_1641894951363_2568/Then, click on links to logs of each attempt.
   Diagnostics: Container [pid=268684,containerID=container_e91_1641894951363_2568_02_000001] is running beyond physical memory limits. Current usage: 4.7 GB of 4.5 GB physical memory used; 6.8 GB of 9.4 GB virtual memory used. Killing container.
   Dump of the process-tree for container_e91_1641894951363_2568_02_000001 :
   |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
   |- 268689 268684 268684 268684 (java) 20942 1263 7232856064 1229083 /usr/java/jdk1.8.0_181/bin/java -server -Xmx4096m -Djava.io.tmpdir=/data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/tmp -Dspark.yarn.app.container.log.dir=/data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class com.jiuye.data.lake.incubation.v2.IcebergMergeData --jar file:/opt/maintain/scripts/iceberg/iceberg-sql-jar/rds-datalake-1.0.0.jar --properties-file /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_conf__.properties --dist-cache-conf /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_dist_cache__.properties
   |- 268684 268681 268684 268684 (bash) 0 0 115847168 671 /bin/bash -c /usr/java/jdk1.8.0_181/bin/java -server -Xmx4096m -Djava.io.tmpdir=/data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/tmp -Dspark.yarn.app.container.log.dir=/data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'com.jiuye.data.lake.incubation.v2.IcebergMergeData' --jar file:/opt/maintain/scripts/iceberg/iceberg-sql-jar/rds-datalake-1.0.0.jar --properties-file /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_conf__.properties --dist-cache-conf /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_dist_cache__.properties 1> /data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/stdout 2> /data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/stderr
   Container killed on request. Exit code is 143
   Container exited with a non-zero exit code 143
   Failing this attempt. Failing the application.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on issue #3885: [OOM] MERGE INTO table with Spark Structured Streaming

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #3885:
URL: https://github.com/apache/iceberg/issues/3885#issuecomment-1015171549


   `unpersist` is by default a non-blocking operation. You might consider passing `blocking=true` (I believe that's the argument) to ensure that the dataframe is truly unpersisted when you make that call. This can add time of course, as it's blocking, but will lower the likelihood of OOMs if that's where you think they are coming from.
   
   A small adjustment to see how it's working. But I would go with Russell's answer.
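
The blocking variant could be wired into the `foreachBatch` body roughly as follows. This is only a sketch, not the reporter's code: `BatchCache` and `withCachedBatch` are hypothetical names, and the `try`/`finally` is an addition so that the cache is released even when one of the MERGE statements throws. The Spark call being illustrated is `Dataset.unpersist(blocking: Boolean)`.

```scala
import org.apache.spark.sql.DataFrame

object BatchCache {
  // Hypothetical helper: cache the micro-batch, run `work` against it, then
  // release the cache and wait until the cached blocks are actually removed.
  def withCachedBatch(batchDF: DataFrame)(work: DataFrame => Unit): Unit = {
    batchDF.persist()
    try work(batchDF)
    finally batchDF.unpersist(blocking = true) // blocks until blocks are dropped
  }
}
```

Inside `foreachBatch` this would be called as `BatchCache.withCachedBatch(batchDF) { df => /* run the MERGE statements on df */ }`.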
   
   Also, if you update to Spark 3.1, you can use multiple MATCHED clauses in the same query, which could also majorly reduce your runtime (or at least code complexity).
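
As a rough illustration of what a single combined statement might look like, the sketch below only builds the SQL text. It assumes the engine accepts several WHEN clauses in one MERGE, as suggested above. `MergeSql`, `singleMerge` and the view name `tmp_data` are hypothetical, and the view is assumed to hold the whole micro-batch with the target table's columns plus Maxwell's `type` column. The date predicate on `t.ts` from the original statements is left out for brevity. Iceberg rejects a MERGE in which more than one source row matches the same target row, so the source is also assumed to carry at most one change per `id` per batch.

```scala
object MergeSql {
  // Hypothetical helper: one MERGE that routes each change row by its type.
  def singleMerge(target: String, sourceView: String): String =
    s"""MERGE INTO $target t
       |USING (SELECT * FROM $sourceView) s
       |ON t.id = s.id
       |WHEN MATCHED AND s.type = 'delete' THEN DELETE
       |WHEN MATCHED AND s.type = 'update' THEN UPDATE SET *
       |WHEN NOT MATCHED AND s.type = 'insert' THEN INSERT *""".stripMargin
}
```

It would be run as `spark.sql(MergeSql.singleMerge("hadoop_prod.wms.wms_do", "tmp_data"))`, replacing the three separate statements with one pass over the target.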




[GitHub] [iceberg] RussellSpitzer commented on issue #3885: [OOM] MERGE INTO table with Spark Structured Streaming

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on issue #3885:
URL: https://github.com/apache/iceberg/issues/3885#issuecomment-1012319398


   OOMs can happen for a lot of reasons. Did you see a memory leak, or a monotonically increasing heap size? I would make sure it isn't one particular merge causing the issue before diagnosing.
   
   The error there looks like an operating system kill, so it's not explicitly a JVM thing either. It could just be that the container is too small for the heap + off-heap usage configured in Spark.
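
The numbers in the diagnostics are consistent with that reading. The sketch below (plain Scala, no Spark needed) reproduces the arithmetic: Spark on YARN requests the heap plus a memory overhead that defaults to the larger of 384 MiB and 10% of the heap, and YARN rounds the request up to its allocation increment. The 512 MiB increment is an assumption, chosen because it reproduces the 4.5 GB limit in the log; the cluster's real setting is not shown in the issue. `ContainerSizing` is a hypothetical name.

```scala
object ContainerSizing {
  val MinOverheadMiB = 384L   // Spark's documented minimum overhead
  val OverheadFactor = 0.10   // Spark's documented default factor for JVM jobs

  // Default memory overhead for a given heap size, in MiB.
  def defaultOverheadMiB(heapMiB: Long): Long =
    math.max((heapMiB * OverheadFactor).toLong, MinOverheadMiB)

  // Round a request up to the next multiple of the allocation increment.
  def roundUp(valueMiB: Long, incrementMiB: Long): Long =
    ((valueMiB + incrementMiB - 1) / incrementMiB) * incrementMiB

  // Container size YARN would grant for heap + default overhead.
  def containerMiB(heapMiB: Long, incrementMiB: Long): Long =
    roundUp(heapMiB + defaultOverheadMiB(heapMiB), incrementMiB)
}
```

With `-Xmx4096m`, `ContainerSizing.containerMiB(4096, 512)` gives 4608 MiB, i.e. the 4.5 GB limit from the log, which leaves only about 512 MiB for everything outside the heap. If that is the cause, `spark.driver.memoryOverhead` would be the setting to raise, since the killed container is the ApplicationMaster, which hosts the driver in cluster mode.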




[GitHub] [iceberg] felixYyu commented on issue #3885: [OOM] MERGE INTO table with Spark Structured Streaming

Posted by GitBox <gi...@apache.org>.
felixYyu commented on issue #3885:
URL: https://github.com/apache/iceberg/issues/3885#issuecomment-1011109129


   I suspect the batch DataFrames are not being unpersisted, but I don't know the real cause of the runtime exception. What is the right way to MERGE INTO a table from Structured Streaming's writeStream foreachBatch?
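
One thing worth checking in the snippet from the issue: the result of `batchDF.selectExpr(...)` is discarded, and a DataFrame read from Kafka has no `type` column of its own, so the filters on `batchDF("type")` need the Maxwell JSON to be parsed first. A minimal sketch of that step is below, assuming the usual Maxwell envelope with a `type` field and a `data` object. `MaxwellBatch`, `parse` and `rowSchema` are hypothetical names; the real row schema of `wms_do` is not given in the issue and must not itself contain a column called `type`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

object MaxwellBatch {
  // Parse the Kafka `value` as a Maxwell envelope and flatten the row fields.
  def parse(batchDF: DataFrame, rowSchema: StructType): DataFrame = {
    val envelope = new StructType()
      .add("type", StringType) // insert / update / delete
      .add("data", rowSchema)  // the changed row, schema supplied by the caller
    batchDF
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), envelope).as("m"))
      .select(col("m.type").as("type"), col("m.data.*"))
  }
}
```

The returned DataFrame (not the raw `batchDF`) is what would be persisted, filtered by `type`, and registered as the temp views used by the MERGE statements.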




[GitHub] [iceberg] felixYyu closed issue #3885: [OOM] MERGE INTO table with Spark Structured Streaming

Posted by GitBox <gi...@apache.org>.
felixYyu closed issue #3885:
URL: https://github.com/apache/iceberg/issues/3885


   




[GitHub] [iceberg] felixYyu commented on issue #3885: [OOM] MERGE INTO table with Spark Structured Streaming

Posted by GitBox <gi...@apache.org>.
felixYyu commented on issue #3885:
URL: https://github.com/apache/iceberg/issues/3885#issuecomment-1022008157


   Thanks @RussellSpitzer @kbendick, I will test this case on a newer version.

