Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/18 08:18:47 UTC
[GitHub] [hudi] shahiidiqbal commented on issue #4839: Hudi upsert doesnt trigger compaction for MOR
shahiidiqbal commented on issue #4839:
URL: https://github.com/apache/hudi/issues/4839#issuecomment-1044122188
@nsivabalan
We use Kafka Connect to capture changes (CDC) from MongoDB, and then use Spark Structured Streaming to read the topics from Kafka and write the data into our data lake with Hudi.
Here is the Spark streaming code that reads the Kafka topic and then writes the data with Hudi:
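from pyspark.sql import functions as F

# spark (the SparkSession), tableHudiOptions, kafkaOptions, HUDI_TABLE_BASE_PATH,
# TABLE_NAME, TOPIC_NAME and CHECKPOINT_BASE_PATH are defined elsewhere (not shown).

def write_hudi_table(df, epoch_id):
    # we do some cleansing here
    # each micro-batch is written to Hudi as a regular batch (append) write
    df.write.format('org.apache.hudi') \
        .options(**tableHudiOptions) \
        .mode('append') \
        .save(f'{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/')

# read the raw Kafka records and keep only the message value as a string
rawDf = spark.readStream \
    .format("kafka") \
    .options(**kafkaOptions) \
    .option("subscribe", TOPIC_NAME) \
    .load() \
    .select(F.col("value").cast("string"))

# finalDf is derived from rawDf (those transformations are not shown here)
query = finalDf.writeStream \
    .queryName(f"Writing Table {TABLE_NAME}") \
    .foreachBatch(write_hudi_table) \
    .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
    .start()

query.awaitTermination()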
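The contents of tableHudiOptions are not shown in the code above. As a reference point only, here is a minimal sketch of what such a dict could look like for a MERGE_ON_READ table that compacts inline. The helper name build_hudi_options, the table name "my_table" and the field names "_id" and "ts" are hypothetical placeholders, not taken from our job. The hoodie.* keys are standard Hudi write configs, but they should be checked against the Hudi version in use, and other options the table may need (partition path, key generator, etc.) are omitted. Since every foreachBatch call is a plain batch write, inline compaction (hoodie.compact.inline, off by default) is one way to have the writer itself schedule and run compaction.

```python
def build_hudi_options(table_name, record_key="_id", precombine_field="ts",
                       max_delta_commits=5):
    """Return a hypothetical Hudi write-options dict for a MERGE_ON_READ table.

    The record_key and precombine_field defaults are placeholders; they are
    not taken from the original job.
    """
    return {
        "hoodie.table.name": table_name,
        # MOR: updates land in log files that compaction later merges into base files
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        # compact as part of the write once this many delta commits have accumulated
        "hoodie.compact.inline": "true",
        "hoodie.compact.inline.max.delta.commits": str(max_delta_commits),
    }


# hypothetical table name, for illustration only
tableHudiOptions = build_hudi_options("my_table")
```

The resulting dict would be passed exactly as in write_hudi_table, via .options(**tableHudiOptions). All values are strings because Spark data source options are string key/value pairs.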