Posted to user@spark.apache.org by chris snow <ch...@gmail.com> on 2018/01/18 11:13:31 UTC

Structured Streaming with Kafka seems to be losing config options

I'm trying to connect Spark 2.2 Structured Streaming to a Kafka broker that
has SASL enabled.  My Spark script:


/////
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.
                appName("StreamingRetailTransactions").
                config("master", "local").
                getOrCreate()

import spark.implicits._

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "localhost:9093").
                option("subscribe", "transactions_load").
                option("security.protocol", "SASL_PLAINTEXT").
                option("sasl.mechanism", "PLAIN").
                option("auto.offset.reset","earliest").
                option("group.id", System.currentTimeMillis).
                load()

val query = df.writeStream.format("console").start()
/////

If I step through the Spark code with a debugger, I can see the options are
set on the DataStreamReader:

[screenshot: options as set on the DataStreamReader]

However, at the point where the KafkaConsumer is created in the
SubscribeStrategy, the parameters are lost:

[screenshot: kafkaParams at the point the KafkaConsumer is created in SubscribeStrategy]

Because these parameters are lost, Spark is unable to connect to Kafka (I
also see a bunch of SASL issues in the Kafka log files).

If I manually add the Kafka parameters, as in the patched code below, I am
able to connect over SASL:

/**
 * Subscribe to a fixed collection of topics.
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {

  override def createConsumer(
      kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte],
Array[Byte]] = {

    // Workaround: manually re-add the SASL parameters that were dropped

    kafkaParams.put("security.protocol", "SASL_PLAINTEXT")
    kafkaParams.put("sasl.mechanism", "PLAIN")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    consumer.subscribe(topics.asJava)
    consumer
  }

  override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
}


Does this look like it could be a bug in Spark Structured Streaming?

Re: Structured Streaming with Kafka seems to be losing config options

Posted by chris snow <ch...@gmail.com>.
Ah, it looks like I have to prefix the Kafka consumer options with "kafka.".
The Kafka source provider only forwards options that start with that prefix,
stripping it before they reach the consumer:

// Only options prefixed with "kafka." are kept; the 6-character "kafka."
// prefix is stripped before the remaining key/value pairs are handed to
// the KafkaConsumer.
val specifiedKafkaParams =
  parameters
    .keySet
    .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    .map { k => k.drop(6).toString -> parameters(k) }
    .toMap
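
For reference, a minimal sketch of the corrected reader (untested, same broker
and topic as above; startingOffsets stands in for auto.offset.reset, and
group.id is dropped because the Kafka source manages its own consumer group
and offsets):

// Untested sketch: the same reader as before, with the consumer properties
// prefixed with "kafka." so the source provider passes them through.
val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "localhost:9093").
                option("kafka.security.protocol", "SASL_PLAINTEXT").
                option("kafka.sasl.mechanism", "PLAIN").
                option("subscribe", "transactions_load").
                option("startingOffsets", "earliest").
                load()

With this in place, the unprefixed security.protocol and sasl.mechanism options
from the original script are never forwarded to the KafkaConsumer, which
explains why they appeared to be lost.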



