Posted to commits@spark.apache.org by td...@apache.org on 2016/12/07 21:47:48 UTC

spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

Repository: spark
Updated Branches:
  refs/heads/master bb94f61a7 -> edc87e189


[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
	at java.util.ArrayList.addAll(ArrayList.java:577)
	at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
	at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
	at
...
```

## How was this patch tested?

Tested in #16048 by running the test many times.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16109 from zsxwing/fix-kafka-flaky-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc87e18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc87e18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc87e18

Branch: refs/heads/master
Commit: edc87e18922b98be47c298cdc3daa2b049a737e9
Parents: bb94f61
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 7 13:47:44 2016 -0800

----------------------------------------------------------------------
 .../sql/kafka010/CachedKafkaConsumer.scala      | 39 ++++++++--
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 75 +++++++++++++-------
 .../spark/sql/test/SharedSQLContext.scala       |  8 ++-
 5 files changed, 96 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
   }
 
   /**
-   * Get the record at `offset`.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
+   * or null.
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
   private def fetchData(
       offset: Long,
-      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
     if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
       // This is the first fetch, or the last pre-fetched data has been drained.
       // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
     } else {
       val record = fetchedData.next()
       nextOffsetInFetchedData = record.offset + 1
-      // `seek` is always called before "poll". So "record.offset" must be same as "offset".
-      assert(record.offset == offset,
-        s"The fetched data has a different offset: expected $offset but was ${record.offset}")
-      record
+      // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
+      // available offset. Hence we need to handle offset mismatch.
+      if (record.offset > offset) {
+        // This may happen when some records aged out but their offsets already got verified
+        if (failOnDataLoss) {
+          reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+          // Never happen as "reportDataLoss" will throw an exception
+          null
+        } else {
+          if (record.offset >= untilOffset) {
+            reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+            null
+          } else {
+            reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+            record
+          }
+        }
+      } else if (record.offset < offset) {
+        // This should not happen. If it does happen, then we probably misunderstand Kafka internal
+        // mechanism.
+        throw new IllegalStateException(
+          s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+      } else {
+        record
+      }
     }
   }
 

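The offset-mismatch handling introduced above boils down to a three-way decision on the fetched record's offset. The following is a minimal standalone sketch of that decision logic only; the `Outcome` type and all parameter names are illustrative and not part of the patch:

```
// Hypothetical sketch of the offset-mismatch decision added to fetchData.
// The Outcome type and all names here are illustrative, not from the patch.
object OffsetMismatchSketch {
  sealed trait Outcome
  case object ReturnRecord extends Outcome    // use the record that Kafka returned
  case object SkipToNextBatch extends Outcome // nothing usable left in [offset, untilOffset)
  case class Fail(reason: String) extends Outcome

  def resolve(
      requested: Long,
      fetched: Long,
      untilOffset: Long,
      failOnDataLoss: Boolean): Outcome = {
    if (fetched > requested) {
      // Records in [requested, fetched) aged out of Kafka after their offsets were resolved.
      if (failOnDataLoss) Fail(s"Cannot fetch records in [$requested, $fetched)")
      else if (fetched >= untilOffset) SkipToNextBatch
      else ReturnRecord
    } else if (fetched < requested) {
      // Kafka is expected to return the requested offset or a later one, never an earlier one.
      Fail(s"Tried to fetch $requested but the returned record offset was $fetched")
    } else {
      ReturnRecord // offsets match: the common case
    }
  }

  def main(args: Array[String]): Unit = {
    // Records in [10, 15) were removed by retention; with failOnDataLoss=false the loss is
    // logged and the record at offset 15 is used, so this prints ReturnRecord.
    println(resolve(requested = 10L, fetched = 15L, untilOffset = 20L, failOnDataLoss = false))
  }
}
```

In the committed code the "skip" and "fail" branches both go through reportDataLoss, which either logs a warning or throws depending on failOnDataLoss.
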
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d9ab4bb..92ee0ed 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
 
   private val offsetFetchAttemptIntervalMs =
-    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)

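The KafkaSource hunk above only changes a default: the interval between offset-fetch retries goes from 10 ms to 1000 ms. A query can still pick its own value through the source options; the sketch below assumes a broker at localhost:9092 and a topic named events (both illustrative), plus the spark-sql-kafka-0-10 artifact on the classpath:

```
import org.apache.spark.sql.SparkSession

object RetryIntervalSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("retry-interval-sketch")
      .getOrCreate()

    // Override the (now 1000 ms) default retry interval for this query only.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker address
      .option("subscribe", "events")                       // illustrative topic name
      .option("startingOffsets", "earliest")
      .option("fetchOffset.retryIntervalMs", "3000")       // same value the stress test passes below
      .load()

    df.printSchema() // fixed Kafka source schema; no query is started in this sketch
    spark.stop()
  }
}
```
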
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2d6ccb2..0e40aba 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
 
   private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
 
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     testUtils = new KafkaTestUtils {
@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     }
   }
 
-  ignore("stress test for failOnDataLoss=false") {
+  test("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")
@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
       .option("subscribePattern", "failOnDataLoss.*")
       .option("startingOffsets", "earliest")
       .option("failOnDataLoss", "false")
+      .option("fetchOffset.retryIntervalMs", "3000")
     val kafka = reader.load()
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]

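The suite above switches its master URL to local[2,3]. The second number is the per-task maxFailures, so a transient failure (such as the NPE thrown from Kafka's poll while a topic is being deleted) fails one task attempt instead of the whole query. A minimal sketch of that master-URL behaviour, with an intentionally flaky map function that is purely illustrative:

```
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalRetriesSketch {
  def main(args: Array[String]): Unit = {
    // "local[2,3]" = 2 worker threads, and each task may fail up to 3 times before the
    // job is aborted; plain "local[2]" would abort the job on the first task failure.
    val conf = new SparkConf().setMaster("local[2,3]").setAppName("local-retries-sketch")
    val sc = new SparkContext(conf)

    val doubled = sc.parallelize(1 to 10, 2).map { x =>
      // Simulated transient failure: the first attempt of each task throws, the retry succeeds.
      if (TaskContext.get().attemptNumber() == 0) {
        throw new RuntimeException("simulated transient failure")
      }
      x * 2
    }.collect()

    println(doubled.mkString(", ")) // 2, 4, ..., 20 after one retry per task
    sc.stop()
  }
}
```
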
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f43917e..fd1689a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
   def deleteTopic(topic: String): Unit = {
     val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
     AdminUtils.deleteTopic(zkUtils, topic)
-    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+    verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
   }
 
   /** Add new partitions to a Kafka topic */
@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
     props
   }
 
+  /** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
   private def verifyTopicDeletion(
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]): Unit = {
+    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+
+    import ZkUtils._
+    // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+    assert(
+      !zkUtils.pathExists(getDeleteTopicPath(topic)),
+      s"${getDeleteTopicPath(topic)} still exists")
+    assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+    // ensure that the topic-partition has been deleted from all brokers' replica managers
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      s"topic $topic still exists in the replica manager")
+    // ensure that logs from all replicas are deleted if delete topic is marked successful
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.getLogManager().getLog(tp).isEmpty)),
+      s"topic $topic still exists in log mananger")
+    // ensure that topic is removed from all cleaner offsets
+    assert(servers.forall(server => topicAndPartitions.forall { tp =>
+      val checkpoints = server.getLogManager().logDirs.map { logDir =>
+        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      }
+      checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+    }), s"checkpoint for topic $topic still exists")
+    // ensure the topic is gone
+    assert(
+      !zkUtils.getAllTopics().contains(topic),
+      s"topic $topic still exists on zookeeper")
+  }
+
+  /** Verify topic is deleted. Retry to delete the topic if not. */
+  private def verifyTopicDeletionWithRetries(
       zkUtils: ZkUtils,
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]) {
-    import ZkUtils._
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
-    def isDeleted(): Boolean = {
-      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
-      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
-      // ensure that the topic-partition has been deleted from all brokers' replica managers
-      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
-      // ensure that logs from all replicas are deleted if delete topic is marked successful
-      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.getLogManager().getLog(tp).isEmpty))
-      // ensure that topic is removed from all cleaner offsets
-      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
-        val checkpoints = server.getLogManager().logDirs.map { logDir =>
-          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
-        }
-        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
-      })
-      // ensure the topic is gone
-      val deleted = !zkUtils.getAllTopics().contains(topic)
-      deletePath && topicPath && replicaManager && logManager && cleaner && deleted
-    }
-    eventually(timeout(60.seconds)) {
-      assert(isDeleted, s"$topic not deleted after timeout")
+    eventually(timeout(60.seconds), interval(200.millis)) {
+      try {
+        verifyTopicDeletion(topic, numPartitions, servers)
+      } catch {
+        case e: Throwable =>
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, delete the topic and retry.
+          AdminUtils.deleteTopic(zkUtils, topic)
+          throw e
+      }
     }
   }
 
@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging {
       case _ =>
         false
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(60.seconds)) {
       assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }

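The verifyTopicDeletionWithRetries change above wraps the verification in eventually and, on failure, re-issues the deletion before rethrowing so that eventually keeps retrying. The same "retry the corrective action inside eventually" pattern can be sketched generically; everything below is illustrative and only assumes ScalaTest on the classpath:

```
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object RetryInsideEventuallySketch {
  // Run `action`, then poll `verify` until it stops throwing; whenever it throws,
  // re-issue `action` and rethrow so that `eventually` retries after its interval.
  def retryUntilVerified(action: () => Unit, verify: () => Unit): Unit = {
    action()
    eventually(timeout(60.seconds), interval(200.millis)) {
      try {
        verify()
      } catch {
        case e: Throwable =>
          action() // e.g. call AdminUtils.deleteTopic again in the Kafka case
          throw e
      }
    }
  }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    retryUntilVerified(
      action = () => attempts += 1,
      verify = () => assert(attempts >= 3, s"only $attempts attempts so far"))
    println(s"verified after $attempts attempts") // verified after 3 attempts
  }
}
```
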
http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index db24ee8..2239f10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
 
+  protected def createSparkSession: TestSparkSession = {
+    new TestSparkSession(
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+  }
+
   /**
    * Initialize the [[TestSparkSession]].
    */
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(
-        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+      _spark = createSparkSession
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()

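With createSparkSession factored out, any suite that mixes in SharedSQLContext can substitute its own session, which is exactly what the Kafka stress suite does above. A hypothetical suite showing the override; it assumes the code lives in Spark's own sql test sources, since SharedSQLContext, TestSparkSession and QueryTest are test-only classes:

```
import org.apache.spark.SparkContext
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

// Hypothetical suite: build the session from a hand-rolled SparkContext, for example to
// allow task retries via the "local[2,3]" master URL, instead of the default TestSparkSession.
class CustomSessionSuite extends QueryTest with SharedSQLContext {

  override protected def createSparkSession: TestSparkSession = {
    new TestSparkSession(new SparkContext("local[2,3]", "custom-session-suite", sparkConf))
  }

  test("session comes from the overridden factory") {
    assert(spark.sparkContext.master === "local[2,3]")
  }
}
```
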
