Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/18 08:18:47 UTC

[GitHub] [hudi] shahiidiqbal commented on issue #4839: Hudi upsert doesnt trigger compaction for MOR

shahiidiqbal commented on issue #4839:
URL: https://github.com/apache/hudi/issues/4839#issuecomment-1044122188


   @nsivabalan 
   We use Kafka Connect to capture changes (CDC) from MongoDB, then use Spark Streaming to read the topics from Kafka and store the data in our data lake through Hudi.
   
   Here is the Spark Streaming code that reads the Kafka topic and then writes the data through Hudi:
   
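   from pyspark.sql import functions as F

   def write_hudi_table(df, epoch_id):
       # We do some cleansing here, then write the micro-batch through Hudi.
       df.write.format('org.apache.hudi') \
           .options(**tableHudiOptions) \
           .mode('append') \
           .save(f'{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/')

   # Read the raw Kafka records and keep the message value as a string.
   rawDf = spark.readStream \
       .format("kafka") \
       .options(**kafkaOptions) \
       .option("subscribe", TOPIC_NAME) \
       .load() \
       .select(F.col("value").cast("string"))

   # Placeholder (assumption): the steps that derive finalDf from rawDf are
   # not shown in this comment, so rawDf is passed through unchanged here.
   finalDf = rawDf

   # Hand each micro-batch to write_hudi_table.
   query = finalDf.writeStream \
       .queryName(f"Writing Table {TABLE_NAME}") \
       .foreachBatch(write_hudi_table) \
       .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
       .start()

   query.awaitTermination()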
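
The contents of `tableHudiOptions` are not shown above. As a rough sketch only, a dict for a MERGE_ON_READ table with inline compaction enabled could be built along these lines. The helper name `build_hudi_options`, the table name, the key fields, and the delta-commit threshold are all hypothetical and are not taken from our actual job; only the `hoodie.*` keys are standard Hudi write configs.

```python
def build_hudi_options(table_name, record_key, precombine_field, max_delta_commits=5):
    """Return Hudi write options for a MERGE_ON_READ table with inline compaction.

    All argument values are illustrative assumptions; only the option keys
    are standard Hudi configuration names.
    """
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        # Run compaction as part of the write, after N delta commits.
        "hoodie.compact.inline": "true",
        "hoodie.compact.inline.max.delta.commits": str(max_delta_commits),
    }


# Hypothetical values, for illustration only.
tableHudiOptions = build_hudi_options("my_table", "_id", "updated_at")
```

A dict like this can be passed as `.options(**tableHudiOptions)` in the writer above. Whether these settings change the compaction behavior discussed in this issue would still need to be verified against the table's timeline.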
   

