Posted to users@kafka.apache.org by Meiling He <he...@gmail.com> on 2021/06/04 15:05:27 UTC
Databricks to Kafka connection error (overloaded method value createDirectStream with alternatives)
Hi,
I tried to connect to Kafka from Databricks with the code below, but got the error shown after it.
Does anyone have an idea how to fix this?
Thanks,
Meiling
*code:*
%scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// val conf = new SparkConf().setAppName("KafkaTest").setMaster("local")
val conf = new SparkConf().setMaster("local[*]").setAppName("Ingesting Data from Kafka")
conf.set("spark.streaming.ui.retainedBatches", "5")
conf.set("spark.streaming.backpressure.enabled", "true") // Enable backpressure
val ssc = new StreamingContext(conf, batchDuration = Seconds(5))
// val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "host1:port1,host2:port2,host3:port3",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  StreamingContext,
  PreferConsistent,
  Subscribe[String, String](topics.toString.split(",").toSet, kafkaParams)
)
stream.map(record => (record.key, record.value))
display(stream)
*error:* overloaded method value createDirectStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
 cannot be applied to (org.apache.spark.streaming.StreamingContext.type, org.apache.spark.streaming.kafka010.LocationStrategy, org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])
val stream = KafkaUtils.createDirectStream[String, String](
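For reference, here is a minimal sketch of a corrected cell. It assumes the spark-streaming-kafka-0-10 library is attached to the cluster and that the placeholder brokers and topics are replaced with real ones. The compile error comes from passing `StreamingContext`, which is the companion object (hence `StreamingContext.type` in the message), where the `ssc` instance is expected. Two further problems would likely surface after that. First, `topics.toString` on an `Array` yields a JVM identifier such as `[Ljava.lang.String;@...` rather than the topic names, so `topics` can be passed to `Subscribe` as-is. Second, a DStream consumes nothing until `ssc.start()` is called. On Databricks a SparkContext already exists, so the sketch reuses it through `SparkContext.getOrCreate` instead of letting `new StreamingContext(conf, ...)` try to create a second one. The name `sparkContext` and the 60-second run time are illustrative assumptions, not values from the original post.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Reuse the running SparkContext (Databricks) or create a local one elsewhere.
// If a context already exists, these conf settings are ignored by getOrCreate.
val conf = new SparkConf()
  .setAppName("Ingesting Data from Kafka")
  .setIfMissing("spark.master", "local[*]")
  .set("spark.streaming.backpressure.enabled", "true")
val sparkContext = SparkContext.getOrCreate(conf) // hypothetical name, avoids shadowing `sc`

// Build the StreamingContext instance; this `ssc` is what createDirectStream needs.
val ssc = new StreamingContext(sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1,host2:port2,host3:port3", // placeholders from the post
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Subscribe accepts an Iterable[String]; the Array converts implicitly.
val topics = Array("topicA", "topicB")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, // the instance, not the StreamingContext companion object
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Print the first records of each batch to the driver's stdout.
stream.map(record => (record.key, record.value)).print()

ssc.start()
// Assumption: run for an arbitrary 60 seconds, then stop the streaming
// context gracefully while keeping the shared SparkContext alive.
ssc.awaitTerminationOrTimeout(60 * 1000)
ssc.stop(stopSparkContext = false, stopGracefully = true)
```

As far as I know, Databricks' `display` is built for DataFrames and Datasets rather than DStreams, which is why the sketch uses `print()`. If rendering records with `display` is the goal, the Structured Streaming Kafka source (`spark.readStream.format("kafka")`) produces a streaming DataFrame and may be a better fit than the DStream API.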