Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/12 07:59:42 UTC
[GitHub] [hudi] raviMoengage opened a new issue, #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
raviMoengage opened a new issue, #5565:
URL: https://github.com/apache/hudi/issues/5565
**Describe the problem you faced**
We are unable to get async compaction to work on a MOR table using Spark Structured Streaming.
**Expected behavior**
As per the [documentation](https://hudi.apache.org/docs/compaction#spark-structured-streaming), Spark Structured Streaming should have async compaction enabled by default for MOR tables.
**Environment Description**
* Hudi version : 0.11.0
* Spark version : 3.1.3
* Hadoop version : 3.2.0
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
### Async compaction
These configurations are used to enable async compaction.
```
hoodie.datasource.compaction.async.enable = true
hoodie.compact.inline.max.delta.commits = 1
```
Async compaction is not enabled; here are the sample logs:
```
22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Commit 20220512003040185 successful!
22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Config.inlineCompactionEnabled ? false
22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Compaction Scheduled is Optional.empty
22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Config.asyncClusteringEnabled ? false
22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Clustering Scheduled is Optional.empty
22/05/12 00:49:22 INFO HoodieSparkSqlWriter$: Is Async Compaction Enabled ? false
```
**Context**
- The default value of [asyncCompactionTriggerFnDefined](https://github.com/apache/hudi/blob/release-0.11.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L64) is false.
- Since `asyncCompactionTriggerFn` defaults to `Option.empty`, it stays false here: [HoodieSparkSqlWriter.scala](https://github.com/apache/hudi/blob/release-0.11.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L100)
- So the `isAsyncCompactionEnabled` function returns false because `asyncCompactionTriggerFnDefined` is false: [HoodieSparkSqlWriter.scala](https://github.com/apache/hudi/blob/release-0.11.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L709)
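To make the chain in the bullets above concrete, here is a simplified model of the check in plain Scala. It is a sketch, not the actual Hudi source: only `asyncCompactionTriggerFn`, `asyncCompactionTriggerFnDefined` and `isAsyncCompactionEnabled` are names taken from `HoodieSparkSqlWriter`; the `AsyncCompactionGate` object, the `TriggerFn` type, and the extra conditions (inline compaction off, option set, MOR table) are assumptions for illustration.

```scala
// Simplified model of the gating described above; NOT the actual Hudi code.
object AsyncCompactionGate {
  // Hypothetical stand-in for the callback a streaming sink would hand to the writer.
  type TriggerFn = String => Unit

  def isAsyncCompactionEnabled(
      asyncCompactionTriggerFn: Option[TriggerFn],
      inlineCompactionEnabled: Boolean,
      options: Map[String, String],
      tableType: String): Boolean = {
    // Plain datasource writes (including forEachBatch) pass no trigger function.
    val asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
    // Assumed extra condition: the async flag must be set to "true".
    val asyncRequested =
      options.get("hoodie.datasource.compaction.async.enable").exists(_.toBoolean)
    asyncCompactionTriggerFnDefined &&
      !inlineCompactionEnabled &&
      asyncRequested &&
      tableType == "MERGE_ON_READ"
  }
}
```

With `asyncCompactionTriggerFn = None` the model returns `false` whatever the options say, which is consistent with the `Is Async Compaction Enabled ? false` log line above even though `hoodie.datasource.compaction.async.enable` is set.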
Hudi configuration:
```
hudi-conf = {
"hoodie.table.name" = table_name
"hoodie.datasource.write.table.name" = table_name
"hoodie.datasource.write.table.type" = MERGE_ON_READ
"hoodie.datasource.write.operation" = upsert
"hoodie.datasource.write.recordkey.field" = record_key
"hoodie.datasource.write.precombine.field" = time
"hoodie.datasource.write.hive_style_partitioning" = true
"hoodie.datasource.write.partitionpath.field" = key_part
"hoodie.datasource.write.streaming.ignore.failed.batch" = true
"hoodie.file.index.enable" = true
"hoodie.index.type" = SIMPLE
"hoodie.cleaner.policy" = KEEP_LATEST_COMMITS
"hoodie.cleaner.delete.bootstrap.base.file" = false
"hoodie.clean.async" = true
"hoodie.clean.automatic" = true
"hoodie.cleaner.commits.retained" = 10
"hoodie.cleaner.parallelism" = 300
"hoodie.cleaner.incremental.mode" = true
"hoodie.cleaner.policy.failed.writes" = LAZY
"hoodie.datasource.compaction.async.enable"=true
"hoodie.compact.inline.max.delta.commits"=1
"hoodie.insert.shuffle.parallelism" = 300
"hoodie.upsert.shuffle.parallelism" = 300
"hoodie.write.concurrency.mode" = optimistic_concurrency_control
"hoodie.write.lock.provider" = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"
"hoodie.write.lock.zookeeper.url" = localhost
"hoodie.write.lock.zookeeper.port" = 2181
"hoodie.write.lock.zookeeper.lock_key" = device
"hoodie.write.lock.zookeeper.base_path" = /hudi-datalake
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125511982
Can you show us how you are actually writing to Hudi? There is the forEachBatch way and there is the data stream (streaming sink) way. forEachBatch goes directly through the regular spark-datasource write path, so you are likely taking that route.
[GitHub] [hudi] raviMoengage commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
raviMoengage commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1126034047
Hi @nsivabalan,
Yes, I am using forEachBatch to write in Hudi format, same as above. True that `with spark-datasource writes, async table services won't kick in.`
As per the [FAQ](https://hudi.apache.org/learn/faq/#what-options-do-i-have-for-asynchronousoffline-compactions-on-mor-dataset):
```
Alternately, from 0.11.0, to avoid dependency on lock providers, scheduling alone can be done inline by regular writer using the config hoodie.compact.schedule.inline.
And compaction execution can be done offline by periodically triggering the Hudi Compactor Utility or Hudi CLI.
```
I tried `hoodie.compact.schedule.inline = true`, but no compaction is getting scheduled.
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125515116
The examples I posted (streaming sink and forEachBatch) are for COW; you can replace the table type with MOR. Also, for MOR, you don't even need to set `hoodie.datasource.compaction.async.enable` explicitly.
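As a rough sketch of that swap, the options for the streaming sink can be kept in a plain map with only the table type changed. The option keys are the string forms that appear in the configuration earlier in this thread; the `MorStreamingOptions` object, the table names, and the field values are placeholders borrowed from the COW example, not a recommended setup.

```scala
// Sketch only: option maps for df.writeStream.format("hudi"); values are placeholders.
object MorStreamingOptions {
  val cowOptions: Map[String, String] = Map(
    "hoodie.table.name" -> "copy_on_write_table",
    "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
    "hoodie.datasource.write.precombine.field" -> "impresssiontime",
    "hoodie.datasource.write.recordkey.field" -> "adid",
    "hoodie.datasource.write.partitionpath.field" -> "userid"
  )

  // Same options with the table type switched to MOR.
  // hoodie.datasource.compaction.async.enable is deliberately left unset.
  val morOptions: Map[String, String] = cowOptions ++ Map(
    "hoodie.table.name" -> "merge_on_read_table",
    "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"
  )
}
```

In a spark-shell session this map could then be passed to the streaming writer, for example `df.writeStream.format("hudi").options(MorStreamingOptions.morOptions)`, followed by the checkpoint location, output mode and `start(path)` as in the COW script.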
[GitHub] [hudi] raviMoengage commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
raviMoengage commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1130005623
Hi @nsivabalan, after checking again, I found that there were 2 `compaction.inflight` instants on the timeline.
So I changed the base path and tried the following config, and it's working fine.
```
option("hoodie.compact.schedule.inline","true").
option("hoodie.compact.inline.max.delta.commits","1").
```
For the 2 inflight compactions, I had to run offline compaction in execute mode, passing the instant time of each.
Thanks for the support.
Closing the issue.
[GitHub] [hudi] raviMoengage closed issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
raviMoengage closed issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
URL: https://github.com/apache/hudi/issues/5565
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125513315
Here is a forEachBatch example that I came across from another community user:
```
val query = df.
writeStream.
queryName("demo").
foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, _: Long) => {
batchDF.persist()
println(LocalDateTime.now() + " start writing cow table")
batchDF.write.format("org.apache.hudi").
option(TABLE_TYPE.key, "COPY_ON_WRITE").
option(PRECOMBINE_FIELD.key, "kafka_timestamp").
option(RECORDKEY_FIELD.key, "kafka_partition_offset").
option(PARTITIONPATH_FIELD.key, "partition_date").
option(HIVE_SYNC_ENABLED.key, false).
option(HIVE_STYLE_PARTITIONING.key, true).
option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
option(STREAMING_IGNORE_FAILED_BATCH.key, false).
option(STREAMING_RETRY_CNT.key, 0).
option("hoodie.table.name", "copy_on_write_table").
option("hoodie.clustering.plan.strategy.target.file.max.bytes","10485760").
option("hoodie.clustering.plan.strategy.small.file.limit","50000").
option("hoodie.clustering.plan.strategy.sort.columns","partition_date,kafka_partition_offset").
option("hoodie.clustering.async.max.commits","2").
option("hoodie.datasource.clustering.async.enable","true").
option("hoodie.clustering.async.enabled","true").
option("hoodie.clustering.updates.strategy","org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").
mode(Append).
save("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
println(LocalDateTime.now() + " finish")
batchDF.unpersist()
}
}.option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").start()
```
This uses spark-datasource writes rather than the streaming sink to write to Hudi.
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1126350573
Here is the sample script I tried with spark-datasource directly:
```
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
import org.apache.spark.sql.functions.lit;
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "ppath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Overwrite).
save(basePath)
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "ppath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.schedule.inline","true").
option("hoodie.compact.inline.max.delta.commits","1").
mode(Append).
save(basePath)
// there should be 2 delta commits.
```
In another shell:
```
ls -ltr /tmp/hudi_trips_cow/.hoodie/
total 40
drwxr-xr-x 2 nsb wheel 64 May 13 14:45 archived
-rw-r--r-- 1 nsb wheel 0 May 13 14:45 20220513144543292.deltacommit.requested
drwxr-xr-x 4 nsb wheel 128 May 13 14:45 metadata
-rw-r--r-- 1 nsb wheel 903 May 13 14:45 hoodie.properties
-rw-r--r-- 1 nsb wheel 1778 May 13 14:45 20220513144543292.deltacommit.inflight
-rw-r--r-- 1 nsb wheel 2993 May 13 14:45 20220513144543292.deltacommit
-rw-r--r-- 1 nsb wheel 0 May 13 14:46 20220513144616019.deltacommit.requested
-rw-r--r-- 1 nsb wheel 3123 May 13 14:46 20220513144616019.deltacommit.inflight
-rw-r--r-- 1 nsb wheel 3799 May 13 14:46 20220513144616019.deltacommit
```
Now, let's try to schedule compaction.
Continue with the same spark-shell as before:
```
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "ppath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.schedule.inline","true").
option("hoodie.compact.inline.max.delta.commits","1").
mode(Append).
save(basePath)
```
Now let's list the `.hoodie` directory:
```
ls -ltr /tmp/hudi_trips_cow/.hoodie/
total 64
drwxr-xr-x 2 nsb wheel 64 May 13 14:45 archived
-rw-r--r-- 1 nsb wheel 0 May 13 14:45 20220513144543292.deltacommit.requested
drwxr-xr-x 4 nsb wheel 128 May 13 14:45 metadata
-rw-r--r-- 1 nsb wheel 903 May 13 14:45 hoodie.properties
-rw-r--r-- 1 nsb wheel 1778 May 13 14:45 20220513144543292.deltacommit.inflight
-rw-r--r-- 1 nsb wheel 2993 May 13 14:45 20220513144543292.deltacommit
-rw-r--r-- 1 nsb wheel 0 May 13 14:46 20220513144616019.deltacommit.requested
-rw-r--r-- 1 nsb wheel 3123 May 13 14:46 20220513144616019.deltacommit.inflight
-rw-r--r-- 1 nsb wheel 3799 May 13 14:46 20220513144616019.deltacommit
-rw-r--r-- 1 nsb wheel 0 May 13 14:47 20220513144711915.deltacommit.requested
-rw-r--r-- 1 nsb wheel 3123 May 13 14:47 20220513144711915.deltacommit.inflight
-rw-r--r-- 1 nsb wheel 3953 May 13 14:47 20220513144711915.deltacommit
-rw-r--r-- 1 nsb wheel 1651 May 13 14:47 20220513144714578.compaction.requested
```
Check out the last entry: it's `compaction.requested`.
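If you would rather check this programmatically than eyeball the listing, a small helper along these lines can pick out pending compaction instants from the file names. This is a sketch under assumptions: the `PendingCompactions` object is hypothetical, it relies on the 0.11-era timeline naming shown above (`<instant>.compaction.requested`, `<instant>.compaction.inflight`, and a `<instant>.commit` file once a compaction completes), and `pendingIn` only works for a `.hoodie` directory on a local filesystem, not S3.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

// Sketch: find compaction instants that are requested or inflight but not completed.
object PendingCompactions {
  // Pure helper over timeline file names, e.g. the output of `ls .hoodie`.
  def pending(fileNames: Seq[String]): Seq[String] = {
    // Assumption: a completed compaction leaves a "<instant>.commit" file.
    val completed = fileNames
      .filter(_.endsWith(".commit"))
      .map(_.stripSuffix(".commit"))
      .toSet
    fileNames
      .filter(n => n.endsWith(".compaction.requested") || n.endsWith(".compaction.inflight"))
      .map(_.takeWhile(_ != '.')) // the instant time is the part before the first dot
      .distinct
      .filterNot(completed.contains)
      .sorted
  }

  // Convenience wrapper for a .hoodie directory on the local filesystem.
  def pendingIn(hoodieDir: Path): Seq[String] = {
    val stream = Files.list(hoodieDir)
    try pending(stream.iterator().asScala.map(_.getFileName.toString).toList)
    finally stream.close()
  }
}
```

For the listing above, `PendingCompactions.pending` would report the single instant `20220513144714578`; those instant times are what an offline compaction run in execute mode needs.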
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125512163
With spark-datasource writes, async table services won't kick in.
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1126350936
The only two configs I added are:
```
option("hoodie.compact.schedule.inline","true").
option("hoodie.compact.inline.max.delta.commits","1").
```
[GitHub] [hudi] nsivabalan commented on issue #5565: [SUPPORT] Async compaction is not triggered on the MOR Hudi table using spark streaming
Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125512915
Sample test script that worked for me:
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.types.{IntegerType, StringType, LongType, StructType}
import java.time.LocalDateTime
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger
// read the Kafka topic as a stream
val dataStreamReader = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("subscribe", "impressions").
option("startingOffsets", "earliest").
option("maxOffsetsPerTrigger", 5000).
option("failOnDataLoss", false)
val schema = new StructType().
add("impresssiontime",LongType).
add("impressionid",StringType).
add("userid",StringType).
add("adid",StringType)
val df = dataStreamReader.load().
selectExpr(
"topic as kafka_topic",
"CAST(partition AS STRING) kafka_partition",
"cast(timestamp as String) kafka_timestamp",
"CAST(offset AS STRING) kafka_offset",
"CAST(key AS STRING) kafka_key",
"CAST(value AS STRING) kafka_value",
"current_timestamp() current_time").
selectExpr(
"kafka_topic",
"concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
"kafka_offset",
"kafka_timestamp",
"kafka_key",
"kafka_value",
"substr(current_time,1,10) partition_date").
select(col("kafka_topic"),col("kafka_partition_offset"),col("kafka_offset"),col("kafka_timestamp"),col("kafka_key"),col("kafka_value"),from_json(col("kafka_value"), schema).as("data"),col("partition_date")).
select("kafka_topic","kafka_partition_offset","kafka_offset","kafka_timestamp","kafka_key","kafka_value","data.impresssiontime","data.impressionid", "data.userid","data.adid","partition_date")
// writing to hudi
val writer = df.
writeStream.format("org.apache.hudi").
option(TABLE_TYPE.key, "COPY_ON_WRITE").
option(PRECOMBINE_FIELD.key, "impresssiontime").
option(RECORDKEY_FIELD.key, "adid").
option(PARTITIONPATH_FIELD.key, "userid").
option(HIVE_SYNC_ENABLED.key, false).
option(HIVE_STYLE_PARTITIONING.key, true).
option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
option(STREAMING_IGNORE_FAILED_BATCH.key, false).
option(STREAMING_RETRY_CNT.key, 0).
option("hoodie.table.name", "copy_on_write_table").
option("hoodie.cleaner.commits.retained","6").
option("hoodie.keep.min.commits","10").
option("hoodie.keep.max.commits","15").
option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
outputMode(OutputMode.Append());
// Trigger.ProcessingTime replaces the standalone ProcessingTime class, which was removed in Spark 3.0
writer.trigger(Trigger.ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE");
```