You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/04/12 18:20:28 UTC

spark git commit: [SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests

Repository: spark
Updated Branches:
  refs/heads/master 99a947312 -> 924c42477


[SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests

## What changes were proposed in this pull request?

Some Structured Streaming tests show flakiness such as:
```
[info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds)
[info]   Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds.
```

This happens when we wait for the stream to stop, but it doesn't. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, and the exception is not propagated upstream to the microBatchThread. Then this thread continues to run, only to start blocking on the `streamManualClock`.

## How was this patch tested?

Thousand retries locally and [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) of the flaky tests

Author: Burak Yavuz <br...@gmail.com>

Closes #17613 from brkyvz/flaky-stream-agg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/924c4247
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/924c4247
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/924c4247

Branch: refs/heads/master
Commit: 924c42477b5d6ed3c217c8eaaf4dc64b2379851a
Parents: 99a9473
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Apr 12 11:24:59 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 12 11:24:59 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 56 +++++++++-----------
 .../apache/spark/sql/streaming/StreamTest.scala |  6 +++
 2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/924c4247/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8857966..bcf0d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -284,42 +284,38 @@ class StreamExecution(
         triggerExecutor.execute(() => {
           startTrigger()
 
-          val continueToRun =
-            if (isActive) {
-              reportTimeTaken("triggerExecution") {
-                if (currentBatchId < 0) {
-                  // We'll do this initialization only once
-                  populateStartOffsets(sparkSessionToRunBatches)
-                  logDebug(s"Stream running from $committedOffsets to $availableOffsets")
-                } else {
-                  constructNextBatch()
-                }
-                if (dataAvailable) {
-                  currentStatus = currentStatus.copy(isDataAvailable = true)
-                  updateStatusMessage("Processing new data")
-                  runBatch(sparkSessionToRunBatches)
-                }
+          if (isActive) {
+            reportTimeTaken("triggerExecution") {
+              if (currentBatchId < 0) {
+                // We'll do this initialization only once
+                populateStartOffsets(sparkSessionToRunBatches)
+                logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+              } else {
+                constructNextBatch()
               }
-              // Report trigger as finished and construct progress object.
-              finishTrigger(dataAvailable)
               if (dataAvailable) {
-                // Update committed offsets.
-                batchCommitLog.add(currentBatchId)
-                committedOffsets ++= availableOffsets
-                logDebug(s"batch ${currentBatchId} committed")
-                // We'll increase currentBatchId after we complete processing current batch's data
-                currentBatchId += 1
-              } else {
-                currentStatus = currentStatus.copy(isDataAvailable = false)
-                updateStatusMessage("Waiting for data to arrive")
-                Thread.sleep(pollingDelayMs)
+                currentStatus = currentStatus.copy(isDataAvailable = true)
+                updateStatusMessage("Processing new data")
+                runBatch(sparkSessionToRunBatches)
               }
-              true
+            }
+            // Report trigger as finished and construct progress object.
+            finishTrigger(dataAvailable)
+            if (dataAvailable) {
+              // Update committed offsets.
+              batchCommitLog.add(currentBatchId)
+              committedOffsets ++= availableOffsets
+              logDebug(s"batch ${currentBatchId} committed")
+              // We'll increase currentBatchId after we complete processing current batch's data
+              currentBatchId += 1
             } else {
-              false
+              currentStatus = currentStatus.copy(isDataAvailable = false)
+              updateStatusMessage("Waiting for data to arrive")
+              Thread.sleep(pollingDelayMs)
             }
+          }
           updateStatusMessage("Waiting for next trigger")
-          continueToRun
+          isActive
         })
         updateStatusMessage("Stopped")
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/924c4247/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 03aa45b..5bc36dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
     def threadState =
       if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+    def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
+      s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
+    } else {
+      ""
+    }
 
     def testState =
       s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
          |Output Mode: $outputMode
          |Stream state: $currentOffsets
          |Thread state: $threadState
+         |$threadStackTrace
          |${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
          |
          |== Sink ==


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org