You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by kaniska Mandal <ka...@gmail.com> on 2017/06/19 18:03:45 UTC

How save streaming aggregations on 'Structured Streams' in parquet format ?

Hi,

My goal is to ~
(1) either chain streaming aggregations in a single query OR
(2) run multiple streaming aggregations and save data in some meaningful
format to execute low latency / failsafe OLAP queries

So my first choice is parquet format , but I failed to make it work !

I am using spark-streaming_2.11-2.1.1

I am facing the following error -
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets;

- for the following syntax

 StreamingQuery streamingQry = tagBasicAgg.writeStream()

              .format("parquet")

              .trigger(ProcessingTime.create("10 seconds"))

              .queryName("tagAggSummary")

              .outputMode("append")

              .option("checkpointLocation", "/tmp/summary/checkpoints/")

              .option("path", "/data/summary/tags/")

              .start();
But, parquet doesn't support 'complete' outputMode.

So is parquet supported only for batch queries , NOT for streaming queries
?

- note that console outputmode working fine !

Any help will be much appreciated.

Thanks
Kaniska

Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

Posted by kaniska Mandal <ka...@gmail.com>.
Thanks Tathagata for the pointer.

On Mon, Jun 19, 2017 at 8:24 PM, Tathagata Das <ta...@gmail.com>
wrote:

> That is not the write way to use watermark + append output mode. The
> `withWatermark` must be before the aggregation. Something like this.
>
> df.withWatermark("timestamp", "1 hour")
>   .groupBy(window("timestamp", "30 seconds"))
>   .agg(...)
>
> Read more here - https://databricks.com/blog/2017/05/08/event-time-
> aggregation-watermarking-apache-sparks-structured-streaming.html
>
>
> On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <ka...@gmail.com>
> wrote:
>
>> Hi Burak,
>>
>> Per your suggestion, I have specified
>> > deviceBasicAgg.withWatermark("eventtime", "30 seconds");
>> before invoking deviceBasicAgg.writeStream()...
>>
>> But I am still facing ~
>>
>> org.apache.spark.sql.AnalysisException: Append output mode not supported
>> when there are streaming aggregations on streaming DataFrames/DataSets;
>>
>> I am Ok with 'complete' output mode.
>>
>> =================================================
>>
>> I tried another approach - Creating parquet file from the in-memory
>> dataset ~ which seems to work.
>>
>> But I need only the delta, not the cumulative count. Since 'append' mode
>> not supporting streaming Aggregation, I have to use 'complete'
>> outputMode.
>>
>> StreamingQuery streamingQry = deviceBasicAgg.writeStream()
>>
>>               .format("memory")
>>
>>               .trigger(ProcessingTime.create("5 seconds"))
>>
>>               .queryName("deviceBasicAggSummary")
>>
>>               .outputMode("complete")
>>
>>               .option("checkpointLocation", "/tmp/parquet/checkpoints/")
>>
>>               .start();
>>
>> streamingQry.awaitTermination();
>>
>> Thread.sleep(5000);
>>
>> while(true) {
>>
>> Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSum
>> mary");
>>
>> deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/"
>> +new Date().getTime()+"/");
>>
>> }
>>
>> =================================================
>>
>> So whats the best practice for 'low latency query on distributed data'
>> using Spark SQL and Structured Streaming ?
>>
>>
>> Thanks
>>
>> Kaniska
>>
>>
>>
>> On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <br...@gmail.com> wrote:
>>
>>> Hi Kaniska,
>>>
>>> In order to use append mode with aggregations, you need to set an event
>>> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
>>> to output an aggregation result as "final".
>>>
>>> Best,
>>> Burak
>>>
>>> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <
>>> kaniska.mandal@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> My goal is to ~
>>>> (1) either chain streaming aggregations in a single query OR
>>>> (2) run multiple streaming aggregations and save data in some
>>>> meaningful format to execute low latency / failsafe OLAP queries
>>>>
>>>> So my first choice is parquet format , but I failed to make it work !
>>>>
>>>> I am using spark-streaming_2.11-2.1.1
>>>>
>>>> I am facing the following error -
>>>> org.apache.spark.sql.AnalysisException: Append output mode not
>>>> supported when there are streaming aggregations on streaming
>>>> DataFrames/DataSets;
>>>>
>>>> - for the following syntax
>>>>
>>>>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>>>
>>>>               .format("parquet")
>>>>
>>>>               .trigger(ProcessingTime.create("10 seconds"))
>>>>
>>>>               .queryName("tagAggSummary")
>>>>
>>>>               .outputMode("append")
>>>>
>>>>               .option("checkpointLocation", "/tmp/summary/checkpoints/"
>>>> )
>>>>
>>>>               .option("path", "/data/summary/tags/")
>>>>
>>>>               .start();
>>>> But, parquet doesn't support 'complete' outputMode.
>>>>
>>>> So is parquet supported only for batch queries , NOT for streaming
>>>> queries ?
>>>>
>>>> - note that console outputmode working fine !
>>>>
>>>> Any help will be much appreciated.
>>>>
>>>> Thanks
>>>> Kaniska
>>>>
>>>>
>>>
>>
>

Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

