Posted to commits@spark.apache.org by we...@apache.org on 2019/04/26 06:26:18 UTC

[spark] branch master updated: [SPARK-27494][SS] Null values don't work in Kafka source v2

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d2656aa  [SPARK-27494][SS] Null values don't work in Kafka source v2
d2656aa is described below

commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d
Author: uncleGen <hu...@gmail.com>
AuthorDate: Fri Apr 26 14:25:31 2019 +0800

    [SPARK-27494][SS] Null values don't work in Kafka source v2
    
    ## What changes were proposed in this pull request?
    
    Right now Kafka source v2 doesn't support null values: org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow writes record.value unconditionally and never checks it for null.
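
    A minimal self-contained sketch (not the Spark source itself) of the fixed
    conversion pattern, assuming the UnsafeRowWriter API from
    org.apache.spark.sql.catalyst.expressions.codegen and a simplified
    two-field row:

        import org.apache.spark.sql.catalyst.expressions.UnsafeRow
        import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

        object NullSafeConverterSketch {
          // One reused writer, mirroring how the Kafka converter keeps a single instance.
          private val rowWriter = new UnsafeRowWriter(2)

          def toUnsafeRow(key: Array[Byte], value: Array[Byte]): UnsafeRow = {
            rowWriter.reset()
            rowWriter.zeroOutNullBytes() // clear null bits left by the previous record
            if (key == null) rowWriter.setNullAt(0) else rowWriter.write(0, key)
            if (value == null) rowWriter.setNullAt(1) else rowWriter.write(1, value)
            rowWriter.getRow.copy() // copy out: the backing buffer is reused on the next call
          }
        }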
    
    ## How was this patch tested?
    
    Added new unit tests covering null keys and values in both the micro-batch and continuous Kafka sources.
    
    Closes #24441 from uncleGen/SPARK-27494.
    
    Authored-by: uncleGen <hu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../kafka010/KafkaRecordToUnsafeRowConverter.scala |  7 ++-
 .../sql/kafka010/KafkaContinuousSourceSuite.scala  |  4 ++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 58 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143..306ef10 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)
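
Why the added zeroOutNullBytes() call: the converter reuses a single rowWriter
across records, and, judging from the fix itself, reset() alone does not clear
the null-tracking bits, nor does write() unset a bit left by an earlier
setNullAt(). A hedged sketch of the stale-bit leak this guards against
(two-field rows, names invented for illustration):

    import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

    object NullBitLeakSketch extends App {
      val w = new UnsafeRowWriter(2)

      // Record 1: both fields null.
      w.reset()
      w.setNullAt(0)
      w.setNullAt(1)
      val first = w.getRow.copy()

      // Record 2: both fields present. Without zeroOutNullBytes() the bits
      // set for record 1 would still mark these fields as null.
      w.reset()
      w.zeroOutNullBytes() // omit this line to observe the leak
      w.write(0, "k".getBytes("UTF-8"))
      w.write(1, "v".getBytes("UTF-8"))
      val second = w.getRow.copy()

      assert(first.isNullAt(0) && first.isNullAt(1))
      assert(!second.isNullAt(0) && !second.isNullAt(1))
    }
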
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index be0cea2..9b3e78c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 21634ae..b98f8e9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          producer.commitTransaction()
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {

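For reference, a hedged end-to-end sketch of what the fix enables on the user
side (the broker address and topic name below are hypothetical): records with
a null key and/or null value now surface as SQL NULLs instead of failing
conversion in the v2 source.

    import org.apache.spark.sql.SparkSession

    object ReadNullableKafkaRecords {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("nullable-kafka").getOrCreate()

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
          .option("subscribe", "events")                       // hypothetical topic
          .option("startingOffsets", "earliest")
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

        // Null keys/values arrive as NULL columns rather than raising an error.
        val query = df.writeStream.format("console").start()
        query.awaitTermination()
      }
    }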
