You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/26 06:26:18 UTC
[spark] branch master updated: [SPARK-27494][SS] Null values don't work in Kafka source v2
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d2656aa [SPARK-27494][SS] Null values don't work in Kafka source v2
d2656aa is described below
commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d
Author: uncleGen <hu...@gmail.com>
AuthorDate: Fri Apr 26 14:25:31 2019 +0800
[SPARK-27494][SS] Null values don't work in Kafka source v2
## What changes were proposed in this pull request?
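Right now Kafka source v2 doesn't support null values. The issue is in org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow, which doesn't handle null values. This patch makes toUnsafeRow call rowWriter.zeroOutNullBytes() after resetting the writer and mark the value column as null (setNullAt(1)) when record.value is null, mirroring the existing handling of a null key.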
## How was this patch tested?
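Added new unit tests that read Kafka records with null keys and/or null values, for both the micro-batch and continuous source suites.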
Closes #24441 from uncleGen/SPARK-27494.
Authored-by: uncleGen <hu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../kafka010/KafkaRecordToUnsafeRowConverter.scala | 7 ++-
.../sql/kafka010/KafkaContinuousSourceSuite.scala | 4 ++
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 58 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143..306ef10 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
rowWriter.reset()
+ rowWriter.zeroOutNullBytes()
if (record.key == null) {
rowWriter.setNullAt(0)
} else {
rowWriter.write(0, record.key)
}
- rowWriter.write(1, record.value)
+ if (record.value == null) {
+ rowWriter.setNullAt(1)
+ } else {
+ rowWriter.write(1, record.value)
+ }
rowWriter.write(2, UTF8String.fromString(record.topic))
rowWriter.write(3, record.partition)
rowWriter.write(4, record.offset)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index be0cea2..9b3e78c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
}
}
}
+
+ test("SPARK-27494: read kafka record containing null key/values.") {
+ testNullableKeyValue(ContinuousTrigger(100))
+ }
}
class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 21634ae..b98f8e9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
q.stop()
}
}
+
+ test("SPARK-27494: read kafka record containing null key/values.") {
+ testNullableKeyValue(Trigger.ProcessingTime(100))
+ }
}
@@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
)
}
+
+ protected def testNullableKeyValue(trigger: Trigger): Unit = {
+ val table = "kafka_null_key_value_source_test"
+ withTable(table) {
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+ testUtils.withTranscationalProducer { producer =>
+ val df = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.isolation.level", "read_committed")
+ .option("startingOffsets", "earliest")
+ .option("subscribe", topic)
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ val q = df
+ .writeStream
+ .format("memory")
+ .queryName(table)
+ .trigger(trigger)
+ .start()
+ try {
+ var idx = 0
+ producer.beginTransaction()
+ val expected1 = Seq.tabulate(5) { _ =>
+ producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+ (null, null)
+ }.asInstanceOf[Seq[(String, String)]]
+
+ val expected2 = Seq.tabulate(5) { _ =>
+ idx += 1
+ producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+ (idx.toString, null)
+ }.asInstanceOf[Seq[(String, String)]]
+
+ val expected3 = Seq.tabulate(5) { _ =>
+ idx += 1
+ producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+ (null, idx.toString)
+ }.asInstanceOf[Seq[(String, String)]]
+
+ producer.commitTransaction()
+ eventually(timeout(streamingTimeout)) {
+ checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+ }
+ } finally {
+ q.stop()
+ }
+ }
+ }
+ }
}
object KafkaSourceSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org