Posted to user@spark.apache.org by Colin Williams <co...@gmail.com> on 2018/11/13 20:32:21 UTC

Inferred schemas for Spark Structured Streaming from a Kafka source

Does anybody know how to use inferred schemas with Structured
Streaming? See: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#schema-inference-and-partition-of-streaming-dataframesdatasets
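For context, my reading of that section is that inference is driven by
the spark.sql.streaming.schemaInference flag together with a file-based
source, roughly like this (the path here is made up):

// What I understand the docs to describe, for a file source
// (the path is hypothetical):
spark.sql("set spark.sql.streaming.schemaInference=true")
val fileDf = spark.readStream.json("/data/events/") // schema inferred from files present at start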

I have some code like:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

object StreamingApp {

  // Config is an application-specific type holding my schema sample,
  // broker list, and topic name.
  def launch(config: Config, spark: SparkSession): Unit = {
    import spark.implicits._

    // Build a schema from the JSON sample carried in my config.
    val schemaJson = spark.sparkContext.parallelize(List(config.schema))
    val schemaDF = spark.read.json(schemaJson)
    schemaDF.printSchema()

    // Read from Kafka as a stream.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", config.broker)
      .option("subscribe", config.topic)
      .option("startingOffsets", "earliest")
      .load()

    spark.sql("set spark.sql.streaming.schemaInference=true")

    val jsonOptions = Map[String, String]("mode" -> "FAILFAST")

    // Parse the Kafka value column as JSON using the provided schema.
    val org_store_event_df = df.select(
      col("key").cast("string"),
      from_json(col("value").cast("string"), schemaDF.schema, jsonOptions))

    org_store_event_df.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}

I'd like to compare an inferred schema against the one I provide, to
determine what's missing from my schema, or why I end up with all
nulls in my values column.
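Concretely, the diff I have in mind looks something like the sketch
below; inferredSchema is a placeholder for an inferred StructType, and
this only compares top-level fields (nested structs would need
recursion):

val provided = schemaDF.schema.fields.map(f => f.name -> f.dataType).toMap
val inferred = inferredSchema.fields.map(f => f.name -> f.dataType).toMap

// Fields present in the data but absent from my provided schema.
println("missing from mine: " + (inferred.keySet -- provided.keySet))

// Fields whose types disagree; a wrong type here is one way
// from_json can end up producing nulls.
println("type mismatches: " +
  inferred.keySet.intersect(provided.keySet).filter(k => inferred(k) != provided(k)))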

Currently I'm reading the schema from a JSON file. But I'd like to
infer the schema from the stream itself, as the docs suggest. I'm not
sure how to replace from_json so that the value column is read using
an inferred schema, or whether that's even possible. The only
workaround I've come up with is sketched below.
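The idea is to do a bounded batch read of the topic first, infer a
schema from that sample with spark.read.json, and feed the result to
from_json in the streaming query. I'm not sure this is the intended
approach; a sketch, assuming spark.implicits._ is in scope as in my
code above:

// Bounded batch read of the same topic, only to infer a schema.
val sample = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", config.broker)
  .option("subscribe", config.topic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .select(col("value").cast("string"))
  .as[String]

val inferredSchema = spark.read.json(sample).schema
inferredSchema.printTreeString()

// Then in the streaming select, swap in the inferred schema:
// from_json(col("value").cast("string"), inferredSchema, jsonOptions)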

Maybe schema inference is only supported for file sources and not for
Kafka sources? If that's the case, why do they have different
implementations?

Also, shouldn't the documentation be clearer about this?
