Posted to users@kafka.apache.org by Meiling He <he...@gmail.com> on 2021/06/04 15:05:27 UTC

Databricks to Kafka connection error (overloaded method value createDirectStream with alternatives)

Hi,

I tried to connect to Kafka from Databricks with the code below, but I
encountered the error shown after it.

Does anyone have an idea how to fix this?

Thanks,
Meiling
*code:*
%scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// val conf = new SparkConf().setAppName("KafkaTest").setMaster("local")
val conf = new SparkConf().setMaster("local[*]").setAppName("Ingesting Data from Kafka")
conf.set("spark.streaming.ui.retainedBatches", "5")
conf.set("spark.streaming.backpressure.enabled", "true") // Enable back pressure
val ssc = new StreamingContext(conf, batchDuration = Seconds(5))

// val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1,host2:port2,host3:port3",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  StreamingContext,
  PreferConsistent,
  Subscribe[String, String](topics.toString.split(",").toSet, kafkaParams)
)
stream.map(record => (record.key, record.value))
display(stream)
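
One thing that may be worth checking in the Subscribe call above: topics.toString on a Scala Array returns the JVM's default object string rather than the topic names, so the resulting set would likely hold a single bogus entry. The following is a minimal sketch in plain Scala (no Spark needed, written for a notebook cell or the REPL); the names viaToString and direct are only illustrative and do not appear in the original code.

```scala
// Same topic array as in the post.
val topics = Array("topicA", "topicB")

// Array.toString is the JVM default (a type tag plus a hash code),
// not a comma-separated list, so splitting it on "," gives one bogus element.
val viaToString: Set[String] = topics.toString.split(",").toSet

// Converting the array directly keeps the real topic names.
val direct: Set[String] = topics.toSet

println(s"via toString: $viaToString")
println(s"direct:       $direct")
```

Since Subscribe accepts a collection of topic names, passing topics itself (or topics.toSet) should be enough here.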


*error:* overloaded method value createDirectStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
<and>
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
<and>
  (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
<and>
  (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
 cannot be applied to (org.apache.spark.streaming.StreamingContext.type, org.apache.spark.streaming.kafka010.LocationStrategy, org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])
val stream = KafkaUtils.createDirectStream[String, String](
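
The last part of the message says the first argument has type StreamingContext.type, which is how the Scala compiler names a companion object rather than an instance of the class. A small Spark-free sketch of that situation follows; Context, describe and ctx are hypothetical stand-ins invented for illustration, not Spark APIs.

```scala
// Hypothetical stand-in for a class that, like StreamingContext,
// also has an object of the same name.
class Context(val name: String)
object Context

// Hypothetical stand-in for a method such as createDirectStream,
// which expects an instance of the class.
def describe(ctx: Context): String = s"stream on ${ctx.name}"

val ctx = new Context("demo")

// Passing the instance compiles: the argument has type Context.
val ok = describe(ctx)
println(ok)

// Passing the bare name refers to the object, whose type is Context.type,
// so the compiler rejects the call with a type mismatch:
// describe(Context)   // does not compile
```

Applied to the posted code, this suggests passing ssc (the StreamingContext instance created earlier) as the first argument instead of the name StreamingContext.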