Posted to user@spark.apache.org by karthikjay <as...@gmail.com> on 2018/05/17 20:30:16 UTC
[structured-streaming] foreachPartition alternative in structured streaming.
I am reading data from Kafka using Structured Streaming and need to save
the data to InfluxDB. In the regular DStream-based approach, I did this as
follows:
    // Keep only the topic and the payload of each Kafka record.
    val messages: DStream[(String, String)] =
      kafkaStream.map(record => (record.topic, record.value))

    messages.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Runs on the worker: one connection per partition.
        val influxService = new InfluxService()
        val connection = influxService.createInfluxDBConnectionWithParams(
          host,
          port,
          username,
          password,
          database
        )
        partitionOfRecords.foreach { record =>
          ABCService.handleData(connection, record._1, record._2)
        }
      }
    }

    ssc.start()
    logger.info("Started Spark-Kafka streaming session")
    ssc.awaitTermination()
Note: I create the connection object inside foreachPartition. How do I do
this in Structured Streaming? I tried a connection pooling approach (where I
create a pool of connections on the master node and pass it to the worker
nodes), described here:
<https://stackoverflow.com/questions/50205650/spark-connection-pooling-is-this-the-right-approach>
but the workers could not get the connection pool object. Am I missing
anything obvious here?
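
One possible direction, sketched under assumptions rather than offered as a
confirmed fix: Structured Streaming's ForeachWriter has an open() method that
runs on the worker for each partition, so a connection can be created there
instead of being shipped from the master node. In the sketch below,
SinkConnection, ConnectionPerPartitionWriter, StructuredSinkSketch,
startQuery, and the connect factory are hypothetical names standing in for
the InfluxDB code above; only ForeachWriter and the readStream/writeStream
calls are Spark APIs. The Kafka source also needs the spark-sql-kafka-0-10
package on the classpath.

```scala
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical stand-in for the InfluxDB connection used in the post.
trait SinkConnection {
  def write(topic: String, value: String): Unit
  def close(): Unit
}

// The writer itself is serialized and sent to the workers, so it only holds
// a factory function. The connection is created in open(), on the worker.
class ConnectionPerPartitionWriter(connect: () => SinkConnection)
    extends ForeachWriter[(String, String)] {

  @transient private var connection: SinkConnection = _

  // Called once per partition (and per epoch) before any records arrive.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = connect()
    true // returning true asks Spark to call process() for this partition
  }

  // Called for every (topic, value) record of the partition.
  override def process(record: (String, String)): Unit =
    connection.write(record._1, record._2)

  // Called when the partition is done, or with the error if it failed.
  override def close(errorOrNull: Throwable): Unit =
    if (connection != null) connection.close()
}

object StructuredSinkSketch {
  // Assumed wiring: read (topic, value) pairs from Kafka and hand them to
  // the writer. bootstrapServers and topic are placeholders.
  def startQuery(
      spark: SparkSession,
      bootstrapServers: String,
      topic: String,
      connect: () => SinkConnection): StreamingQuery = {
    import spark.implicits._
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()
      .selectExpr("topic", "CAST(value AS STRING) AS value")
      .as[(String, String)]
      .writeStream
      .foreach(new ConnectionPerPartitionWriter(connect))
      .start()
  }
}
```

The connect function must itself be serializable and should build the
connection from plain values (host, port, credentials) rather than capture a
pool created on the master node. Connection objects are usually not
serializable, which may be why the workers could not get the pool. On Spark
2.4 and later, foreachBatch is another option, since it exposes each
micro-batch as a Dataset on which foreachPartition can be called.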