Posted to commits@spark.apache.org by do...@apache.org on 2019/07/15 18:01:23 UTC

[spark] branch master updated: [SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousPartitionReader

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f7ccc5  [SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousPartitionReader
8f7ccc5 is described below

commit 8f7ccc5e9c8be723947947c4130a48781bf6e355
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon Jul 15 11:01:03 2019 -0700

    [SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousPartitionReader
    
    ## What changes were proposed in this pull request?
    
    `System.currentTimeMillis` is read twice per loop iteration in `RateStreamContinuousPartitionReader`: once in the `while` condition and once when computing the argument to `Thread.sleep`. If the test machine is slow enough that `nextReadTime` passes between those two reads, the computed timeout is negative and `Thread.sleep` throws `IllegalArgumentException`.
    
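    This PR fixes the issue by computing the remaining wait time once per iteration, storing it in a local variable, and sleeping only while that value is positive.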
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #25162 from gaborgsomogyi/SPARK-28404.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../execution/streaming/continuous/ContinuousRateStreamSource.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index d55f71c..e1b7a8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -134,8 +134,10 @@ class RateStreamContinuousPartitionReader(
     nextReadTime += readTimeIncrement
 
     try {
-      while (System.currentTimeMillis < nextReadTime) {
-        Thread.sleep(nextReadTime - System.currentTimeMillis)
+      var toWaitMs = nextReadTime - System.currentTimeMillis
+      while (toWaitMs > 0) {
+        Thread.sleep(toWaitMs)
+        toWaitMs = nextReadTime - System.currentTimeMillis
       }
     } catch {
       case _: InterruptedException =>
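
The pattern in the hunk above can be tried in isolation. The following is a minimal sketch, not code from Spark: the `SleepUntilSketch` object, the `sleepUntil` helper, and its injectable `clock` and `sleep` parameters are hypothetical names introduced here so the loop can be exercised without real waiting. It only illustrates the idea of reading the clock once per iteration and reusing that value for both the check and the sleep.

```scala
object SleepUntilSketch {
  // Hypothetical helper (not part of Spark): blocks until clock() reaches deadlineMs.
  // The clock is read once per iteration, and the same value drives both the
  // loop condition and the sleep duration, so sleep never sees a negative value.
  def sleepUntil(
      deadlineMs: Long,
      clock: () => Long = () => System.currentTimeMillis(),
      sleep: Long => Unit = ms => Thread.sleep(ms)): Unit = {
    var toWaitMs = deadlineMs - clock()
    while (toWaitMs > 0) {
      sleep(toWaitMs)
      // Re-read the clock after waking, since sleep may return early or late.
      toWaitMs = deadlineMs - clock()
    }
  }
}
```

With the defaults, `SleepUntilSketch.sleepUntil(System.currentTimeMillis() + 50)` waits roughly 50 ms. Passing a fake clock that jumps past the deadline between reads reproduces the slow-machine scenario from the commit message: the loop simply exits instead of passing a negative value to `Thread.sleep`.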

