Posted to user@spark.apache.org by KhajaAsmath Mohammed <md...@gmail.com> on 2017/11/09 22:28:37 UTC

Spark Streaming in Spark 2.1 with Kafka 0.9

Hi,

I have not been successful using Spark 2.1 with Kafka 0.9. Can anyone
please share a code snippet that works?

val sparkSession: SparkSession = runMode match {
  case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
  case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
}
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(20))
println("streamingContext --------->" + streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("topics --------->" + config.getString(Constants.Properties.KafkaBrokerList))

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topicsSet)


With the above code, the job gets aborted.
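
For reference, my understanding is that the decoder-based createDirectStream
signature above belongs to the spark-streaming-kafka-0-8 connector, which is
the one documented for brokers older than 0.10. Below is a minimal
self-contained sketch of how I believe that API is meant to be wired up; the
broker list, topic, and checkpoint directory are placeholders standing in for
my config lookups.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Kafka08StyleStream {
  def main(args: Array[String]): Unit = {
    val sparkConfig = new SparkConf().setAppName("kafka-0-8-direct-stream")
    val streamingContext = new StreamingContext(sparkConfig, Seconds(20))
    streamingContext.checkpoint("/tmp/checkpoint") // placeholder checkpoint dir

    // The 0-8 connector reads the broker list from "metadata.broker.list".
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092", // placeholder brokers
      "auto.offset.reset"    -> "smallest"
    )
    val topicsSet = Set("my-topic") // placeholder topic

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topicsSet)

    // Each element is a (key, value) pair; print the values as a smoke test.
    messages.map(_._2).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}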


------------------------------------


I tried the 0.10-style code snippet as well, but had no luck either.


val streamingContext = new StreamingContext(sparkConfig, Seconds(20))
println("streamingContext --------->" + streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("topics --------->" + config.getString(Constants.Properties.KafkaBrokerList))

//val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
//  streamingContext, kafkaParams, topicsSet)

val messages = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
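
In case the surrounding wiring matters, here is a minimal self-contained
sketch of the 0-10 style stream as I understand it; the bootstrap servers,
group id, and topic are placeholders for my config values, and the new
consumer needs the deserializer and group.id settings supplied explicitly.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Kafka010StyleStream {
  def main(args: Array[String]): Unit = {
    val sparkConfig = new SparkConf().setAppName("kafka-0-10-direct-stream")
    val streamingContext = new StreamingContext(sparkConfig, Seconds(20))
    streamingContext.checkpoint("/tmp/checkpoint") // placeholder checkpoint dir

    // The 0-10 connector uses the new Kafka consumer, so the usual consumer
    // properties have to be passed in explicitly.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092,broker2:9092", // placeholder brokers
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my-consumer-group",          // placeholder group id
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topicsSet = Set("my-topic") // placeholder topic

    val messages = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Each element is a ConsumerRecord; print the values as a smoke test.
    messages.map(_.value).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}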


Any suggestions on how to use Spark 2.1 with Kafka streaming?
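
For reference, I believe the two connector artifacts for Spark 2.1 are
roughly the following (sbt coordinates; the exact version is my assumption
and may need adjusting). My understanding from the integration guides is that
the 0-8 connector works with 0.9 brokers, while the 0-10 connector expects
brokers on 0.10.0 or newer.

// assumed sbt coordinates; adjust the version to match the cluster
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8"  % "2.1.0"
// or
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"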

Thanks,

Asmath