Posted to dev@spark.apache.org by Jacek Laskowski <ja...@japila.pl> on 2017/08/11 07:23:27 UTC

[SS] watermark, eventTime and "StreamExecution: Streaming query made progress"

Hi,

I'm curious why the watermark is only updated in the streaming batch
after the one in which it was observed [1]. The report (from
ProgressReporter/StreamExecution) does not look right to me, as
avg/max/min are already calculated according to the watermark [2].

My recommendation would be to do the update [2] in the same streaming
batch in which it was observed. Why not? Please enlighten me.

17/08/11 09:04:20 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:20.004Z",
  "batchId" : 1,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 0.7601672367920943,
  "processedRowsPerSecond" : 25.31645569620253,
  "durationMs" : {
    "addBatch" : 48,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 79,
    "walCommit" : 23
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:17.782Z",
    "max" : "2017-08-11T07:04:18.282Z",
    "min" : "2017-08-11T07:04:17.282Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },

...

17/08/11 09:04:30 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:30.003Z",
  "batchId" : 2,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 1.000100010001,
  "processedRowsPerSecond" : 56.17977528089888,
  "durationMs" : {
    "addBatch" : 147,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 178,
    "walCommit" : 22
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:23.782Z",
    "max" : "2017-08-11T07:04:28.282Z",
    "min" : "2017-08-11T07:04:19.282Z",
    "watermark" : "2017-08-11T07:04:08.282Z"
  },

[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L538
[2] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L257

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski



Re: [SS] watermark, eventTime and "StreamExecution: Streaming query made progress"

Posted by Michael Armbrust <mi...@databricks.com>.
The point here is to tell you what watermark value was used when executing
this batch.  You don't know the new watermark until the batch is over and
we don't want to do two passes over the data.  In general the semantics of
the watermark are designed to be conservative (i.e. just because data is
older than the watermark does not mean it will be dropped, but data will
never be dropped until after it is below the watermark).
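
The timing described above can be sketched in a few lines of plain
Python. This is only an illustrative simulation, not Spark's code:
simulate and its report fields are hypothetical names, the 10-second
delay is inferred from the gap between batch 1's max event time
(07:04:18.282) and batch 2's watermark (07:04:08.282) in the logs, and
only the min and max event times of each batch are used because the
individual rows are not shown in the thread.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def simulate(batches, delay, first_batch_id=1):
    """Return one progress-like dict per batch (hypothetical helper).

    The watermark reported for a batch is the value that was in effect
    while that batch ran. The value derived from the batch's own max
    event time only takes effect for the next batch.
    """
    watermark = EPOCH  # nothing observed yet
    reports = []
    for batch_id, event_times in enumerate(batches, start=first_batch_id):
        batch_max = max(event_times)  # known only once the batch is done
        reports.append({
            "batchId": batch_id,
            "min": min(event_times),
            "max": batch_max,
            "watermark": watermark,  # value used to execute this batch
        })
        # Assumption: the watermark never moves backwards.
        watermark = max(watermark, batch_max - delay)
    return reports


# Min and max event times taken from the two progress reports in the thread.
batch1 = [datetime(2017, 8, 11, 7, 4, 17, 282000),
          datetime(2017, 8, 11, 7, 4, 18, 282000)]
batch2 = [datetime(2017, 8, 11, 7, 4, 19, 282000),
          datetime(2017, 8, 11, 7, 4, 28, 282000)]

# The 10-second delay is an assumption inferred from the logs.
reports = simulate([batch1, batch2], delay=timedelta(seconds=10))
for report in reports:
    print(report["batchId"], report["max"].isoformat(),
          report["watermark"].isoformat())
```

Under these assumptions the first report carries the epoch watermark
and the second carries batch 1's max event time minus the delay, which
is the pattern visible in the two log entries. Updating the watermark
inside the same batch would require knowing the batch's max event time
before processing it, that is, a second pass over the data.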
