Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/12 07:59:42 UTC

[GitHub] [hudi] raviMoengage opened a new issue, #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

raviMoengage opened a new issue, #5565:
URL: https://github.com/apache/hudi/issues/5565

   
   **Describe the problem you faced**
   
   We are unable to get async compaction to work on a MOR table written with Spark Structured Streaming.
   
   **Expected behavior**
   
   As per the [documentation](https://hudi.apache.org/docs/compaction#spark-structured-streaming), Spark Structured Streaming should have async compaction enabled by default for MOR tables.
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : 3.1.3
   
   * Hadoop version : 3.2.0
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   ### Async compaction
   These configurations are used to enable async compaction.
   
   ```
   hoodie.datasource.compaction.async.enable = true
   hoodie.compact.inline.max.delta.commits = 1 
   ```
   
   Async compaction is not enabled; here are sample logs:
   ```
   22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Commit 20220512003040185 successful!
   22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Config.inlineCompactionEnabled ? false
   22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Compaction Scheduled is Optional.empty
   22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Config.asyncClusteringEnabled ? false
   22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Clustering Scheduled is Optional.empty
   22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Is Async Compaction Enabled ? false
   ```
   
   **Context**
   
   - The default value of [asyncCompactionTriggerFnDefined](https://github.com/apache/hudi/blob/release-0.11.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L64) is false.
   
   - Since `asyncCompactionTriggerFn` defaults to `Option.empty`, the flag stays false here: [HoodieSparkSqlWriter.scala](https://github.com/apache/hudi/blob/release-0.11.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L100)
   
   - So the `isAsyncCompactionEnabled` function returns false because `asyncCompactionTriggerFnDefined` is false: [HoodieSparkSqlWriter.scala](https://github.com/apache/hudi/blob/release-0.11.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L709)
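
The reasoning in the three bullets above can be modeled in a few lines of Scala. This is a simplified sketch based on the reporter's description and the log lines, not a copy of `HoodieSparkSqlWriter`. The object name and parameter names are illustrative, and the extra conditions (inline compaction off, the async flag set, table type MOR) are assumptions about what the real check also requires.

```scala
// Simplified model of the check described above; NOT the actual Hudi source.
// All names are illustrative assumptions.
object AsyncCompactionCheck {
  def isAsyncCompactionEnabled(
      triggerFnDefined: Boolean,   // true only when the caller supplies asyncCompactionTriggerFn
      inlineCompaction: Boolean,   // the value logged as Config.inlineCompactionEnabled
      params: Map[String, String], // writer options
      tableType: String): Boolean =
    triggerFnDefined &&
      !inlineCompaction &&
      params.get("hoodie.datasource.compaction.async.enable").exists(_.toBoolean) &&
      tableType == "MERGE_ON_READ"
}
```

With `triggerFnDefined = false` the function returns false no matter what the options say, which matches the `Is Async Compaction Enabled ? false` log line even though `hoodie.datasource.compaction.async.enable` was set to true.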
   
   
   Hudi Configuration.
   ```
   hudi-conf = {
   "hoodie.table.name" = table_name
   "hoodie.datasource.write.table.name" = table_name
   "hoodie.datasource.write.table.type" = MERGE_ON_READ
   "hoodie.datasource.write.operation" = upsert
   "hoodie.datasource.write.recordkey.field" = record_key
   "hoodie.datasource.write.precombine.field" = time
   "hoodie.datasource.write.hive_style_partitioning" = true
   "hoodie.datasource.write.partitionpath.field" = key_part
   "hoodie.datasource.write.streaming.ignore.failed.batch" = true
   "hoodie.file.index.enable" = true
   "hoodie.index.type" = SIMPLE
   "hoodie.cleaner.policy" = KEEP_LATEST_COMMITS
   "hoodie.cleaner.delete.bootstrap.base.file" = false
   "hoodie.clean.async" = true
   "hoodie.clean.automatic" = true
   "hoodie.cleaner.commits.retained" = 10
   "hoodie.cleaner.parallelism" = 300
   "hoodie.cleaner.incremental.mode" = true
   "hoodie.cleaner.policy.failed.writes" = LAZY
   "hoodie.datasource.compaction.async.enable" = true
   "hoodie.compact.inline.max.delta.commits" = 1
   "hoodie.insert.shuffle.parallelism" = 300
   "hoodie.upsert.shuffle.parallelism" = 300
   "hoodie.write.concurrency.mode" = optimistic_concurrency_control
   "hoodie.write.lock.provider" = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"
   "hoodie.write.lock.zookeeper.url" = localhost
   "hoodie.write.lock.zookeeper.port" = 2181
   "hoodie.write.lock.zookeeper.lock_key" = device
   "hoodie.write.lock.zookeeper.base_path" = /hudi-datalake
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125511982

   Can you show us how you are actually writing to Hudi? There is the `foreachBatch` way and there is the streaming sink way. `foreachBatch` writes through the regular spark-datasource path, so you are likely taking that route.
   
   
   




[GitHub] [hudi] raviMoengage commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
raviMoengage commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1126034047

   Hi @nsivabalan,
   
   Yes, I am using `foreachBatch` to write in Hudi format, the same as above. It is true that `w/ spark-data source writes, async table services won't kick in.`
   
   As per the [FAQ](https://hudi.apache.org/learn/faq/#what-options-do-i-have-for-asynchronousoffline-compactions-on-mor-dataset):
   ```
   Alternately, from 0.11.0, to avoid dependency on lock providers, scheduling alone can be done inline by regular writer using the config hoodie.compact.schedule.inline. 
   And compaction execution can be done offline by periodically triggering the Hudi Compactor Utility or Hudi CLI.
   ```
   I tried setting `hoodie.compact.schedule.inline = true`, but no compaction is getting scheduled.




[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125515116

   The examples above are for COW; you can replace the table type with MOR. Also, for MOR, you don't even need to set `hoodie.datasource.compaction.async.enable` explicitly.
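
As a rough sketch of that swap, the options of the COW streaming example can be kept in a plain map and only the table type changed. The option keys and values are taken from this thread; the object and method names are hypothetical, and the map is meant to be passed to the streaming writer via `writeStream.format("hudi").options(...)`.

```scala
// Hypothetical helper; only the option keys and values come from this thread.
object StreamingSinkOptions {
  // Subset of the options used in the COW streaming example.
  val cowOptions: Map[String, String] = Map(
    "hoodie.table.name" -> "copy_on_write_table",
    "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
    "hoodie.datasource.write.precombine.field" -> "impresssiontime",
    "hoodie.datasource.write.recordkey.field" -> "adid",
    "hoodie.datasource.write.partitionpath.field" -> "userid"
  )

  // Switch the table type and leave everything else untouched.
  // hoodie.datasource.compaction.async.enable is deliberately not added,
  // following the remark that it need not be set explicitly for MOR.
  def toMergeOnRead(options: Map[String, String]): Map[String, String] =
    options + ("hoodie.datasource.write.table.type" -> "MERGE_ON_READ")
}
```

In practice the table name and base path would also be changed so that the MOR table does not share a location with an existing COW table.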




[GitHub] [hudi] raviMoengage commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
raviMoengage commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1130005623

   Hi @nsivabalan, after checking again, I found that there were 2 `compaction.inflight` instants on the timeline.
   
   So I changed the base path and tried the following config, and it's working fine.
   ```
   option("hoodie.compact.schedule.inline","true").
   option("hoodie.compact.inline.max.delta.commits","1").
   ```
   For the 2 inflight compactions, I had to run offline compaction in execute mode, passing the instant time of each.
   
   Thanks for the support. 
   
   Closing the issue.




[GitHub] [hudi] raviMoengage closed issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
raviMoengage closed issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
URL: https://github.com/apache/hudi/issues/5565




[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125513315

   Here is a `foreachBatch` example that I came across from another community user:
   ```
   val query = df.
     writeStream.
     queryName("demo").
     foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, _: Long) => {
         batchDF.persist()
         println(LocalDateTime.now() + " start writing cow table")
         batchDF.write.format("org.apache.hudi").
           option(TABLE_TYPE.key, "COPY_ON_WRITE").
           option(PRECOMBINE_FIELD.key, "kafka_timestamp").
           option(RECORDKEY_FIELD.key, "kafka_partition_offset").
           option(PARTITIONPATH_FIELD.key, "partition_date").
           option(HIVE_SYNC_ENABLED.key, false).
           option(HIVE_STYLE_PARTITIONING.key, true).
           option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
           option(STREAMING_IGNORE_FAILED_BATCH.key, false).
           option(STREAMING_RETRY_CNT.key, 0).
           option("hoodie.table.name", "copy_on_write_table").
           option("hoodie.clustering.plan.strategy.target.file.max.bytes","10485760").
           option("hoodie.clustering.plan.strategy.small.file.limit","50000").
           option("hoodie.clustering.plan.strategy.sort.columns","partition_date,kafka_partition_offset").
           option("hoodie.clustering.async.max.commits","2").
           option("hoodie.datasource.clustering.async.enable","true").
           option("hoodie.clustering.async.enabled","true").
           option("hoodie.clustering.updates.strategy","org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").
           mode(Append).
           save("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
         println(LocalDateTime.now() + " finish")
         batchDF.unpersist()
       }
     }.option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").start()
   ```
   
   This uses spark-datasource writes rather than the streaming sink to write to Hudi.
   




[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1126350573

   Here is the sample script I tried with spark-datasource directly.
   
   ```
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   val tableName = "hudi_trips_cow"
   val basePath = "file:///tmp/hudi_trips_cow"
   val dataGen = new DataGenerator
   
   // spark-shell
   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   
   import org.apache.spark.sql.functions.lit;
   
   
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "ppath").
     option(TABLE_NAME, tableName).
     option("hoodie.datasource.write.table.type","MERGE_ON_READ").
     mode(Overwrite).
     save(basePath)
   
   
   
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "ppath").
     option(TABLE_NAME, tableName).
     option("hoodie.datasource.write.table.type","MERGE_ON_READ").
     option("hoodie.compact.schedule.inline","true").
     option("hoodie.compact.inline.max.delta.commits","1").
     mode(Append).
     save(basePath)
   
   // there should be 2 delta commits. 
   ```
   In another shell:
   ```
   ls -ltr /tmp/hudi_trips_cow/.hoodie/
   total 40
   drwxr-xr-x  2 nsb  wheel    64 May 13 14:45 archived
   -rw-r--r--  1 nsb  wheel     0 May 13 14:45 20220513144543292.deltacommit.requested
   drwxr-xr-x  4 nsb  wheel   128 May 13 14:45 metadata
   -rw-r--r--  1 nsb  wheel   903 May 13 14:45 hoodie.properties
   -rw-r--r--  1 nsb  wheel  1778 May 13 14:45 20220513144543292.deltacommit.inflight
   -rw-r--r--  1 nsb  wheel  2993 May 13 14:45 20220513144543292.deltacommit
   -rw-r--r--  1 nsb  wheel     0 May 13 14:46 20220513144616019.deltacommit.requested
   -rw-r--r--  1 nsb  wheel  3123 May 13 14:46 20220513144616019.deltacommit.inflight
   -rw-r--r--  1 nsb  wheel  3799 May 13 14:46 20220513144616019.deltacommit
   ```
   
   Now, let's try to schedule compaction.
   Continue with the same spark shell as before:
   ```
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "ppath").
     option(TABLE_NAME, tableName).
     option("hoodie.datasource.write.table.type","MERGE_ON_READ").
     option("hoodie.compact.schedule.inline","true").
     option("hoodie.compact.inline.max.delta.commits","1").
     mode(Append).
     save(basePath)
   ```
   
   
   Now let's list `.hoodie`:
   ```
   ls -ltr /tmp/hudi_trips_cow/.hoodie/
   total 64
   drwxr-xr-x  2 nsb  wheel    64 May 13 14:45 archived
   -rw-r--r--  1 nsb  wheel     0 May 13 14:45 20220513144543292.deltacommit.requested
   drwxr-xr-x  4 nsb  wheel   128 May 13 14:45 metadata
   -rw-r--r--  1 nsb  wheel   903 May 13 14:45 hoodie.properties
   -rw-r--r--  1 nsb  wheel  1778 May 13 14:45 20220513144543292.deltacommit.inflight
   -rw-r--r--  1 nsb  wheel  2993 May 13 14:45 20220513144543292.deltacommit
   -rw-r--r--  1 nsb  wheel     0 May 13 14:46 20220513144616019.deltacommit.requested
   -rw-r--r--  1 nsb  wheel  3123 May 13 14:46 20220513144616019.deltacommit.inflight
   -rw-r--r--  1 nsb  wheel  3799 May 13 14:46 20220513144616019.deltacommit
   -rw-r--r--  1 nsb  wheel     0 May 13 14:47 20220513144711915.deltacommit.requested
   -rw-r--r--  1 nsb  wheel  3123 May 13 14:47 20220513144711915.deltacommit.inflight
   -rw-r--r--  1 nsb  wheel  3953 May 13 14:47 20220513144711915.deltacommit
   -rw-r--r--  1 nsb  wheel  1651 May 13 14:47 20220513144714578.compaction.requested
   ```
   
   Check out the last entry: it's `compaction.requested`.
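
The same check can be done programmatically on a list of timeline file names. The sketch below is a hypothetical helper, not part of Hudi's API. It assumes that instant files are named `<time>.<action>[.<state>]`, as in the listing above, and that a finished instant leaves a file without a state suffix (for a compaction on a MOR table, `<time>.commit`).

```scala
// Hypothetical helper for reading a `.hoodie` listing; not part of Hudi's API.
object TimelineListing {
  final case class Instant(time: String, action: String, state: String)

  // "20220513144714578.compaction.requested" -> Instant(time, "compaction", "requested")
  // "20220513144543292.deltacommit"          -> Instant(time, "deltacommit", "completed")
  // Entries such as "archived" or "hoodie.properties" are ignored.
  def parse(fileName: String): Option[Instant] =
    fileName.split('.') match {
      case Array(time, action) if time.nonEmpty && time.forall(_.isDigit) =>
        Some(Instant(time, action, "completed"))
      case Array(time, action, state) if time.nonEmpty && time.forall(_.isDigit) =>
        Some(Instant(time, action, state))
      case _ => None
    }

  // Instant times that have a compaction.requested or compaction.inflight file
  // but no completed file for the same instant time.
  def pendingCompactions(fileNames: Seq[String]): Seq[String] = {
    val instants = fileNames.flatMap(parse)
    val completed = instants.filter(_.state == "completed").map(_.time).toSet
    instants
      .filter(i => i.action == "compaction" && !completed(i.time))
      .map(_.time)
      .distinct
      .sorted
  }
}
```

Applied to the file names in the listing above, `pendingCompactions` returns only `20220513144714578`, the instant that was just scheduled. Those are the instant times an offline compaction run in execute mode would need.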
   
   
   
   




[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125512163

   With spark-datasource writes, async table services won't kick in.
   




[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1126350936

   The only two configs I added are:
   ```
   option("hoodie.compact.schedule.inline","true").
   option("hoodie.compact.inline.max.delta.commits","1").
   ```




[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125512915

   Sample test script that worked for me:
   ```
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
   import org.apache.spark.sql.avro.to_avro
   import org.apache.spark.sql.types.{IntegerType, StringType, LongType, StructType}
   import java.time.LocalDateTime
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.spark.sql.streaming.OutputMode
   // Trigger.ProcessingTime is used instead of the standalone ProcessingTime class,
   // which was deprecated and is no longer available in Spark 3.x.
   import org.apache.spark.sql.streaming.Trigger
   
   val dataStreamReader = spark.
         readStream.
         format("kafka").
         option("kafka.bootstrap.servers", "localhost:9092").
         option("subscribe", "impressions").
         option("startingOffsets", "earliest").
         option("maxOffsetsPerTrigger", 5000).
         option("failOnDataLoss", false)
   
    val schema = new StructType().
         add("impresssiontime",LongType).
         add("impressionid",StringType).
         add("userid",StringType).
         add("adid",StringType)
   
    val df = dataStreamReader.load().
    selectExpr(
           "topic as kafka_topic",
           "CAST(partition AS STRING) kafka_partition",
           "cast(timestamp as String) kafka_timestamp",
           "CAST(offset AS STRING) kafka_offset",
           "CAST(key AS STRING) kafka_key",
           "CAST(value AS STRING) kafka_value",
           "current_timestamp() current_time").
           selectExpr(
           "kafka_topic",
           "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
           "kafka_offset",
           "kafka_timestamp",
           "kafka_key",
           "kafka_value",
           "substr(current_time,1,10) partition_date").
           select(col("kafka_topic"), col("kafka_partition_offset"), col("kafka_offset"), col("kafka_timestamp"),
             col("kafka_key"), col("kafka_value"), from_json(col("kafka_value"), schema).as("data"), col("partition_date")).
           select("kafka_topic", "kafka_partition_offset", "kafka_offset", "kafka_timestamp", "kafka_key", "kafka_value",
             "data.impresssiontime", "data.impressionid", "data.userid", "data.adid", "partition_date")
   
   // writing to hudi
     val writer = df.
     writeStream.format("org.apache.hudi").
     option(TABLE_TYPE.key, "COPY_ON_WRITE").
             option(PRECOMBINE_FIELD.key, "impresssiontime").
             option(RECORDKEY_FIELD.key, "adid").
             option(PARTITIONPATH_FIELD.key, "userid").
             option(HIVE_SYNC_ENABLED.key, false).
             option(HIVE_STYLE_PARTITIONING.key, true).
             option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
             option(STREAMING_IGNORE_FAILED_BATCH.key, false).
             option(STREAMING_RETRY_CNT.key, 0).
             option("hoodie.table.name", "copy_on_write_table").
             option("hoodie.cleaner.commits.retained","6").
             option("hoodie.keep.min.commits","10").
             option("hoodie.keep.max.commits","15").
             option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
             outputMode(OutputMode.Append());
   
    writer.trigger(Trigger.ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE");
   ```

