Posted to issues@spark.apache.org by "Gerard Maas (JIRA)" <ji...@apache.org> on 2018/04/22 20:02:00 UTC

[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond

     [ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gerard Maas updated SPARK-24046:
--------------------------------
    Summary: Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond  (was: Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond)

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --------------------------------------------------------------------------
>
>                 Key: SPARK-24046
>                 URL: https://issues.apache.org/jira/browse/SPARK-24046
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>         Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>            Reporter: Gerard Maas
>            Priority: Major
>              Labels: RateSource
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature fails to gradually increase the stream rate when the `rampUpTime` option is equal to or greater than `rowsPerSecond`.
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 0 values. The rate then jumps to `rowsPerSecond` once `time > rampUpTime`.
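> The gradual increase described in the documentation would mean the cumulative value grows a little every second during the ramp-up window instead of staying at 0. The sketch below (a hypothetical helper for illustration only, not the rate source implementation) computes the cumulative row count one would expect from a linear ramp from 0 up to `rowsPerSecond` over `rampUpTime` seconds, for the first failing configuration:
> {code:java}
> // Illustration only: expected cumulative rows under a linear ramp,
> // assuming the per-second rate grows roughly as rowsPerSecond * s / rampUpTime.
> def expectedValueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTime: Long): Long = {
>   if (seconds <= rampUpTime) {
>     // sum of the ramped per-second rate over the elapsed seconds
>     (1L to seconds).map(s => rowsPerSecond * s / rampUpTime).sum
>   } else {
>     // after the ramp, the full rate applies
>     expectedValueAtSecond(rampUpTime, rowsPerSecond, rampUpTime) +
>       (seconds - rampUpTime) * rowsPerSecond
>   }
> }
>
> // rowsPerSecond = 5, rampUpTime = 10
> (1L to 12L).map(s => expectedValueAtSecond(s, 5, 10))
> // Vector(0, 1, 2, 4, 6, 9, 12, 16, 20, 25, 30, 35)
> {code}
> Under this expectation, the first few batches for `rowsPerSecond=5, rampUpTime=10` would already contain one or two rows rather than being empty.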
> The following scenario, executed in the `spark-shell`, demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 7
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 9
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 10
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 11
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> +--------------------+-----+
> -------------------------------------------
> Batch: 12
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> +--------------------+-----+
> {code}
>  
> The following scenario shows `rowsPerSecond == rampUpTime`, which also fails:
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@149ef64a
> scala> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 7
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 9
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 10
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 11
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:32:...| 0|
> |2018-04-22 15:32:...| 1|
> |2018-04-22 15:32:...| 2|
> |2018-04-22 15:32:...| 3|
> |2018-04-22 15:32:...| 4|
> |2018-04-22 15:32:...| 5|
> |2018-04-22 15:32:...| 6|
> |2018-04-22 15:32:...| 7|
> |2018-04-22 15:32:...| 8|
> |2018-04-22 15:32:...| 9|
> +--------------------+-----+
> -------------------------------------------
> Batch: 12
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:32:...| 10|
> |2018-04-22 15:32:...| 11|
> |2018-04-22 15:32:...| 12|
> |2018-04-22 15:32:...| 13|
> |2018-04-22 15:32:...| 14|
> |2018-04-22 15:32:...| 15|
> |2018-04-22 15:32:...| 16|
> |2018-04-22 15:32:...| 17|
> |2018-04-22 15:32:...| 18|
> |2018-04-22 15:32:...| 19|
> +--------------------+-----+
> {code}
>  
> In contrast, when `rowsPerSecond > rampUpTime`, the gradual increase happens as expected.
>  
> {code:java}
> .option("rowsPerSecond", 11)
> .option("rampUpTime", 10){code}
>  
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 11)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19c6e821
> scala> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 0|
> +--------------------+-----+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 1|
> |2018-04-22 15:34:...| 2|
> +--------------------+-----+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 3|
> |2018-04-22 15:34:...| 4|
> |2018-04-22 15:34:...| 5|
> +--------------------+-----+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 6|
> |2018-04-22 15:34:...| 7|
> |2018-04-22 15:34:...| 8|
> |2018-04-22 15:34:...| 9|
> +--------------------+-----+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 10|
> |2018-04-22 15:34:...| 11|
> |2018-04-22 15:34:...| 12|
> |2018-04-22 15:34:...| 13|
> |2018-04-22 15:34:...| 14|
> +--------------------+-----+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 15|
> |2018-04-22 15:34:...| 16|
> |2018-04-22 15:34:...| 17|
> |2018-04-22 15:34:...| 18|
> |2018-04-22 15:34:...| 19|
> |2018-04-22 15:34:...| 20|
> +--------------------+-----+
> -------------------------------------------
> Batch: 7
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 21|
> |2018-04-22 15:34:...| 22|
> |2018-04-22 15:34:...| 23|
> |2018-04-22 15:34:...| 24|
> |2018-04-22 15:34:...| 25|
> |2018-04-22 15:34:...| 26|
> |2018-04-22 15:34:...| 27|
> +--------------------+-----+
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 28|
> |2018-04-22 15:34:...| 29|
> |2018-04-22 15:34:...| 30|
> |2018-04-22 15:34:...| 31|
> |2018-04-22 15:34:...| 32|
> |2018-04-22 15:34:...| 33|
> |2018-04-22 15:34:...| 34|
> |2018-04-22 15:34:...| 35|
> +--------------------+-----+
> -------------------------------------------
> Batch: 9
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 36|
> |2018-04-22 15:34:...| 37|
> |2018-04-22 15:34:...| 38|
> |2018-04-22 15:34:...| 39|
> |2018-04-22 15:34:...| 40|
> |2018-04-22 15:34:...| 41|
> |2018-04-22 15:34:...| 42|
> |2018-04-22 15:34:...| 43|
> |2018-04-22 15:34:...| 44|
> +--------------------+-----+
> -------------------------------------------
> Batch: 10
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 45|
> |2018-04-22 15:34:...| 46|
> |2018-04-22 15:34:...| 47|
> |2018-04-22 15:34:...| 48|
> |2018-04-22 15:34:...| 49|
> |2018-04-22 15:34:...| 50|
> |2018-04-22 15:34:...| 51|
> |2018-04-22 15:34:...| 52|
> |2018-04-22 15:34:...| 53|
> |2018-04-22 15:34:...| 54|
> +--------------------+-----+
> -------------------------------------------
> Batch: 11
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 55|
> |2018-04-22 15:34:...| 56|
> |2018-04-22 15:34:...| 57|
> |2018-04-22 15:34:...| 58|
> |2018-04-22 15:34:...| 59|
> |2018-04-22 15:34:...| 60|
> |2018-04-22 15:34:...| 61|
> |2018-04-22 15:34:...| 62|
> |2018-04-22 15:34:...| 63|
> |2018-04-22 15:34:...| 64|
> |2018-04-22 15:34:...| 65|
> +--------------------+-----+
> -------------------------------------------
> Batch: 12
> -------------------------------------------
> +--------------------+-----+
> | timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...| 66|
> |2018-04-22 15:34:...| 67|
> |2018-04-22 15:34:...| 68|
> |2018-04-22 15:34:...| 69|
> |2018-04-22 15:34:...| 70|
> |2018-04-22 15:34:...| 71|
> |2018-04-22 15:34:...| 72|
> |2018-04-22 15:34:...| 73|
> |2018-04-22 15:34:...| 74|
> |2018-04-22 15:34:...| 75|
> |2018-04-22 15:34:...| 76|
> +--------------------+-----+
> {code}
>  
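> Summarizing the per-batch row counts observed in the three runs above (taken directly from the console output; the value names below are just for illustration):
> {code:java}
> // rowsPerSecond = 5,  rampUpTime = 10: nothing during ramp-up, then 5 rows per batch
> val counts5x10  = Seq.fill(11)(0) ++ Seq(5, 5)
> // rowsPerSecond = 10, rampUpTime = 10: nothing during ramp-up, then 10 rows per batch
> val counts10x10 = Seq.fill(11)(0) ++ Seq(10, 10)
> // rowsPerSecond = 11, rampUpTime = 10: gradual increase of roughly one row per batch
> val counts11x10 = Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 11)
> {code}
> In both failing runs the rate stays at 0 for the entire ramp-up window and then jumps straight to `rowsPerSecond`, while the passing run ramps up as documented.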


