Posted to user@spark.apache.org by chris snow <ch...@gmail.com> on 2018/01/18 11:13:31 UTC
Structured Streaming with Kafka seems to be losing config options
I'm trying to connect Spark 2.2 Structured Streaming to a Kafka broker that
has SASL enabled. My Spark script:
/////
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingRetailTransactions").config("master", "local").getOrCreate()

import spark.implicits._

val df = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9093").
  option("subscribe", "transactions_load").
  option("security.protocol", "SASL_PLAINTEXT").
  option("sasl.mechanism", "PLAIN").
  option("auto.offset.reset","earliest").
  option("group.id", System.currentTimeMillis).
  load()

val query = df.writeStream.format("console").start()
/////
If I step through the Spark code with a debugger, I can see that the options
are set on the DataStreamReader:
[image: Inline images 1]
However, at the point where the KafkaConsumer is created in the
SubscribeStrategy, the parameters are lost:
[image: Inline images 2]
Because these parameters are lost, Spark is unable to connect to Kafka (I
also see a number of SASL errors in the Kafka log files).
If I manually add the Kafka parameters as in the code below, I am able to
connect over SASL:
/**
 * Subscribe to a fixed collection of topics.
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
  override def createConsumer(
      kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
    // Manually set lost sasl parameters
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT")
    kafkaParams.put("sasl.mechanism", "PLAIN")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    consumer.subscribe(topics.asJava)
    consumer
  }

  override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
}
Does this look like it could be a bug in Spark Structured Streaming?
Re: Structured Streaming with Kafka seems to be losing config options
Posted by chris snow <ch...@gmail.com>.
Ah, it looks like I have to prefix the Kafka options with "kafka.". Spark's
Kafka source only passes on options whose keys start with that prefix:
val specifiedKafkaParams =
  parameters
    .keySet
    .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    .map { k => k.drop(6).toString -> parameters(k) }
    .toMap
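
To make the effect of that filter concrete, here is a minimal standalone sketch that applies the same logic to plain Scala maps, without Spark on the classpath. The object name KafkaPrefixDemo and the two sample option maps are made up for illustration; only the filtering expression is taken from the snippet above.

```scala
import java.util.Locale

// Hypothetical demo object; not part of Spark.
object KafkaPrefixDemo {

  // Same logic as the snippet above: keep only keys that start with
  // "kafka." (case-insensitively) and strip that 6-character prefix.
  def specifiedKafkaParams(parameters: Map[String, String]): Map[String, String] =
    parameters
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
      .map { k => k.drop(6) -> parameters(k) }
      .toMap

  def main(args: Array[String]): Unit = {
    // Options as written in the original script: the SASL keys have no prefix.
    val unprefixed = Map(
      "kafka.bootstrap.servers" -> "localhost:9093",
      "subscribe" -> "transactions_load",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.mechanism" -> "PLAIN")

    // Same options with the SASL keys prefixed by "kafka.".
    val prefixed = Map(
      "kafka.bootstrap.servers" -> "localhost:9093",
      "subscribe" -> "transactions_load",
      "kafka.security.protocol" -> "SASL_PLAINTEXT",
      "kafka.sasl.mechanism" -> "PLAIN")

    // Only bootstrap.servers survives here; the SASL settings are dropped.
    println(specifiedKafkaParams(unprefixed))
    // bootstrap.servers, security.protocol and sasl.mechanism all survive.
    println(specifiedKafkaParams(prefixed))
  }
}
```

Under that reading, the SASL options in the script would be written as option("kafka.security.protocol", "SASL_PLAINTEXT") and option("kafka.sasl.mechanism", "PLAIN"). Note that, as far as I can tell from the Spark 2.2 Kafka integration guide, group.id and auto.offset.reset are not accepted even with the prefix; the source option startingOffsets is the documented way to choose where to start reading.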