You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sergey Oboguev <ob...@gmail.com> on 2020/09/23 20:47:22 UTC
Spark watermarked aggregation query and append output mode
Hi,
I am trying to aggregate Spark time-stamped structured stream to get
per-device (source) averages for every second of incoming data.
dataset.printSchema(); // see the output below
Dataset<Row> ds1 = dataset
.withWatermark("timestamp", "1 second")
.groupBy(
functions.window(dataset.col("timestamp"), "1 second", "1 second"),
dataset.col("source"))
.agg(
functions.avg("D0").as("AVG_D0"),
functions.avg("I0").as("AVG_I0"))
.orderBy("window");
StreamingQuery query = ds1.writeStream()
.outputMode(OutputMode.Append())
.format("console")
.option("truncate", "false")
.option("numRows", Integer.MAX_VALUE)
.start();
query.awaitTermination();
I am using Spark 2.4.6.
According to
https://spark.apache.org/docs/2.4.6/structured-streaming-programming-guide.html#output-modes
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
the above construct should work fine.
Yet I am getting an exception in the query start():
11:05:27.282 [main] ERROR my.sparkbench.example.Example - Exception
org.apache.spark.sql.AnalysisException: *Append output mode not
supported when there are streaming aggregations on streaming
DataFrames/DataSets without watermark*;;
Sort [window#44 ASC NULLS FIRST], true
+- Aggregate [window#71, source#0], [window#71 AS window#44, source#0,
avg(D0#12) AS AVG_D0#68, avg(I0#2L) AS AVG_I0#70]
+- Filter isnotnull(timestamp#1)
+- Project [named_struct(start,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(1000000 as double)))
as double) = (cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(1000000 as double)))
THEN (CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(1000000 as double))) +
cast(1 as bigint)) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
LongType) - 0) as double) / cast(1000000 as double))) END + cast(0 as
bigint)) - cast(1 as bigint)) * 1000000) + 0), LongType,
TimestampType), end, precisetimestampconversion((((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(1000000 as double)))
as double) = (cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(1000000 as double)))
THEN (CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(1000000 as double))) +
cast(1 as bigint)) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
LongType) - 0) as double) / cast(1000000 as double))) END + cast(0 as
bigint)) - cast(1 as bigint)) * 1000000) + 0) + 1000000), LongType,
TimestampType)) AS window#71, source#0, timestamp#1-T1000ms, I0#2L,
I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L,
D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]
+- EventTimeWatermark timestamp#1: timestamp, interval 1 seconds
+- StreamingRelationV2
my.sparkbench.datastreamreader.MyStreamingSource@6897a4a,
my.sparkbench.datastreamreader.MyStreamingSource, [source#0,
timestamp#1, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L,
I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18,
D7#19, D8#20, D9#21]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:256)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
at my.sparkbench.example.Example.streamGroupByResult(Example.java:113)
at my.sparkbench.example.Example.exec_main(Example.java:76)
at my.sparkbench.example.Example.do_main(Example.java:42)
at my.sparkbench.example.Example.main(Example.java:34)
even though there is a watermark on the stream.
Schema printout looks fine:
root
|-- source: string (nullable = false)
|-- timestamp: timestamp (nullable = false)
|-- I0: long (nullable = false)
|-- I1: long (nullable = false)
|-- I2: long (nullable = false)
|-- I3: long (nullable = false)
|-- I4: long (nullable = false)
|-- I5: long (nullable = false)
|-- I6: long (nullable = false)
|-- I7: long (nullable = false)
|-- I8: long (nullable = false)
|-- I9: long (nullable = false)
|-- D0: double (nullable = false)
|-- D1: double (nullable = false)
|-- D2: double (nullable = false)
|-- D3: double (nullable = false)
|-- D4: double (nullable = false)
|-- D5: double (nullable = false)
|-- D6: double (nullable = false)
|-- D7: double (nullable = false)
|-- D8: double (nullable = false)
|-- D9: double (nullable = false)
Actual data looks fine too. If I feed it to
dataset.writeStream().format("console").option("truncate",
"false").outputMode(OutputMode.Append()).start();
then I am getting output
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
|source |timestamp |I0 |I1 |I2 |I3 |I4 |I5 |I6 |I7 |I8 |I9
|D0 |D1 |D2 |D3 |D4 |D5 |D6 |D7 |D8 |D9 |
+--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
|DEV-0001|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10
|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
+--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
only showing top 20 rows
and then follow-up batches of a similar look.
There is no exception if I use COMPLETE output mode, but then old results
(from the start of the timeline) are reported in every batch and that’s not
what I want. I want only new query result records to be reported. Thus I
want the APPEND mode – but it causes an exception.
Why is the exception and how can I make it work?
Tiny project that isolates the problem is here:
https://github.com/oboguev/SparkQuestion
Thanks for advice.
Re: Spark watermarked aggregation query and append output mode
Posted by Sergey Oboguev <ob...@gmail.com>.
Thanks!
It appears one should use not *dataset.col("timestamp")*
but rather* functions.col("timestamp").*
Re: Spark watermarked aggregation query and append output mode
Posted by German Schiavon <gs...@gmail.com>.
Hi,
try this :
dataset.printSchema(); // see the output below
Dataset<Row> ds1 = dataset
.withWatermark("timestamp", "1 second")
.groupBy(
functions.window(*col("timestamp")*, "1 second", "1 second"),
*col("source")*)
.agg(
functions.avg("D0").as("AVG_D0"),
functions.avg("I0").as("AVG_I0"))
.orderBy("window");
On Wed, 23 Sep 2020 at 22:51, Sergey Oboguev <ob...@gmail.com> wrote:
> Hi,
>
> I am trying to aggregate Spark time-stamped structured stream to get
> per-device (source) averages for every second of incoming data.
>
> dataset.printSchema(); // see the output below
>
> Dataset<Row> ds1 = dataset
> .withWatermark("timestamp", "1 second")
> .groupBy(
> functions.window(dataset.col("timestamp"), "1 second", "1 second"),
> dataset.col("source"))
> .agg(
> functions.avg("D0").as("AVG_D0"),
> functions.avg("I0").as("AVG_I0"))
> .orderBy("window");
>
> StreamingQuery query = ds1.writeStream()
> .outputMode(OutputMode.Append())
> .format("console")
> .option("truncate", "false")
> .option("numRows", Integer.MAX_VALUE)
> .start();
>
> query.awaitTermination();
>
> I am using Spark 2.4.6.
>
> According to
>
> https://spark.apache.org/docs/2.4.6/structured-streaming-programming-guide.html#output-modes
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
> the above construct should work fine.
>
> Yet I am getting an exception in the query start():
>
> 11:05:27.282 [main] ERROR my.sparkbench.example.Example - Exception
> org.apache.spark.sql.AnalysisException: *Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark*;;
> Sort [window#44 ASC NULLS FIRST], true
> +- Aggregate [window#71, source#0], [window#71 AS window#44, source#0, avg(D0#12) AS AVG_D0#68, avg(I0#2L) AS AVG_I0#70]
> +- Filter isnotnull(timestamp#1)
> +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 1000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 1000000) + 0) + 1000000), LongType, TimestampType)) AS window#71, source#0, timestamp#1-T1000ms, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]
> +- EventTimeWatermark timestamp#1: timestamp, interval 1 seconds
> +- StreamingRelationV2 my.sparkbench.datastreamreader.MyStreamingSource@6897a4a, my.sparkbench.datastreamreader.MyStreamingSource, [source#0, timestamp#1, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]
>
> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
> at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:256)
> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
> at my.sparkbench.example.Example.streamGroupByResult(Example.java:113)
> at my.sparkbench.example.Example.exec_main(Example.java:76)
> at my.sparkbench.example.Example.do_main(Example.java:42)
> at my.sparkbench.example.Example.main(Example.java:34)
>
> even though there is a watermark on the stream.
>
> Schema printout looks fine:
>
> root
> |-- source: string (nullable = false)
> |-- timestamp: timestamp (nullable = false)
> |-- I0: long (nullable = false)
> |-- I1: long (nullable = false)
> |-- I2: long (nullable = false)
> |-- I3: long (nullable = false)
> |-- I4: long (nullable = false)
> |-- I5: long (nullable = false)
> |-- I6: long (nullable = false)
> |-- I7: long (nullable = false)
> |-- I8: long (nullable = false)
> |-- I9: long (nullable = false)
> |-- D0: double (nullable = false)
> |-- D1: double (nullable = false)
> |-- D2: double (nullable = false)
> |-- D3: double (nullable = false)
> |-- D4: double (nullable = false)
> |-- D5: double (nullable = false)
> |-- D6: double (nullable = false)
> |-- D7: double (nullable = false)
> |-- D8: double (nullable = false)
> |-- D9: double (nullable = false)
>
> Actual data looks fine too. If I feed it to
>
> dataset.writeStream().format("console").option("truncate", "false").outputMode(OutputMode.Append()).start();
>
> then I am getting output
>
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
> |source |timestamp |I0 |I1 |I2 |I3 |I4 |I5 |I6 |I7 |I8 |I9 |D0 |D1 |D2 |D3 |D4 |D5 |D6 |D7 |D8 |D9 |
> +--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
> |DEV-0001|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:40 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:41 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:42 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> +--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
> only showing top 20 rows
>
> and then follow-up batches of a similar look.
>
> There is no exception if I use COMPLETE output mode, but then old results
> (from the start of the timeline) are reported in every batch and that’s not
> what I want. I want only new query result records to be reported. Thus I
> want the APPEND mode – but it causes an exception.
>
> Why is the exception and how can I make it work?
>
> Tiny project that isolates the problem is here:
> https://github.com/oboguev/SparkQuestion
>
> Thanks for advice.
>