Posted by Felix Cheung <fe...@hotmail.com>.
And perhaps the error message can be improved here?

________________________________
From: Tathagata Das <ta...@gmail.com>
Sent: Monday, June 19, 2017 8:24:01 PM
To: kaniska Mandal
Cc: Burak Yavuz; user
Subject: Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

That is not the write way to use watermark + append output mode. The `withWatermark` must be before the aggregation. Something like this.

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)

Read more here - https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html


On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <ka...@gmail.com>> wrote:
Hi Burak,

Per your suggestion, I have specified
> deviceBasicAgg.withWatermark("eventtime", "30 seconds");
before invoking deviceBasicAgg.writeStream()...

But I am still facing ~

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;

I am Ok with 'complete' output mode.

=================================================

I tried another approach - Creating parquet file from the in-memory dataset ~ which seems to work.

But I need only the delta, not the cumulative count. Since 'append' mode not supporting streaming Aggregation, I have to use 'complete' outputMode.

StreamingQuery streamingQry = deviceBasicAgg.writeStream()

              .format("memory")

              .trigger(ProcessingTime.create("5 seconds"))

              .queryName("deviceBasicAggSummary")

              .outputMode("complete")

              .option("checkpointLocation", "/tmp/parquet/checkpoints/")

              .start();

streamingQry.awaitTermination();

Thread.sleep(5000);

while(true) {

Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");

deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/"+new Date().getTime()+"/");

}

=================================================

So whats the best practice for 'low latency query on distributed data' using Spark SQL and Structured Streaming ?


Thanks

Kaniska


On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <br...@gmail.com>> wrote:
Hi Kaniska,

In order to use append mode with aggregations, you need to set an event time watermark (using `withWatermark`). Otherwise, Spark doesn't know when to output an aggregation result as "final".

Best,
Burak

On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <ka...@gmail.com>> wrote:
Hi,

My goal is to ~
(1) either chain streaming aggregations in a single query OR
(2) run multiple streaming aggregations and save data in some meaningful format to execute low latency / failsafe OLAP queries

So my first choice is parquet format , but I failed to make it work !

I am using spark-streaming_2.11-2.1.1

I am facing the following error -
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;

- for the following syntax

 StreamingQuery streamingQry = tagBasicAgg.writeStream()

              .format("parquet")

              .trigger(ProcessingTime.create("10 seconds"))

              .queryName("tagAggSummary")

              .outputMode("append")

              .option("checkpointLocation", "/tmp/summary/checkpoints/")

              .option("path", "/data/summary/tags/")

              .start();

But, parquet doesn't support 'complete' outputMode.

So is parquet supported only for batch queries , NOT for streaming queries ?

- note that console outputmode working fine !

Any help will be much appreciated.

Thanks
Kaniska





Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

Posted by Tathagata Das <ta...@gmail.com>.
That is not the write way to use watermark + append output mode. The
`withWatermark` must be before the aggregation. Something like this.

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)

Read more here -
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html


On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <ka...@gmail.com>
wrote:

> Hi Burak,
>
> Per your suggestion, I have specified
> > deviceBasicAgg.withWatermark("eventtime", "30 seconds");
> before invoking deviceBasicAgg.writeStream()...
>
> But I am still facing ~
>
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>
> I am Ok with 'complete' output mode.
>
> =================================================
>
> I tried another approach - Creating parquet file from the in-memory
> dataset ~ which seems to work.
>
> But I need only the delta, not the cumulative count. Since 'append' mode
> not supporting streaming Aggregation, I have to use 'complete' outputMode.
>
> StreamingQuery streamingQry = deviceBasicAgg.writeStream()
>
>               .format("memory")
>
>               .trigger(ProcessingTime.create("5 seconds"))
>
>               .queryName("deviceBasicAggSummary")
>
>               .outputMode("complete")
>
>               .option("checkpointLocation", "/tmp/parquet/checkpoints/")
>
>               .start();
>
> streamingQry.awaitTermination();
>
> Thread.sleep(5000);
>
> while(true) {
>
> Dataset<Row> deviceBasicAggSummaryData = spark.table("
> deviceBasicAggSummary");
>
> deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/"+
> new Date().getTime()+"/");
>
> }
>
> =================================================
>
> So whats the best practice for 'low latency query on distributed data'
> using Spark SQL and Structured Streaming ?
>
>
> Thanks
>
> Kaniska
>
>
>
> On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <br...@gmail.com> wrote:
>
>> Hi Kaniska,
>>
>> In order to use append mode with aggregations, you need to set an event
>> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
>> to output an aggregation result as "final".
>>
>> Best,
>> Burak
>>
>> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <
>> kaniska.mandal@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> My goal is to ~
>>> (1) either chain streaming aggregations in a single query OR
>>> (2) run multiple streaming aggregations and save data in some meaningful
>>> format to execute low latency / failsafe OLAP queries
>>>
>>> So my first choice is parquet format , but I failed to make it work !
>>>
>>> I am using spark-streaming_2.11-2.1.1
>>>
>>> I am facing the following error -
>>> org.apache.spark.sql.AnalysisException: Append output mode not
>>> supported when there are streaming aggregations on streaming
>>> DataFrames/DataSets;
>>>
>>> - for the following syntax
>>>
>>>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>>
>>>               .format("parquet")
>>>
>>>               .trigger(ProcessingTime.create("10 seconds"))
>>>
>>>               .queryName("tagAggSummary")
>>>
>>>               .outputMode("append")
>>>
>>>               .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>>
>>>               .option("path", "/data/summary/tags/")
>>>
>>>               .start();
>>> But, parquet doesn't support 'complete' outputMode.
>>>
>>> So is parquet supported only for batch queries , NOT for streaming
>>> queries ?
>>>
>>> - note that console outputmode working fine !
>>>
>>> Any help will be much appreciated.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>
>

Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

