Posted to commits@spark.apache.org by do...@apache.org on 2019/07/15 18:01:23 UTC
[spark] branch master updated: [SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousPartitionReader
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8f7ccc5 [SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousPartitionReader
8f7ccc5 is described below
commit 8f7ccc5e9c8be723947947c4130a48781bf6e355
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon Jul 15 11:01:03 2019 -0700
[SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousPartitionReader
## What changes were proposed in this pull request?
`System.currentTimeMillis` is read twice per loop iteration in `RateStreamContinuousPartitionReader`: once in the `while` condition and once to compute the `Thread.sleep` argument. If the test machine is slow enough that `nextReadTime` passes between those two reads, the computed timeout is negative and `Thread.sleep` throws `IllegalArgumentException`.
This PR reads the clock once per iteration, stores the remaining wait in `toWaitMs`, and sleeps only while that value is positive.
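The race described above can be reproduced deterministically with a fake clock. The following is a minimal sketch, not Spark code: `SleepUntilSketch`, `waitTwoReads`, `waitOneRead`, and `steppingClock` are hypothetical names introduced for illustration, and the `now` and `sleep` parameters stand in for `System.currentTimeMillis` and `Thread.sleep` so that the sleep arguments can be recorded instead of actually sleeping.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration only; none of these names exist in Spark.
object SleepUntilSketch {
  // Shape of the loop before the fix: the clock is read once for the
  // condition and again for the sleep argument, so the two reads can disagree.
  def waitTwoReads(deadlineMs: Long, now: () => Long, sleep: Long => Unit): Unit = {
    while (now() < deadlineMs) {
      sleep(deadlineMs - now())
    }
  }

  // Shape of the loop after the fix: one clock read per iteration feeds both
  // the loop condition and the sleep argument, so the argument is always > 0.
  def waitOneRead(deadlineMs: Long, now: () => Long, sleep: Long => Unit): Unit = {
    var toWaitMs = deadlineMs - now()
    while (toWaitMs > 0) {
      sleep(toWaitMs)
      toWaitMs = deadlineMs - now()
    }
  }

  // Fake clock that starts at 0 and advances by stepMs on every read,
  // imitating a machine that is slow between consecutive clock reads.
  def steppingClock(stepMs: Long): () => Long = {
    var t = 0L
    () => { val v = t; t += stepMs; v }
  }

  def main(args: Array[String]): Unit = {
    val before = ArrayBuffer.empty[Long]
    waitTwoReads(50L, steppingClock(60L), ms => { before += ms; () })

    val after = ArrayBuffer.empty[Long]
    waitOneRead(50L, steppingClock(60L), ms => { after += ms; () })

    println(s"two reads requested: $before")
    println(s"one read requested:  $after")
  }
}
```

With a deadline of 50 ms and a clock that advances 60 ms per read, the two-read loop requests a sleep of -10 ms, which `Thread.sleep` would reject with `IllegalArgumentException`, while the one-read loop requests a single sleep of 50 ms and then exits.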
## How was this patch tested?
Existing unit tests.
Closes #25162 from gaborgsomogyi/SPARK-28404.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../execution/streaming/continuous/ContinuousRateStreamSource.scala | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index d55f71c..e1b7a8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -134,8 +134,10 @@ class RateStreamContinuousPartitionReader(
     nextReadTime += readTimeIncrement
     try {
-      while (System.currentTimeMillis < nextReadTime) {
-        Thread.sleep(nextReadTime - System.currentTimeMillis)
+      var toWaitMs = nextReadTime - System.currentTimeMillis
+      while (toWaitMs > 0) {
+        Thread.sleep(toWaitMs)
+        toWaitMs = nextReadTime - System.currentTimeMillis
       }
     } catch {
       case _: InterruptedException =>