Posted to user@spark.apache.org by KhajaAsmath Mohammed <md...@gmail.com> on 2017/11/09 22:28:37 UTC
Spark Streaming in Spark 2.1 with Kafka 0.9
Hi,
I have not been able to get Spark 2.1 working with Kafka 0.9. Can anyone
please share a code snippet that does this?
val sparkSession: SparkSession = runMode match {
  case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
  case "yarn" => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
}
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(20))
println("streamingContext --------->" + streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("topics --------->" + config.getString(Constants.Properties.KafkaBrokerList))
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topicsSet)
With the above code, the job gets aborted.
------------------------------------
I also tried the code snippet for 0.10, but no luck.
val streamingContext = new StreamingContext(sparkConfig, Seconds(20))
println("streamingContext --------->" + streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("topics --------->" + config.getString(Constants.Properties.KafkaBrokerList))
// val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
//   streamingContext, kafkaParams, topicsSet)
val messages = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
Any suggestions on how to use Spark 2.1 with Kafka streaming?
Thanks,
Asmath