Posted to user@spark.apache.org by tianlangstudio <ti...@aliyun.com.INVALID> on 2021/11/23 00:54:48 UTC

Re: Spark - ElasticSearch Integration

Hello,
1. Maybe you can define the schema and save it somewhere before using it in Spark Streaming, then read it back from there when the streaming job starts.
2. What about processing the events in the foreachBatch method?
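
A rough sketch of both suggestions, in case it helps. This is not tested against your setup. The broker address, topic name, schema path, index name and checkpoint paths are all made-up placeholders, and the "es" format assumes the elasticsearch-spark (elasticsearch-hadoop) connector is on the classpath together with the spark-sql-kafka package.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DataType, StructType}

object KafkaToEsSketch {
  // Hypothetical placeholders; replace with your own values.
  val Brokers = "localhost:9092"
  val Topic = "events"
  val SchemaPath = "/tmp/events-schema.json" // local to the driver
  val EsIndex = "events"

  // Streaming source: Kafka values as a single string column named "value".
  def kafkaStream(spark: SparkSession): DataFrame =
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", Brokers)
      .option("subscribe", Topic)
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

  // Suggestion 1, step a: infer the schema once from a bounded (batch)
  // read of the topic and save it as JSON.
  def saveSchema(spark: SparkSession): Unit = {
    import spark.implicits._
    val sample = spark.read.format("kafka")
      .option("kafka.bootstrap.servers", Brokers)
      .option("subscribe", Topic)
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .as[String]
    val schema = spark.read.json(sample).schema
    Files.write(Paths.get(SchemaPath), schema.json.getBytes(StandardCharsets.UTF_8))
  }

  // Suggestion 1, step b: load the saved schema when the streaming job starts.
  def streamWithStoredSchema(spark: SparkSession): Unit = {
    val json = new String(Files.readAllBytes(Paths.get(SchemaPath)), StandardCharsets.UTF_8)
    val schema = DataType.fromJson(json).asInstanceOf[StructType]
    kafkaStream(spark)
      .select(from_json(col("value"), schema).as("doc"))
      .select("doc.*")
      .writeStream
      .format("es") // assumes the Elasticsearch connector's streaming sink
      .option("checkpointLocation", "/tmp/checkpoints/stored-schema")
      .start(EsIndex)
      .awaitTermination()
  }

  // Suggestion 2: each micro-batch is a plain DataFrame, so the batch-style
  // schema inference can be run on it inside foreachBatch.
  def streamWithForeachBatch(spark: SparkSession): Unit = {
    import spark.implicits._
    // Typed function value avoids the foreachBatch overload ambiguity in Scala 2.12.
    val writeBatch: (DataFrame, Long) => Unit = (batch, _) => {
      batch.persist() // the batch is scanned for inference and again for the write
      if (!batch.isEmpty) {
        val parsed = spark.read.json(batch.as[String])
        parsed.write.format("es").mode("append").save(EsIndex)
      }
      batch.unpersist()
    }
    kafkaStream(spark).writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", "/tmp/checkpoints/foreach-batch")
      .start()
      .awaitTermination()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-es-sketch").getOrCreate()
    args.headOption match {
      case Some("save-schema")   => saveSchema(spark)
      case Some("stored-schema") => streamWithStoredSchema(spark)
      case _                     => streamWithForeachBatch(spark)
    }
  }
}
```

With the first approach the schema is fixed until you re-run the save step. With the second, the schema is inferred again for every micro-batch, so it can differ from batch to batch and the Elasticsearch mapping has to tolerate that.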


 
Fusion Zhu


------------------------------------------------------------------
From: Siva Samraj <sa...@gmail.com>
Sent: Monday, November 22, 2021 22:08
To: user <us...@spark.apache.org>
Subject: Spark - ElasticSearch Integration

Hi All,

I want to write a Spark Streaming job that reads from Kafka and writes to Elasticsearch. I want to detect the schema dynamically while reading from Kafka.
Can you help me do that?
I know this can be done in Spark batch processing with the line below.
val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema
But when running the same thing in a Spark Streaming job, we cannot do the above, since streaming can have only one action.
Please let me know.

Thanks
Siva