Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/21 13:25:05 UTC

[GitHub] [hudi] bkosuru opened a new issue #4864: [SUPPORT]

bkosuru opened a new issue #4864:
URL: https://github.com/apache/hudi/issues/4864


   Hello,
   
   An insert with INSERT_DROP_DUPS_OPT_KEY enabled fails after several hours. Any suggestions to make it work? Details below.
   We want to prevent inserting duplicate records.
   
   Hudi table size: 13.4 TB
   Data size to insert: 3.8 TB uncompressed (a 200 GB input also failed)
   The table is partitioned by two columns, g and p, e.g. spog/g=g1/p=p1
   The data to be inserted belongs to a single partition, g=g2
   The g=g2 partition is 2 TB
   g=g2 has 44 p sub-partitions, ranging from 1.3 MB to 270 GB
   
   Environment Description:
   Hudi version : 0.8.0
   Spark version : 2.4.4
   Storage (HDFS/S3/GCS..) : HDFS
   Running on Docker? (yes/no) : No
   Table type: COW
   
   Spark settings:
   new SparkConf()
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.ui.enabled", "false")
     .set("spark.sql.parquet.mergeSchema", "false")
     .set("spark.sql.files.ignoreCorruptFiles", "true")
     .set("spark.sql.hive.convertMetastoreParquet", "false")
   
   --driver-memory 25G \
   --executor-memory 50G \
   --executor-cores 2 \
   --num-executors 400 \
   --conf spark.dynamicAllocation.enabled=False \
   --conf spark.network.timeout=240s \
   --conf spark.shuffle.sasl.timeout=60000 \
   --conf spark.driver.maxResultSize=20g \
   --conf spark.port.maxRetries=60 \
   --conf spark.shuffle.service.enabled=True \
   --conf spark.sql.shuffle.partitions=3000 \
   --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
   --conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
   --conf spark.driver.memoryOverhead=1024 \
   --conf spark.executor.memoryOverhead=3072 \
   --conf spark.yarn.max.executor.failures=100 \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.task.maxFailures=4 \
   --conf spark.rdd.compress=True \
   
   private val AVG_RECORD_SIZE: Int = 256 // approx bytes of our average record, vs the Hudi default assumption of 1024
   private val ONE_GIGABYTE: Int = 1024 * 1024 * 1024 // used for Parquet file size & block size
   // 1 GiB / (2 * 256 B) = 2,097,152 entries
   private val BLOOM_MAX_ENTRIES: Int = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
   
   df.write
     .format("hudi")
     // DataSourceWriteOptions
     .option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
     .option(KEYGENERATOR_CLASS_OPT_KEY, "com.xyz.SpoKeyGenerator")
     .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL)
     .option(INSERT_DROP_DUPS_OPT_KEY, value = true)
     .option(INSERT_PARALLELISM, 2000)
     .option(PARTITIONPATH_FIELD_OPT_KEY, "g,p")
     .option(PRECOMBINE_FIELD_OPT_KEY, "isDeleted")
     .option(RECORDKEY_FIELD_OPT_KEY, "s,o")
     .option(URL_ENCODE_PARTITIONING_OPT_KEY, value = true)
     // HoodieIndexConfig
     .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
     .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
     // HoodieCompactionConfig
     .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
     // HoodieStorageConfig
     .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
     .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
     .option(PARQUET_FILE_MAX_BYTES, ONE_GIGABYTE)
     // Commit history
     .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
     .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
     .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
     // HoodieWriteConfig
     .option(EMBEDDED_TIMELINE_SERVER_ENABLED, "false")
     .option(TABLE_NAME, "spog")
     .mode(SaveMode.Append)
     .save(basePath) // basePath: HDFS base path of the table (not included in the original snippet)
   
   class SpoKeyGenerator(props: TypedProperties)
       extends ComplexKeyGenerator(props) {
   
     // Concatenates the two 64-bit halves of a 128-bit murmur hash as decimal strings.
     def hash128(s: String): String = {
       val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
       h(0).toString + h(1).toString
     }
   
     // Avro path: the record key is a hash of the concatenated s and o fields.
     override def getRecordKey(record: GenericRecord): String = {
       val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false)
       val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false)
       genKey(s, o)
     }
   
     private def genKey(s: String, o: String): String = hash128(s + o)
   
     // Row path: assumes s and o are the first two columns of the Row.
     override def getRecordKey(row: Row): String = {
       val s = row.getAs(0).toString
       val o = row.getAs(1).toString
       genKey(s, o)
     }
   }
   
   Thanks,
   Bindu





[GitHub] [hudi] nsivabalan commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

nsivabalan commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1073106703


   From the screenshot, it seems the index lookup is failing; I don't see any failure related to dropping duplicates.
   Can you try tuning the bloom index configs?
   If your data is immutable, you can try setting the operation type to "insert"; it may not involve any index lookup.
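   
   For context, a minimal sketch of the bloom-index knobs that are usually tuned first. The config keys are standard Hudi HoodieIndexConfig options; the values are placeholders only and have not been verified against this dataset.
   
   // Sketch only: common bloom-index tuning options for a slow index-lookup stage.
   // The values below are placeholders and need to be tuned per workload.
   val bloomIndexTuning: Map[String, String] = Map(
     "hoodie.bloom.index.parallelism" -> "3000",         // widen the shuffle used for the lookup
     "hoodie.bloom.index.bucketized.checking" -> "true", // spread skewed key/file comparisons across buckets
     "hoodie.bloom.index.keys.per.bucket" -> "10000000",
     // Range pruning only helps when record keys are ordered; with murmur-hashed
     // keys (as produced by SpoKeyGenerator above) it can usually be turned off.
     "hoodie.bloom.index.prune.by.ranges" -> "false"
   )
   
   // These would be layered on top of the writer options shown in the issue, e.g.:
   // df.write.format("hudi").options(bloomIndexTuning). ... .save(basePath)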
   





[GitHub] [hudi] nsivabalan commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

nsivabalan commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1061352779


   @harsh1231: Can you please follow up on this when you get a chance?





[GitHub] [hudi] bkosuru commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

bkosuru commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1047227041


   Hi Sivabalan,
   Without the INSERT_DROP_DUPS_OPT_KEY setting, the job runs fine. Here is the stack trace:
   
   User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 198 in stage 7.0 failed 4 times, most recent failure: Lost task 198.3 in stage 7.0 (TID 8888, xyz1.cnet.com, executor 467): ExecutorLostFailure (executor 467 exited caused by one of the running tasks) Reason: Container marked as failed: container_e330_16441790_15827_02_00078 on host: xyz1.cnet.com. Exit status: 143. Diagnostics: [2022-02-13 08:14:04.532]Container killed on request. Exit code is 143
   [2022-02-13 08:14:04.532]Container exited with a non-zero exit code 143. 
   [2022-02-13 08:14:04.537]Killed by external signal
   
   Driver stacktrace:
   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:930)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:930)
   at scala.Option.foreach(Option.scala:257)
   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:930)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:741)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
   at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1386)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
   at org.apache.spark.rdd.RDD.take(RDD.scala:1359)
   at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1494)
   at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1494)
   at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1494)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
   at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1493)
   at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
   at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
   at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:181)
   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
   at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)





[GitHub] [hudi] nsivabalan commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

nsivabalan commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1047199352


   May I know what exception you are seeing? Can you provide us with the stack trace? Also, is it the case that without setting INSERT_DROP_DUPS_OPT_KEY your job runs fine, and it is only a performance issue when you set this config?
   Can you shed some more light, please?
   





[GitHub] [hudi] bkosuru commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

bkosuru commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1073231293


   Hi Sivabalan,
   Could you please give some suggestions for tuning the bloom index configs? Our data is immutable, but it contains duplicates, and we want to insert unique rows only. We have allocated a lot of resources (400 executors with 50 GB each) and it still fails. Do you think we should allocate more? Thanks!
   





[GitHub] [hudi] bkosuru commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

bkosuru commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1047229347


   ![sparkUI1](https://user-images.githubusercontent.com/7408351/155026411-a61a6f80-82e1-4f5a-b864-21d839eda880.png)
   





[GitHub] [hudi] bkosuru edited a comment on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

bkosuru edited a comment on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1073231293


   Hi @nsivabalan,
   
   Could you please give some suggestions for tuning the bloom index configs? Our data is immutable, but it contains duplicates, and we want to insert unique rows only. We have allocated a lot of resources (400 executors with 50 GB each) and it still fails. Do you think we should allocate more? Also, is there a way to run the duplicate check against only a single partition to make it more efficient? We know that the data we are going to insert belongs to a single partition. (A possible approach along these lines is sketched after this message.)
   
   Thanks!
   Bindu
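   
   A sketch of the workaround referenced above, under assumptions: this is not a Hudi feature discussed in the thread, and names such as spark, df, and basePath stand in for the job's own session, input DataFrame, and table path. Since the batch is known to land only in g=g2, duplicates can be removed in Spark with an anti-join against that one partition, after which a plain "insert" (without INSERT_DROP_DUPS_OPT_KEY) avoids the global index lookup.
   
   // Sketch only: de-duplicate the batch against the single target partition,
   // then write with a plain insert (no drop-dups index lookup).
   // spark, df, and basePath are assumed to exist in the surrounding job.
   import org.apache.spark.sql.functions.col
   
   val existing = spark.read
     .format("hudi")
     .load(s"$basePath/g=g2/*")      // read only the partition being written to
     .select(col("s"), col("o"))     // the record-key source fields
   
   val toInsert = df
     .dropDuplicates("s", "o")                   // drop duplicates within the batch itself
     .join(existing, Seq("s", "o"), "left_anti") // drop rows already present in the table
   
   // toInsert can then be written with OPERATION_OPT_KEY = INSERT_OPERATION_OPT_VAL
   // and without INSERT_DROP_DUPS_OPT_KEY, using the same writer options as above.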





[GitHub] [hudi] nsivabalan commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

nsivabalan commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1047199464


   CC @harsh1231 perf ticket. 





[GitHub] [hudi] bkosuru commented on issue #4864: Insert with INSERT_DROP_DUPS_OPT_KEY fails

bkosuru commented on issue #4864:
URL: https://github.com/apache/hudi/issues/4864#issuecomment-1047229501


   ![sparkUI2](https://user-images.githubusercontent.com/7408351/155026427-e6f02aea-af1a-4a9f-b1dd-96d99286fe06.png)
   
   