Posted by kaniska Mandal <ka...@gmail.com>.
Hi Burak,

Per your suggestion, I have specified
> deviceBasicAgg.withWatermark("eventtime", "30 seconds");
before invoking deviceBasicAgg.writeStream()...

But I am still facing ~

org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets;

I am Ok with 'complete' output mode.

=================================================

I tried another approach - Creating parquet file from the in-memory dataset
~ which seems to work.

But I need only the delta, not the cumulative count. Since 'append' mode
not supporting streaming Aggregation, I have to use 'complete' outputMode.

StreamingQuery streamingQry = deviceBasicAgg.writeStream()

              .format("memory")

              .trigger(ProcessingTime.create("5 seconds"))

              .queryName("deviceBasicAggSummary")

              .outputMode("complete")

              .option("checkpointLocation", "/tmp/parquet/checkpoints/")

              .start();

streamingQry.awaitTermination();

Thread.sleep(5000);

while(true) {

Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary"
);

deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/"+
new Date().getTime()+"/");

}

=================================================

So whats the best practice for 'low latency query on distributed data'
using Spark SQL and Structured Streaming ?


Thanks

Kaniska



On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <br...@gmail.com> wrote:

> Hi Kaniska,
>
> In order to use append mode with aggregations, you need to set an event
> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
> to output an aggregation result as "final".
>
> Best,
> Burak
>
> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <kaniska.mandal@gmail.com
> > wrote:
>
>> Hi,
>>
>> My goal is to ~
>> (1) either chain streaming aggregations in a single query OR
>> (2) run multiple streaming aggregations and save data in some meaningful
>> format to execute low latency / failsafe OLAP queries
>>
>> So my first choice is parquet format , but I failed to make it work !
>>
>> I am using spark-streaming_2.11-2.1.1
>>
>> I am facing the following error -
>> org.apache.spark.sql.AnalysisException: Append output mode not supported
>> when there are streaming aggregations on streaming DataFrames/DataSets;
>>
>> - for the following syntax
>>
>>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>
>>               .format("parquet")
>>
>>               .trigger(ProcessingTime.create("10 seconds"))
>>
>>               .queryName("tagAggSummary")
>>
>>               .outputMode("append")
>>
>>               .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>
>>               .option("path", "/data/summary/tags/")
>>
>>               .start();
>> But, parquet doesn't support 'complete' outputMode.
>>
>> So is parquet supported only for batch queries , NOT for streaming
>> queries ?
>>
>> - note that console outputmode working fine !
>>
>> Any help will be much appreciated.
>>
>> Thanks
>> Kaniska
>>
>>
>

Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

Posted by Burak Yavuz <br...@gmail.com>.
Hi Kaniska,

In order to use append mode with aggregations, you need to set an event
time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
to output an aggregation result as "final".

Best,
Burak

On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <ka...@gmail.com>
wrote:

> Hi,
>
> My goal is to ~
> (1) either chain streaming aggregations in a single query OR
> (2) run multiple streaming aggregations and save data in some meaningful
> format to execute low latency / failsafe OLAP queries
>
> So my first choice is parquet format , but I failed to make it work !
>
> I am using spark-streaming_2.11-2.1.1
>
> I am facing the following error -
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>
> - for the following syntax
>
>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>
>               .format("parquet")
>
>               .trigger(ProcessingTime.create("10 seconds"))
>
>               .queryName("tagAggSummary")
>
>               .outputMode("append")
>
>               .option("checkpointLocation", "/tmp/summary/checkpoints/")
>
>               .option("path", "/data/summary/tags/")
>
>               .start();
> But, parquet doesn't support 'complete' outputMode.
>
> So is parquet supported only for batch queries , NOT for streaming queries
> ?
>
> - note that console outputmode working fine !
>
> Any help will be much appreciated.
>
> Thanks
> Kaniska
>
>