You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/03/07 00:38:31 UTC

spark git commit: [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

Repository: spark
Updated Branches:
  refs/heads/master f6471dc0d -> b0a5cd890


[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.
### Streaming Kafka Sink
- When addBatch is called
-- If batchId is greater than the last written batch
--- Write batch to Kafka
---- Topic will be taken from the record, if present, or from a topic option, which overrides topic in record.
-- Else ignore

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider
- CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka
- Topic will be taken from the record, if present, or from topic option, which overrides topic in record.
- Save modes Append and ErrorIfExists are supported under identical semantics. Other save modes result in an AnalysisException

tdas zsxwing

## How was this patch tested?

### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
- write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
- write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
- write batch to kafka: simple write batch to Kafka, which goes through the same code path as streaming scenario, so validity checks will not be redone here.

### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
 .selectExpr("value as key", "value as value")
 .writeStream
 .format("kafka")
 .option("checkpointLocation", checkpointDir)
 .outputMode(OutputMode.Append)
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .queryName("kafkaStream")
 .start()

// Batch
val df = spark
 .sparkContext
 .parallelize(Seq("1", "2", "3", "4", "5"))
 .map(v => (topic, v))
 .toDF("topic", "value")

df.write
 .format("kafka")
 .option("kafka.bootstrap.servers",brokerAddress)
 .option("topic", topic)
 .save()
```
Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tc...@gmail.com>

Closes #17043 from tcondie/kafka-writer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0a5cd89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0a5cd89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0a5cd89

Branch: refs/heads/master
Commit: b0a5cd89097c563e9949d8cfcf84d18b03b8d24c
Parents: f6471dc
Author: Tyson Condie <tc...@gmail.com>
Authored: Mon Mar 6 16:39:05 2017 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Mar 6 16:39:05 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSink.scala   |  43 ++
 .../sql/kafka010/KafkaSourceProvider.scala      |  83 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     | 123 ++++++
 .../apache/spark/sql/kafka010/KafkaWriter.scala |  97 +++++
 .../spark/sql/kafka010/KafkaSinkSuite.scala     | 412 +++++++++++++++++++
 5 files changed, 753 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
new file mode 100644
index 0000000..08914d8
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.Sink
+
+/**
+ * Streaming [[Sink]] that hands each new micro-batch to [[KafkaWriter]].
+ *
+ * @param sqlContext context whose SparkSession is used to run the write
+ * @param executorKafkaParams producer configuration passed through to `KafkaWriter.write`
+ * @param topic optional topic passed through to `KafkaWriter.write`
+ */
+private[kafka010] class KafkaSink(
+    sqlContext: SQLContext,
+    executorKafkaParams: ju.Map[String, Object],
+    topic: Option[String]) extends Sink with Logging {
+  // Highest batch id written so far; -1 means nothing has been written yet.
+  @volatile private var latestBatchId = -1L
+
+  override def toString(): String = "KafkaSink"
+
+  /**
+   * Writes `data` to Kafka when `batchId` is newer than the last batch written by this
+   * sink; otherwise only logs that the batch is being skipped.
+   */
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val alreadyWritten = batchId <= latestBatchId
+    if (!alreadyWritten) {
+      val session = sqlContext.sparkSession
+      KafkaWriter.write(session, data.queryExecution, executorKafkaParams, topic)
+      // Recorded only after the write returns, so a write that throws is not marked as done.
+      latestBatchId = batchId
+    } else {
+      logInfo(s"Skipping already committed batch $batchId")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6a74567..febe3c2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -23,12 +23,14 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.streaming.Source
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -36,8 +38,12 @@ import org.apache.spark.sql.types.StructType
  * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
  * missing options even before the query is started.
  */
-private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSourceProvider
-  with RelationProvider with Logging {
+private[kafka010] class KafkaSourceProvider extends DataSourceRegister
+    with StreamSourceProvider
+    with StreamSinkProvider
+    with RelationProvider
+    with CreatableRelationProvider
+    with Logging {
   import KafkaSourceProvider._
 
   override def shortName(): String = "kafka"
@@ -152,6 +158,72 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
       endingRelationOffsets)
   }
 
+  /**
+   * Builds the streaming [[Sink]] for this data source.
+   *
+   * @param sqlContext context handed to the created [[KafkaSink]]
+   * @param parameters data source options; the topic option (trimmed) becomes the sink's
+   *                   topic and the "kafka."-prefixed entries become the producer config
+   * @param partitionColumns not used by this implementation
+   * @param outputMode not used by this implementation
+   */
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    val topicFromOptions = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    // Copy into a java.util.HashMap typed as Map[String, Object], as KafkaSink expects.
+    val producerParams: ju.Map[String, Object] =
+      new ju.HashMap[String, Object](kafkaParamsForProducer(parameters).asJava)
+    new KafkaSink(sqlContext, producerParams, topicFromOptions)
+  }
+
+  /**
+   * Batch write path: writes `data` to Kafka through [[KafkaWriter]] and returns a
+   * placeholder relation.
+   *
+   * @param outerSQLContext context whose SparkSession runs the write
+   * @param mode requested save mode; Overwrite and Ignore are rejected
+   * @param parameters data source options (topic option and "kafka."-prefixed entries)
+   * @param data the DataFrame to write
+   * @return a [[BaseRelation]] whose members all throw UnsupportedOperationException
+   * @throws AnalysisException if `mode` is Overwrite or Ignore
+   */
+  override def createRelation(
+      outerSQLContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    val modeNotAllowed = mode == SaveMode.Overwrite || mode == SaveMode.Ignore
+    if (modeNotAllowed) {
+      throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
+        s"Allowed save modes are ${SaveMode.Append} and " +
+        s"${SaveMode.ErrorIfExists} (default).")
+    }
+
+    val topicFromOptions = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    val producerParams: ju.Map[String, Object] =
+      new ju.HashMap[String, Object](kafkaParamsForProducer(parameters).asJava)
+    KafkaWriter.write(
+      outerSQLContext.sparkSession, data.queryExecution, producerParams, topicFromOptions)
+
+    /* This method is supposed to return a relation that reads the data that was written.
+     * We cannot support this for Kafka. Therefore, in order to make things consistent,
+     * we return a base relation on which every member throws.
+     */
+    new BaseRelation {
+      private def notUsable: Nothing =
+        throw new UnsupportedOperationException("BaseRelation from Kafka write " +
+          "operation is not usable.")
+      override def sqlContext: SQLContext = notUsable
+      override def schema: StructType = notUsable
+      override def needConversion: Boolean = notUsable
+      override def sizeInBytes: Long = notUsable
+      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = notUsable
+    }
+  }
+
+  /**
+   * Derives the Kafka producer configuration from the data source options.
+   *
+   * Options whose key starts with "kafka." (compared case-insensitively) are passed through
+   * with that prefix removed. The key and value serializers are always set to
+   * ByteArraySerializer, so user-supplied serializer options are rejected.
+   *
+   * @param parameters the options given to the data source
+   * @return producer configuration keyed by the un-prefixed option names
+   * @throws IllegalArgumentException if a key or value serializer option is present
+   */
+  private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
+    // Lower-case with Locale.ROOT so the checks do not depend on the JVM default locale
+    // (under a Turkish locale, for example, "I" does not lower-case to "i").
+    def lowerCase(key: String): String = key.toLowerCase(ju.Locale.ROOT)
+
+    val lowerCasedKeys = parameters.keySet.map(lowerCase)
+    if (lowerCasedKeys.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
+      throw new IllegalArgumentException(
+        s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
+          + "are serialized with ByteArraySerializer.")
+    }
+
+    if (lowerCasedKeys.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) {
+      throw new IllegalArgumentException(
+        s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
+          + "value are serialized with ByteArraySerializer.")
+    }
+
+    val prefix = "kafka."
+    val producerOptions = parameters.collect {
+      case (key, value) if lowerCase(key).startsWith(prefix) => key.drop(prefix.length) -> value
+    }
+    // Added last so the fixed serializers always win over anything passed through above.
+    val serializerName = classOf[ByteArraySerializer].getName
+    producerOptions ++ Map(
+      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> serializerName,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializerName)
+  }
+
   private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
     ConfigUpdater("source", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -381,6 +453,7 @@ private[kafka010] object KafkaSourceProvider {
   private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+  val TOPIC_OPTION_KEY = "topic"
 
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
new file mode 100644
index 0000000..6e160cb
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.producer.{KafkaProducer, _}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
+import org.apache.spark.sql.types.{BinaryType, StringType}
+
+/**
+ * Writes the rows of a single Spark task to Kafka, without any concerns about how
+ * to commit or abort tasks. Exceptions thrown by this class will automatically
+ * trigger task aborts.
+ *
+ * @param producerConfiguration configuration used to create the [[KafkaProducer]]
+ * @param inputSchema attributes of the rows passed to [[execute]]
+ * @param topic optional topic; when defined it is used for every record instead of the
+ *              row's topic attribute
+ */
+private[kafka010] class KafkaWriteTask(
+    producerConfiguration: ju.Map[String, Object],
+    inputSchema: Seq[Attribute],
+    topic: Option[String]) {
+  // First failure reported by a Kafka callback; volatile because callbacks run on
+  // another thread than the one calling execute()/close().
+  @volatile private var failedWrite: Exception = null
+  // A single callback shared by all records: it only records the first failure.
+  private val callback = new Callback() {
+    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
+      if (failedWrite == null && e != null) {
+        failedWrite = e
+      }
+    }
+  }
+  // Built eagerly so that schema problems surface when the task is constructed.
+  private val projection = createProjection
+  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+  /**
+   * Writes key value data out to topics.
+   *
+   * Sending stops early once a callback has reported a failure; the failure itself is
+   * thrown from [[close]].
+   *
+   * @throws NullPointerException if a row's topic evaluates to null
+   */
+  def execute(iterator: Iterator[InternalRow]): Unit = {
+    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
+    while (iterator.hasNext && failedWrite == null) {
+      // Projected layout: 0 = topic (string), 1 = key (binary), 2 = value (binary).
+      val projectedRow = projection(iterator.next())
+      val rowTopic = projectedRow.getUTF8String(0)
+      val key = projectedRow.getBinary(1)
+      val value = projectedRow.getBinary(2)
+      if (rowTopic == null) {
+        throw new NullPointerException(s"null topic present in the data. Use the " +
+        s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
+      }
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](rowTopic.toString, key, value)
+      producer.send(record, callback)
+    }
+  }
+
+  /**
+   * Closes the producer (if one was created) and rethrows any failure reported by a
+   * send callback. The producer is released even when a failure is rethrown.
+   */
+  def close(): Unit = {
+    if (producer != null) {
+      try {
+        checkForErrors()
+      } finally {
+        // Always release the producer, even if the check above throws.
+        try {
+          producer.close()
+        } finally {
+          producer = null
+        }
+      }
+      // Closing lets outstanding sends complete, so a callback may have recorded a
+      // failure in the meantime.
+      checkForErrors()
+    }
+  }
+
+  /**
+   * Builds the projection (topic, key as binary, value as binary) over `inputSchema`.
+   *
+   * @throws IllegalStateException if no topic is available, the value attribute is
+   *                               missing, or an attribute has an unsupported type
+   */
+  private def createProjection: UnsafeProjection = {
+    // The topic option, when set, takes precedence over a topic attribute.
+    val topicExpression = topic.map(Literal(_)).orElse {
+      inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
+    }.getOrElse {
+      throw new IllegalStateException(s"topic option required when no " +
+        s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
+    }
+    topicExpression.dataType match {
+      case StringType => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
+          s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
+          s"must be a ${StringType}")
+    }
+    // A missing key attribute is replaced by a null binary literal.
+    val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
+      .getOrElse(Literal(null, BinaryType))
+    keyExpression.dataType match {
+      case StringType | BinaryType => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
+          s"attribute unsupported type $t")
+    }
+    val valueExpression = inputSchema
+      .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
+      throw new IllegalStateException(s"Required attribute " +
+        s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
+    )
+    valueExpression.dataType match {
+      case StringType | BinaryType => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
+          s"attribute unsupported type $t")
+    }
+    UnsafeProjection.create(
+      Seq(topicExpression, Cast(keyExpression, BinaryType),
+        Cast(valueExpression, BinaryType)), inputSchema)
+  }
+
+  /** Rethrows the first failure recorded by a send callback, if any. */
+  private def checkForErrors(): Unit = {
+    if (failedWrite != null) {
+      throw failedWrite
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
new file mode 100644
index 0000000..a637d52
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.types.{BinaryType, StringType}
+import org.apache.spark.util.Utils
+
+/**
+ * The [[KafkaWriter]] class is used to write data from a batch query
+ * or structured streaming query, given by a [[QueryExecution]], to Kafka.
+ * The data is assumed to have a value column, and an optional topic and key
+ * columns. If the topic column is missing, then the topic must come from
+ * the 'topic' configuration option. If the key column is missing, then a
+ * null valued key field will be added to the
+ * [[org.apache.kafka.clients.producer.ProducerRecord]].
+ */
+private[kafka010] object KafkaWriter extends Logging {
+  val TOPIC_ATTRIBUTE_NAME: String = "topic"
+  val KEY_ATTRIBUTE_NAME: String = "key"
+  val VALUE_ATTRIBUTE_NAME: String = "value"
+
+  override def toString: String = "KafkaWriter"
+
+  /**
+   * Checks that the query output can be written to Kafka.
+   *
+   * The topic option, when defined, takes precedence over a topic attribute (as it does in
+   * [[KafkaWriteTask]]), so the topic attribute is only validated when no option is given.
+   *
+   * @param queryExecution the query whose logical output attributes are validated
+   * @param kafkaParameters not used by the validation
+   * @param topic optional topic taken from the data source options
+   * @throws AnalysisException if no topic is available, the value attribute is missing,
+   *                           or a topic/key/value attribute has an unsupported type
+   */
+  def validateQuery(
+      queryExecution: QueryExecution,
+      kafkaParameters: ju.Map[String, Object],
+      topic: Option[String] = None): Unit = {
+    val schema = queryExecution.logical.output
+
+    if (topic.isEmpty) {
+      val topicAttribute = schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse {
+        throw new AnalysisException(s"topic option required when no " +
+          s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
+          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
+      }
+      if (topicAttribute.dataType != StringType) {
+        throw new AnalysisException(s"Topic type must be a String")
+      }
+    }
+
+    // The key is optional: only validate its type when the attribute exists.
+    schema.find(_.name == KEY_ATTRIBUTE_NAME)
+      .foreach(requireStringOrBinary(_, KEY_ATTRIBUTE_NAME))
+
+    val valueAttribute = schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse {
+      throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
+    }
+    requireStringOrBinary(valueAttribute, VALUE_ATTRIBUTE_NAME)
+  }
+
+  /**
+   * Validates the query and then writes every partition of its result to Kafka using one
+   * [[KafkaWriteTask]] per partition.
+   *
+   * @param sparkSession session under which the execution is tracked
+   * @param queryExecution the query producing the rows to write
+   * @param kafkaParameters producer configuration handed to each [[KafkaWriteTask]]
+   * @param topic optional topic taken from the data source options
+   */
+  def write(
+      sparkSession: SparkSession,
+      queryExecution: QueryExecution,
+      kafkaParameters: ju.Map[String, Object],
+      topic: Option[String] = None): Unit = {
+    val schema = queryExecution.logical.output
+    validateQuery(queryExecution, kafkaParameters, topic)
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+      queryExecution.toRdd.foreachPartition { iter =>
+        val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
+        // close() runs whether or not execute() throws.
+        Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
+          finallyBlock = writeTask.close())
+      }
+    }
+  }
+
+  /** Fails with an AnalysisException unless `attribute` is of String or Binary type. */
+  private def requireStringOrBinary(attribute: Attribute, attributeName: String): Unit = {
+    attribute.dataType match {
+      case StringType | BinaryType => // good
+      case _ =>
+        throw new AnalysisException(s"$attributeName attribute type " +
+          s"must be a String or BinaryType")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
new file mode 100644
index 0000000..4905356
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+class KafkaSinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override val streamingTimeout = 30.seconds
+
+  /** Creates and sets up the Kafka test utilities with topic auto-creation disabled. */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // Auto-creation is off so that writing to a topic that was never created fails.
+    val brokerProps = Map("auto.create.topics.enable" -> "false")
+    testUtils = new KafkaTestUtils(withBrokerProps = brokerProps)
+    testUtils.setup()
+  }
+
+  /**
+   * Tears down the Kafka test utilities (if they were created) and then always runs the
+   * parent clean-up, even when `beforeAll` failed early or the teardown throws.
+   */
+  override def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("batch - write to kafka") {
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    val values = Seq("1", "2", "3", "4", "5")
+    // Each row names the destination topic in its own "topic" column.
+    val df = values.map(v => (topic, v)).toDF("topic", "value")
+    df.write
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("topic", topic)
+      .save()
+    // Everything written must be readable back from the same topic.
+    val written = createKafkaReader(topic).selectExpr("CAST(value as STRING) value")
+    checkAnswer(written, values.map(Row(_)))
+  }
+
+  test("batch - null topic field value, and no topic option") {
+    // A single row whose topic is null, written without a topic option.
+    val rows = Seq[(String, String)]((null, "1"))
+    val df = rows.toDF("topic", "value")
+    val writer = df.write
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+    val ex = intercept[SparkException] {
+      writer.save()
+    }
+    val message = ex.getMessage.toLowerCase
+    assert(message.contains("null topic present in the data"))
+  }
+
+  test("batch - unsupported save modes") {
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+
+    // Both rejected modes must fail with the same message shape; checked in this order.
+    val rejectedModes = Seq(SaveMode.Ignore -> "ignore", SaveMode.Overwrite -> "overwrite")
+    rejectedModes.foreach { case (mode, modeName) =>
+      val ex = intercept[AnalysisException] {
+        df.write
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .mode(mode)
+          .save()
+      }
+      assert(ex.getMessage.toLowerCase.contains(
+        s"save mode $modeName not allowed for kafka"))
+    }
+  }
+
+  test("streaming - write to kafka with topic field") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    // withTopic is None here; the topic is supplied as a literal "topic" column instead.
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = None,
+      withOutputMode = Some(OutputMode.Append))(
+      withSelectExpr = s"'$topic' as topic", "value")
+
+    // Read the topic back and keep only the values, parsed as integers.
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+      .as[(Int, Int)]
+      .map(_._2)
+
+    // Adds one batch of input and waits, bounded by streamingTimeout, for it to be processed.
+    def addAndProcess(rows: String*): Unit = {
+      input.addData(rows: _*)
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+    }
+
+    try {
+      addAndProcess("1", "2", "3", "4", "5")
+      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+      addAndProcess("6", "7", "8", "9", "10")
+      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("streaming - write aggregation w/o topic field, with topic option") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    // The selected columns contain no "topic" field; the topic is passed via withTopic.
+    val counts = input.toDF().groupBy("value").count()
+    val writer = createKafkaWriter(
+      counts,
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))(
+      withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value")
+
+    // Read back (key, value) pairs parsed as integers.
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+      .as[(Int, Int)]
+
+    // Adds one batch of input and waits, bounded by streamingTimeout, for it to be processed.
+    def addAndProcess(rows: String*): Unit = {
+      input.addData(rows: _*)
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+    }
+
+    try {
+      addAndProcess("1", "2", "2", "3", "3", "3")
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
+      addAndProcess("1", "2", "3")
+      // The second batch appends the updated counts after the first batch's records.
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("streaming - aggregation with topic field and topic option") {
+    /* Verifies that the topic option overrides the topic field. The data
+     * carries a topic field with the value 'foo' and is written together
+     * with a topic option. Reading from the topic named by the option must
+     * return the data, i.e. it went to the option's topic and not to 'foo'.
+     */
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val counts = input.toDF().groupBy("value").count()
+    val writer = createKafkaWriter(
+      counts,
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))(
+      withSelectExpr = "'foo' as topic",
+        "CAST(value as STRING) key", "CAST(count as STRING) value")
+
+    // Read back (key, value) pairs parsed as integers from the option's topic.
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
+      .as[(Int, Int)]
+
+    // Adds one batch of input and waits, bounded by streamingTimeout, for it to be processed.
+    def addAndProcess(rows: String*): Unit = {
+      input.addData(rows: _*)
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+    }
+
+    try {
+      addAndProcess("1", "2", "2", "3", "3", "3")
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
+      addAndProcess("1", "2", "3")
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+    } finally {
+      writer.stop()
+    }
+  }
+
+
+  test("streaming - write data with bad schema") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    /* No topic field or topic option */
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = "value as key", "value"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      // writer is still null if createKafkaWriter itself failed; guard so that an NPE
+      // raised here does not replace the original failure.
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage
+      .toLowerCase
+      .contains("topic option required when no 'topic' attribute is present"))
+
+    // Forget the stopped query so the guard below reflects only the next case.
+    writer = null
+    try {
+      /* No value field */
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = s"'$topic' as topic", "value as key"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
+  }
+
+  test("streaming - write data with valid schema but wrong types") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      /* topic field wrong type */
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = s"CAST('1' as INT) as topic", "value"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      // writer is still null if createKafkaWriter itself failed; guard so that an NPE
+      // raised here does not replace the original failure.
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase.contains("topic type must be a string"))
+
+    // Forget the stopped query so the guard below reflects only the next case.
+    writer = null
+    try {
+      /* value field wrong type */
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase.contains(
+      "value attribute type must be a string or binarytype"))
+
+    writer = null
+    try {
+      ex = intercept[StreamingQueryException] {
+        /* key field wrong type */
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase.contains(
+      "key attribute type must be a string or binarytype"))
+  }
+
+  test("streaming - write to non-existing topic") {
+    // Writes to a freshly named topic that this test never creates and expects the
+    // streaming query to fail with a "job aborted" error.
+    val input = MemoryStream[String]
+    val topic = newTopic()
+
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      // writer is still null when createKafkaWriter itself throws; guard the cleanup so
+      // an NPE raised here cannot replace the more informative failure from intercept.
+      Option(writer).foreach(_.stop())
+    }
+    assert(ex.getMessage.toLowerCase.contains("job aborted"))
+  }
+
+  test("streaming - exception on config serializer") {
+    // Checks that overriding the Kafka key/value serializer through writer options is
+    // rejected with an IllegalArgumentException when the query is created.
+    val input = MemoryStream[String]
+    var writer: StreamingQuery = null
+    try {
+      val keyEx = intercept[IllegalArgumentException] {
+        writer = createKafkaWriter(
+          input.toDF(),
+          withOptions = Map("kafka.key.serializer" -> "foo"))()
+      }
+      assert(keyEx.getMessage.toLowerCase.contains(
+        "kafka option 'key.serializer' is not supported"))
+
+      val valueEx = intercept[IllegalArgumentException] {
+        writer = createKafkaWriter(
+          input.toDF(),
+          withOptions = Map("kafka.value.serializer" -> "foo"))()
+      }
+      assert(valueEx.getMessage.toLowerCase.contains(
+        "kafka option 'value.serializer' is not supported"))
+    } finally {
+      // writer is only non-null if a query unexpectedly started (i.e. the option was
+      // not rejected); stop it so the failed test does not leave a running query behind.
+      Option(writer).foreach(_.stop())
+    }
+  }
+
+  test("generic - write big data with small producer buffer") {
+    /* This test ensures that we understand the semantics of Kafka when
+     * it comes to blocking on a call to send when the send buffer is full.
+     * The producer is configured with the smallest possible buffer and is told
+     * to block when that buffer is full, so a full buffer must not surface
+     * as an exception.
+     */
+    val topic = newTopic()
+    testUtils.createTopic(topic, 1)
+
+    val producerConf = new java.util.HashMap[String, Object]
+    producerConf.put("bootstrap.servers", testUtils.brokerAddress)
+    producerConf.put("buffer.memory", "16384") // min buffer size
+    producerConf.put("block.on.buffer.full", "true")
+    val serializerName = classOf[ByteArraySerializer].getName
+    producerConf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerName)
+    producerConf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerName)
+
+    val payload = new Array[Byte](15000) // large value
+    val schema = Seq(AttributeReference("value", BinaryType)())
+    val task = new KafkaWriteTask(producerConf, schema, Some(topic))
+    try {
+      // Build a single-column binary row and push 1000 projections of it through the task.
+      val columnTypes: Array[DataType] = Array(BinaryType)
+      val toUnsafe = UnsafeProjection.create(columnTypes)
+      val source = new SpecificInternalRow(columnTypes)
+      source.update(0, payload)
+      val rows = Seq.fill(1000)(toUnsafe(source))
+      task.execute(rows.iterator)
+    } finally {
+      task.close()
+    }
+  }
+
+  /** Counter backing [[newTopic]]; starts at 0 and is bumped once per generated name. */
+  private val topicId = new AtomicInteger(0)
+
+  /** Returns a topic name of the form `topic-<n>`, using the next value of `topicId`. */
+  private def newTopic(): String = {
+    val suffix = topicId.getAndIncrement()
+    "topic-" + suffix
+  }
+
+  /**
+   * Builds a batch DataFrame that reads `topic` through the "kafka" source, subscribed
+   * from the "earliest" to the "latest" offsets on the test broker.
+   */
+  private def createKafkaReader(topic: String): DataFrame = {
+    val readerOptions = Seq(
+      "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+      "startingOffsets" -> "earliest",
+      "endingOffsets" -> "latest",
+      "subscribe" -> topic)
+    val configured = readerOptions.foldLeft(spark.read.format("kafka")) {
+      case (reader, (key, value)) => reader.option(key, value)
+    }
+    configured.load()
+  }
+
+  /**
+   * Starts a streaming query named "kafkaStream" that writes `input` to the "kafka" sink.
+   *
+   * @param input source DataFrame
+   * @param withTopic when defined, passed to the writer as the "topic" option
+   * @param withOutputMode when defined, set as the writer's output mode
+   * @param withOptions extra writer options, applied last
+   * @param withSelectExpr when non-empty, applied to `input` via `selectExpr` before writing
+   * @return the started query
+   */
+  private def createKafkaWriter(
+      input: DataFrame,
+      withTopic: Option[String] = None,
+      withOutputMode: Option[OutputMode] = None,
+      withOptions: Map[String, String] = Map[String, String]())
+      (withSelectExpr: String*): StreamingQuery = {
+    var stream: DataStreamWriter[Row] = null
+    withTempDir { checkpointDir =>
+      val base = input.toDF()
+      val projected = if (withSelectExpr.nonEmpty) base.selectExpr(withSelectExpr: _*) else base
+      stream = projected.writeStream
+        .format("kafka")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .queryName("kafkaStream")
+      // The results of the calls below are discarded, so this relies on the writer
+      // being configured in place by option/outputMode.
+      withTopic.foreach(t => stream.option("topic", t))
+      withOutputMode.foreach(mode => stream.outputMode(mode))
+      withOptions.foreach { case (key, value) => stream.option(key, value) }
+    }
+    // NOTE(review): the query is started only after the withTempDir block has returned;
+    // presumably the temp dir is already cleaned up by then - confirm this is intended.
+    stream.start()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org