You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/04/12 18:20:28 UTC
spark git commit: [SPARK-20301][FLAKY-TEST] Fix Hadoop
Shell.runCommand flakiness in Structured Streaming tests
Repository: spark
Updated Branches:
refs/heads/master 99a947312 -> 924c42477
[SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests
## What changes were proposed in this pull request?
Some Structured Streaming tests show flakiness such as:
```
[info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds)
[info] Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds.
```
This happens when we wait for the stream to stop, but it doesn't. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, and the exception is not propagated upstream to the microBatchThread. Then this thread continues to run, only to start blocking on the `streamManualClock`.
## How was this patch tested?
Thousand retries locally and [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) of the flaky tests
Author: Burak Yavuz <br...@gmail.com>
Closes #17613 from brkyvz/flaky-stream-agg.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/924c4247
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/924c4247
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/924c4247
Branch: refs/heads/master
Commit: 924c42477b5d6ed3c217c8eaaf4dc64b2379851a
Parents: 99a9473
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Apr 12 11:24:59 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 12 11:24:59 2017 -0700
----------------------------------------------------------------------
.../execution/streaming/StreamExecution.scala | 56 +++++++++-----------
.../apache/spark/sql/streaming/StreamTest.scala | 6 +++
2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/924c4247/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8857966..bcf0d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -284,42 +284,38 @@ class StreamExecution(
triggerExecutor.execute(() => {
startTrigger()
- val continueToRun =
- if (isActive) {
- reportTimeTaken("triggerExecution") {
- if (currentBatchId < 0) {
- // We'll do this initialization only once
- populateStartOffsets(sparkSessionToRunBatches)
- logDebug(s"Stream running from $committedOffsets to $availableOffsets")
- } else {
- constructNextBatch()
- }
- if (dataAvailable) {
- currentStatus = currentStatus.copy(isDataAvailable = true)
- updateStatusMessage("Processing new data")
- runBatch(sparkSessionToRunBatches)
- }
+ if (isActive) {
+ reportTimeTaken("triggerExecution") {
+ if (currentBatchId < 0) {
+ // We'll do this initialization only once
+ populateStartOffsets(sparkSessionToRunBatches)
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ } else {
+ constructNextBatch()
}
- // Report trigger as finished and construct progress object.
- finishTrigger(dataAvailable)
if (dataAvailable) {
- // Update committed offsets.
- batchCommitLog.add(currentBatchId)
- committedOffsets ++= availableOffsets
- logDebug(s"batch ${currentBatchId} committed")
- // We'll increase currentBatchId after we complete processing current batch's data
- currentBatchId += 1
- } else {
- currentStatus = currentStatus.copy(isDataAvailable = false)
- updateStatusMessage("Waiting for data to arrive")
- Thread.sleep(pollingDelayMs)
+ currentStatus = currentStatus.copy(isDataAvailable = true)
+ updateStatusMessage("Processing new data")
+ runBatch(sparkSessionToRunBatches)
}
- true
+ }
+ // Report trigger as finished and construct progress object.
+ finishTrigger(dataAvailable)
+ if (dataAvailable) {
+ // Update committed offsets.
+ batchCommitLog.add(currentBatchId)
+ committedOffsets ++= availableOffsets
+ logDebug(s"batch ${currentBatchId} committed")
+ // We'll increase currentBatchId after we complete processing current batch's data
+ currentBatchId += 1
} else {
- false
+ currentStatus = currentStatus.copy(isDataAvailable = false)
+ updateStatusMessage("Waiting for data to arrive")
+ Thread.sleep(pollingDelayMs)
}
+ }
updateStatusMessage("Waiting for next trigger")
- continueToRun
+ isActive
})
updateStatusMessage("Stopped")
} else {
http://git-wip-us.apache.org/repos/asf/spark/blob/924c4247/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 03aa45b..5bc36dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def threadState =
if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+ def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
+ s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
+ } else {
+ ""
+ }
def testState =
s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
|Output Mode: $outputMode
|Stream state: $currentOffsets
|Thread state: $threadState
+ |$threadStackTrace
|${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
|
|== Sink ==
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org