Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2018/08/23 18:17:19 UTC
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
GitHub user zsxwing opened a pull request:
https://github.com/apache/spark/pull/22207
[SPARK-25214][SS]Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`
## What changes were proposed in this pull request?
When there are missing offsets, the Kafka v2 source may return duplicated records if `failOnDataLoss=false`.
This PR fixes the issue and also adds regression tests for all Kafka readers.
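To illustrate the failure mode, here is a minimal hypothetical sketch (made-up names and offsets, not the actual patch): with `failOnDataLoss=false`, when the requested starting offset has been aged out, the reader must skip forward to the broker's earliest available offset, and the skipped range must be reflected consistently in what is read and what is reported; otherwise the next batch overlaps the previous one and records are emitted twice.
```scala
// Hypothetical sketch: clamp a planned offset range to what Kafka still retains.
// `OffsetRange` and `resolveRange` are illustrative names, not Spark APIs.
case class OffsetRange(fromOffset: Long, untilOffset: Long)

def resolveRange(requested: OffsetRange, earliestAvailable: Long): OffsetRange = {
  if (requested.fromOffset < earliestAvailable) {
    // Offsets [fromOffset, earliestAvailable) were deleted. Start at the
    // earliest retained offset and track progress from there, so the next
    // planned range cannot overlap with this one.
    OffsetRange(earliestAvailable, requested.untilOffset)
  } else {
    requested
  }
}

// resolveRange(OffsetRange(0, 50), earliestAvailable = 35) == OffsetRange(35, 50)
```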
## How was this patch tested?
New tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zsxwing/spark SPARK-25214
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22207.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22207
----
commit f2d4d67c765a298d23964b26ec07596839f008fa
Author: Shixiong Zhu <zs...@...>
Date: 2018-08-23T17:46:52Z
Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`
----
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Merged build finished. Test PASSed.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22207
Thanks! Merging to master and 2.3.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212706908
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * A basic test trait which sets up a Kafka cluster that keeps only several records in
+ * a topic and ages out records very quickly. This is a helper trait to test the
+ * "failOnDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) before
+ * records are cleaned up. Hence each class extending this trait needs to wait at least 30 seconds
+ * (or even longer when running on a slow Jenkins machine) before records start to be removed. To
+ * make sure a test does see missing offsets, check the earliest offset in `eventually` and assert
+ * that it is not 0, rather than sleeping for a hard-coded duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+ protected var testUtils: KafkaTestUtils = _
+
+ override def createSparkSession(): TestSparkSession = {
+ // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+ new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils {
+ override def brokerConfiguration: Properties = {
+ val props = super.brokerConfiguration
+ // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
+ // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test must run for at
+ // least 30 seconds.
+ props.put("log.cleaner.backoff.ms", "100")
+ // The size of RecordBatch V2 increases to support transactional write.
+ props.put("log.segment.bytes", "70")
+ props.put("log.retention.bytes", "40")
+ props.put("log.retention.check.interval.ms", "100")
+ props.put("delete.retention.ms", "10")
+ props.put("log.flush.scheduler.interval.ms", "10")
+ props
+ }
+ }
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ super.afterAll()
+ }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+ import testImplicits._
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ /**
+ * @param testStreamingQuery whether to test a streaming query or a batch query.
+ * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
+ */
+ private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+ testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
+ eventually(timeout(60.seconds)) {
+ assert(
+ testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
+ "Kafka didn't delete records after 1 minute")
+ }
+
+ val table = "DontFailOnDataLoss"
+ withTable(table) {
+ val df = if (testStreamingQuery) {
+ spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ } else {
+ spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ }
+ writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
+ val result = spark.table(table).as[String].collect().toList
+ assert(result.distinct.size === result.size, s"$result contains duplicated records")
+ // Make sure Kafka did remove some records so that this test is valid.
+ assert(result.size > 0 && result.size < 50)
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: v1") {
+ withSQLConf(
+ "spark.sql.streaming.disabledV2MicroBatchReaders" ->
+ classOf[KafkaSourceProvider].getCanonicalName) {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream.format("memory").queryName(table).start()
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+ }
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: v2") {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream.format("memory").queryName(table).start()
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: continuous processing") {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream
+ .format("memory")
+ .queryName(table)
+ .trigger(Trigger.Continuous(100))
+ .start()
+ try {
+ eventually(timeout(60.seconds)) {
+ assert(spark.table(table).as[String].collect().contains("49"))
+ }
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: batch") {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = false) { (df, table) =>
+ df.write.saveAsTable(table)
+ }
+ }
+}
+
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
+
+ import testImplicits._
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ protected def startStream(ds: Dataset[Int]) = {
+ ds.writeStream.foreach(new ForeachWriter[Int] {
+
+ override def open(partitionId: Long, version: Long): Boolean = {
+ true
+ }
+
+ override def process(value: Int): Unit = {
+ // Slow down the processing speed so that messages may be aged out.
+ Thread.sleep(Random.nextInt(500))
+ }
+
+ override def close(errorOrNull: Throwable): Unit = {
--- End diff --
nit: make single line.
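For reference, the single-line form being suggested would be:
```scala
override def close(errorOrNull: Throwable): Unit = {}
```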
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2504/
Test PASSed.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212707515
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
[...]
+ writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
+ val result = spark.table(table).as[String].collect().toList
+ assert(result.distinct.size === result.size, s"$result contains duplicated records")
+ // Make sure Kafka did remove some records so that this test is valid.
+ assert(result.size > 0 && result.size < 50)
--- End diff --
I checked the Kafka code: it keeps at least one segment per topic. I also ran a simple test to make sure it will not delete all records: I added `Thread.sleep(120000)` after the `eventually(timeout(60.seconds))` block that asserts `testUtils.getEarliestOffsets(Set(topic)).head._2 > 0`, and the assertion still passed.
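A sketch of that probe as described (the two-minute sleep was a temporary check, not part of the committed test):
```scala
eventually(timeout(60.seconds)) {
  assert(
    testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
    "Kafka didn't delete records after 1 minute")
}
// Temporary probe: give retention two more minutes before running the rest of
// the test. The final `assert(result.size > 0 && result.size < 50)` still
// passed, i.e. Kafka kept at least one segment and never deleted everything.
Thread.sleep(120000)
```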
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212706859
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
[...]
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
+
+ import testImplicits._
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ protected def startStream(ds: Dataset[Int]) = {
+ ds.writeStream.foreach(new ForeachWriter[Int] {
+
+ override def open(partitionId: Long, version: Long): Boolean = {
--- End diff --
nit: make single line.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22207
**[Test build #95224 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95224/testReport)** for PR 22207 at commit [`3515275`](https://github.com/apache/spark/commit/351527555c53a3977aa86a7057ed0aa12fb0976e).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22207
I just realized the Kafka source v2 is not in 2.3 :)
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22207
**[Test build #95224 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95224/testReport)** for PR 22207 at commit [`3515275`](https://github.com/apache/spark/commit/351527555c53a3977aa86a7057ed0aa12fb0976e).
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22207
**[Test build #95177 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95177/testReport)** for PR 22207 at commit [`f2d4d67`](https://github.com/apache/spark/commit/f2d4d67c765a298d23964b26ec07596839f008fa).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2538/
Test PASSed.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22207
**[Test build #95177 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95177/testReport)** for PR 22207 at commit [`f2d4d67`](https://github.com/apache/spark/commit/f2d4d67c765a298d23964b26ec07596839f008fa).
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22207
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212526645
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
[...]
+ val df = if (testStreamingQuery) {
+ spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ } else {
+ spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
--- End diff --
dedup these options into a map... just to make sure they are never inconsistent.
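A sketch of that deduplication (both `DataFrameReader` and `DataStreamReader` accept an options map, so the shared settings can live in one place):
```scala
// Shared Kafka reader options, defined once so the streaming and batch
// branches can never drift out of sync.
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> testUtils.brokerAddress,
  "kafka.metadata.max.age.ms" -> "1",
  "subscribe" -> topic,
  "startingOffsets" -> s"""{"$topic":{"0":0}}""",
  "failOnDataLoss" -> "false",
  "kafkaConsumer.pollTimeoutMs" -> "1000")

val df = if (testStreamingQuery) {
  spark.readStream.format("kafka").options(kafkaOptions).load()
} else {
  spark.read.format("kafka").options(kafkaOptions).load()
}
```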
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212706003
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
[...]
+ test("failOnDataLoss=false should not return duplicated records: continuous processing") {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream
+ .format("memory")
+ .queryName(table)
+ .trigger(Trigger.Continuous(100))
+ .start()
+ try {
+ eventually(timeout(60.seconds)) {
+ assert(spark.table(table).as[String].collect().contains("49"))
--- End diff --
doesn't `processAllAvailable` work in continuous processing?
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22207
**[Test build #95179 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95179/testReport)** for PR 22207 at commit [`e968159`](https://github.com/apache/spark/commit/e968159a1fcedadd6885a424656e1c8c5afb1aa7).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2506/
Test PASSed.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212409454
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -1187,134 +1185,3 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
iterations = 50)
}
}
-
-class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
--- End diff --
Moved to KafkaDontFailOnDataLossSuite.scala
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212409340
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
}
- override def count(): Long = offsetRanges.map(_.size).sum
--- End diff --
These methods are never used because Dataset always wraps this RDD (https://github.com/apache/spark/blob/2a0a8f753bbdc8c251f8e699c0808f35b94cfd20/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L113) and `MapPartitionsRDD` just falls back to the default RDD implementation. In addition, they may return wrong answers when `failOnDataLoss=false`. Hence, I just removed them.
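A toy illustration of why the precomputed count was unsafe (simplified types, not the real `KafkaSourceRDD`):
```scala
// Stand-in for the removed override `count(): Long = offsetRanges.map(_.size).sum`.
case class KafkaOffsetRange(fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

val offsetRanges = Seq(KafkaOffsetRange(0, 50))
val precomputed = offsetRanges.map(_.size).sum // 50

// With failOnDataLoss=false, offsets 0..34 may already have been deleted, so
// only 15 records are actually readable and the precomputed 50 is wrong.
```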
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212410113
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
[...]
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
--- End diff --
Copied from KafkaMicroBatchSourceSuite.scala. I also moved the setup code into `KafkaMissingOffsetsTest` so it can be shared with KafkaDontFailOnDataLossSuite.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95179/
Test PASSed.
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95177/
Test PASSed.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212526373
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps only a few records in
+ * a topic and ages out records very quickly. This is a helper trait for testing the
+ * "failOnDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) before
+ * records are cleaned up. Hence each class extending this trait needs to wait at least 30 seconds
+ * (or even longer when running on a slow Jenkins machine) before records start to be removed. To
+ * make sure a test does see missing offsets, check the earliest offset inside `eventually` and
+ * assert that it is greater than 0, rather than sleeping for a hard-coded duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+ protected var testUtils: KafkaTestUtils = _
+
+ override def createSparkSession(): TestSparkSession = {
+ // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+ new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils {
+ override def brokerConfiguration: Properties = {
+ val props = super.brokerConfiguration
+ // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
+ // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so each test should run for at
+ // least 30 seconds.
+ props.put("log.cleaner.backoff.ms", "100")
+ // The size of RecordBatch V2 increased to support transactional writes.
+ props.put("log.segment.bytes", "70")
+ props.put("log.retention.bytes", "40")
+ props.put("log.retention.check.interval.ms", "100")
+ props.put("delete.retention.ms", "10")
+ props.put("log.flush.scheduler.interval.ms", "10")
+ props
+ }
+ }
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ super.afterAll()
+ }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+ import testImplicits._
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ /**
+ * @param testStreamingQuery whether to test a streaming query or a batch query.
+ * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
+ */
+ private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+ testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
+ eventually(timeout(60.seconds)) {
+ assert(
+ testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
+ "Kafka didn't delete records after 1 minute")
+ }
+
+ val table = "DontFailOnDataLoss"
+ withTable(table) {
+ val df = if (testStreamingQuery) {
+ spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ } else {
+ spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ }
+ writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
+ val result = spark.table(table).as[String].collect().toList
+ assert(result.distinct.size === result.size, s"$result contains duplicated records")
+ // Make sure Kafka did remove some records so that this test is valid.
+ assert(result.size > 0 && result.size < 50)
--- End diff --
How do you ensure that the retention policy configured above will not delete all of the records?
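For context when reading the archive: the guard is visible in the quoted diff itself. Condensed (same calls as in the test above), the test first waits until at least one record has aged out, then asserts that the deletion was only partial:

// Wait until the earliest available offset has moved past 0, i.e. Kafka
// has deleted at least one of the 50 records that were written.
eventually(timeout(60.seconds)) {
  assert(testUtils.getEarliestOffsets(Set(topic)).head._2 > 0)
}

// After reading, verify the deletion was partial: some records survived
// (size > 0) but not all 50 (size < 50), so missing offsets were really
// exercised without the topic having been emptied.
assert(result.size > 0 && result.size < 50)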
---
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22207#discussion_r212709927
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps only a few records in
+ * a topic and ages out records very quickly. This is a helper trait for testing the
+ * "failOnDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) before
+ * records are cleaned up. Hence each class extending this trait needs to wait at least 30 seconds
+ * (or even longer when running on a slow Jenkins machine) before records start to be removed. To
+ * make sure a test does see missing offsets, check the earliest offset inside `eventually` and
+ * assert that it is greater than 0, rather than sleeping for a hard-coded duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+ protected var testUtils: KafkaTestUtils = _
+
+ override def createSparkSession(): TestSparkSession = {
+ // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+ new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils {
+ override def brokerConfiguration: Properties = {
+ val props = super.brokerConfiguration
+ // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
+ // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so each test should run for at
+ // least 30 seconds.
+ props.put("log.cleaner.backoff.ms", "100")
+ // The size of RecordBatch V2 increased to support transactional writes.
+ props.put("log.segment.bytes", "70")
+ props.put("log.retention.bytes", "40")
+ props.put("log.retention.check.interval.ms", "100")
+ props.put("delete.retention.ms", "10")
+ props.put("log.flush.scheduler.interval.ms", "10")
+ props
+ }
+ }
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ super.afterAll()
+ }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+ import testImplicits._
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ /**
+ * @param testStreamingQuery whether to test a streaming query or a batch query.
+ * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
+ */
+ private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+ testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
+ eventually(timeout(60.seconds)) {
+ assert(
+ testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
+ "Kafka didn't delete records after 1 minute")
+ }
+
+ val table = "DontFailOnDataLoss"
+ withTable(table) {
+ val df = if (testStreamingQuery) {
+ spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ } else {
+ spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ .option("startingOffsets", s"""{"$topic":{"0":0}}""")
+ .option("failOnDataLoss", "false")
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .load()
+ }
+ writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
+ val result = spark.table(table).as[String].collect().toList
+ assert(result.distinct.size === result.size, s"$result contains duplicated records")
+ // Make sure Kafka did remove some records so that this test is valid.
+ assert(result.size > 0 && result.size < 50)
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: v1") {
+ withSQLConf(
+ "spark.sql.streaming.disabledV2MicroBatchReaders" ->
+ classOf[KafkaSourceProvider].getCanonicalName) {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream.format("memory").queryName(table).start()
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+ }
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: v2") {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream.format("memory").queryName(table).start()
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test("failOnDataLoss=false should not return duplicated records: continuous processing") {
+ verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+ val query = df.writeStream
+ .format("memory")
+ .queryName(table)
+ .trigger(Trigger.Continuous(100))
+ .start()
+ try {
+ eventually(timeout(60.seconds)) {
+ assert(spark.table(table).as[String].collect().contains("49"))
--- End diff --
I didn't know that worked!
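Condensed from the quoted test, the pattern being reacted to: with a continuous trigger the test polls the memory sink inside `eventually` until the last value written ("49") appears, instead of calling processAllAvailable():

val query = df.writeStream
  .format("memory")
  .queryName(table)
  .trigger(Trigger.Continuous(100))  // checkpoint interval: 100 ms
  .start()
try {
  // Poll the sink until the final record shows up or the timeout fires.
  eventually(timeout(60.seconds)) {
    assert(spark.table(table).as[String].collect().contains("49"))
  }
} finally {
  query.stop()
}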
---
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22207
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95224/
Test PASSed.
---
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22207
**[Test build #95179 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95179/testReport)** for PR 22207 at commit [`e968159`](https://github.com/apache/spark/commit/e968159a1fcedadd6885a424656e1c8c5afb1aa7).
---