You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/07 21:47:48 UTC
spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
Repository: spark
Updated Branches:
refs/heads/master bb94f61a7 -> edc87e189
[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
## What changes were proposed in this pull request?
Fixed the following failures:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```
```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
at java.util.ArrayList.addAll(ArrayList.java:577)
at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at
...
```
## How was this patch tested?
Tested in #16048 by running many times.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #16109 from zsxwing/fix-kafka-flaky-test.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc87e18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc87e18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc87e18
Branch: refs/heads/master
Commit: edc87e18922b98be47c298cdc3daa2b049a737e9
Parents: bb94f61
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 7 13:47:44 2016 -0800
----------------------------------------------------------------------
.../sql/kafka010/CachedKafkaConsumer.scala | 39 ++++++++--
.../apache/spark/sql/kafka010/KafkaSource.scala | 2 +-
.../spark/sql/kafka010/KafkaSourceSuite.scala | 11 ++-
.../spark/sql/kafka010/KafkaTestUtils.scala | 75 +++++++++++++-------
.../spark/sql/test/SharedSQLContext.scala | 8 ++-
5 files changed, 96 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
var toFetchOffset = offset
while (toFetchOffset != UNKNOWN_OFFSET) {
try {
- return fetchData(toFetchOffset, pollTimeoutMs)
+ return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
}
/**
- * Get the record at `offset`.
+ * Get the record for the given offset if available. Otherwise it will either throw error
+ * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
+ * or null.
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
private def fetchData(
offset: Long,
- pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+ untilOffset: Long,
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
} else {
val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
- // `seek` is always called before "poll". So "record.offset" must be same as "offset".
- assert(record.offset == offset,
- s"The fetched data has a different offset: expected $offset but was ${record.offset}")
- record
+ // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
+ // available offset. Hence we need to handle offset mismatch.
+ if (record.offset > offset) {
+ // This may happen when some records aged out but their offsets already got verified
+ if (failOnDataLoss) {
+ reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+ // Never happen as "reportDataLoss" will throw an exception
+ null
+ } else {
+ if (record.offset >= untilOffset) {
+ reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+ null
+ } else {
+ reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+ record
+ }
+ }
+ } else if (record.offset < offset) {
+ // This should not happen. If it does happen, then we probably misunderstand Kafka internal
+ // mechanism.
+ throw new IllegalStateException(
+ s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+ } else {
+ record
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d9ab4bb..92ee0ed 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
private val offsetFetchAttemptIntervalMs =
- sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+ sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
private val maxOffsetsPerTrigger =
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2d6ccb2..0e40aba 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
+import org.apache.spark.SparkContext
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+ override def createSparkSession(): TestSparkSession = {
+ // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+ new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+ }
+
override def beforeAll(): Unit = {
super.beforeAll()
testUtils = new KafkaTestUtils {
@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
}
}
- ignore("stress test for failOnDataLoss=false") {
+ test("stress test for failOnDataLoss=false") {
val reader = spark
.readStream
.format("kafka")
@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
.option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
+ .option("fetchOffset.retryIntervalMs", "3000")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f43917e..fd1689a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
def deleteTopic(topic: String): Unit = {
val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
AdminUtils.deleteTopic(zkUtils, topic)
- verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+ verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
}
/** Add new paritions to a Kafka topic */
@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
props
}
+ /** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
private def verifyTopicDeletion(
+ topic: String,
+ numPartitions: Int,
+ servers: Seq[KafkaServer]): Unit = {
+ val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+
+ import ZkUtils._
+ // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+ assert(
+ !zkUtils.pathExists(getDeleteTopicPath(topic)),
+ s"${getDeleteTopicPath(topic)} still exists")
+ assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+ // ensure that the topic-partition has been deleted from all brokers' replica managers
+ assert(servers.forall(server => topicAndPartitions.forall(tp =>
+ server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+ s"topic $topic still exists in the replica manager")
+ // ensure that logs from all replicas are deleted if delete topic is marked successful
+ assert(servers.forall(server => topicAndPartitions.forall(tp =>
+ server.getLogManager().getLog(tp).isEmpty)),
+ s"topic $topic still exists in log mananger")
+ // ensure that topic is removed from all cleaner offsets
+ assert(servers.forall(server => topicAndPartitions.forall { tp =>
+ val checkpoints = server.getLogManager().logDirs.map { logDir =>
+ new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+ }
+ checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+ }), s"checkpoint for topic $topic still exists")
+ // ensure the topic is gone
+ assert(
+ !zkUtils.getAllTopics().contains(topic),
+ s"topic $topic still exists on zookeeper")
+ }
+
+ /** Verify topic is deleted. Retry to delete the topic if not. */
+ private def verifyTopicDeletionWithRetries(
zkUtils: ZkUtils,
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]) {
- import ZkUtils._
- val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
- def isDeleted(): Boolean = {
- // wait until admin path for delete topic is deleted, signaling completion of topic deletion
- val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
- val topicPath = !zkUtils.pathExists(getTopicPath(topic))
- // ensure that the topic-partition has been deleted from all brokers' replica managers
- val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
- server.replicaManager.getPartition(tp.topic, tp.partition) == None))
- // ensure that logs from all replicas are deleted if delete topic is marked successful
- val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
- server.getLogManager().getLog(tp).isEmpty))
- // ensure that topic is removed from all cleaner offsets
- val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
- val checkpoints = server.getLogManager().logDirs.map { logDir =>
- new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
- }
- checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
- })
- // ensure the topic is gone
- val deleted = !zkUtils.getAllTopics().contains(topic)
- deletePath && topicPath && replicaManager && logManager && cleaner && deleted
- }
- eventually(timeout(60.seconds)) {
- assert(isDeleted, s"$topic not deleted after timeout")
+ eventually(timeout(60.seconds), interval(200.millis)) {
+ try {
+ verifyTopicDeletion(topic, numPartitions, servers)
+ } catch {
+ case e: Throwable =>
+ // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+ // chance that a topic will be recreated after deletion due to the asynchronous update.
+ // Hence, delete the topic and retry.
+ AdminUtils.deleteTopic(zkUtils, topic)
+ throw e
+ }
}
}
@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging {
case _ =>
false
}
- eventually(timeout(10.seconds)) {
+ eventually(timeout(60.seconds)) {
assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index db24ee8..2239f10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
*/
protected implicit def sqlContext: SQLContext = _spark.sqlContext
+ protected def createSparkSession: TestSparkSession = {
+ new TestSparkSession(
+ sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+ }
+
/**
* Initialize the [[TestSparkSession]].
*/
protected override def beforeAll(): Unit = {
SparkSession.sqlListener.set(null)
if (_spark == null) {
- _spark = new TestSparkSession(
- sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+ _spark = createSparkSession
}
// Ensure we have initialized the context before calling parent code
super.beforeAll()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org