Posted to user@spark.apache.org by Revin Chalil <rc...@expedia.com> on 2017/06/28 17:27:10 UTC

Structured Streaming Questions

I am using Structured Streaming with Spark 2.1 and have some basic questions.


*         Is there a way to automatically refresh the Hive partitions when using the Parquet sink with partitioning? My query is shown below:


val queryCount = windowedCount
                  .withColumn("hive_partition_persist_date", $"persist_date_window_start".cast("date"))
                  .writeStream.format("parquet")
                  .partitionBy("hive_partition_persist_date")
                  .option("path", StatsDestination)
                  .option("checkpointLocation", CheckPointLocationStats)
                  .trigger(ProcessingTime(WindowDurationStats))
                  .outputMode("append")
                  .start()


I have an external Parquet table built on top of the destination dir. The above query creates the partition dirs, but the Hive partition metadata is not refreshed and I have to execute ALTER TABLE .... RECOVER PARTITIONS before querying the Hive table. With legacy Streaming, it was possible to use spark.sql(hiveQL), where hiveQL can be any Hive statement, config setting, etc. Would this kind of functionality be available in Structured Streaming?


*         The query creates an additional dir "_spark_metadata" under the destination dir, and this causes select statements against the Parquet table to fail, as it expects only Parquet files under the destination location. Is there a config to avoid the creation of this dir?




*         Our use-case does not need to account for late-arriving records and so I have set the WaterMark as 0 seconds. Is this needed to flush out the data or is that a default setting or is this inappropriate?


*         In the "Append" mode, I have to push at least 3 batches to actually see the records written to the Destination, even with the watermark = 0 seconds setting. I understand that the statestore has to wait for watermark to output records but here watermark is 0 seconds. Curious to know what exactly is happening behind the scenes...



*         The "Trigger" duration and "Window" duration in our case are the same as we need to just get the count for every batch. Is a "Window" really needed in this scenario as I can logically get the batch count by just using count? I tried to just get the count from the batch and it said, aggregation cannot be done on streaming dataframe in the Append Mode.


*         In our current code, we use the DataBricks' Spark-Redshift library to write output to Redshift. Would this library be available in Structured Streaming? Is there a way to do this using the "ForEach"?


*         With Legacy streaming, we checkpoint the Kafka Offsets in ZooKeeper. Is using Structured Streaming's checkpointing resilient enough to handle all the failure-restart scenarios?



*         When will Spark 2.2 be available for use? I see that the programming guide still says 2.1.1.


Thanks,
Revin

Re: Structured Streaming Questions

Posted by Tathagata Das <ta...@gmail.com>.
Answers inline.



On Wed, Jun 28, 2017 at 10:27 AM, Revin Chalil <rc...@expedia.com> wrote:

> I am using Structured Streaming with Spark 2.1 and have some basic
> questions.
>
>
>
> ·         Is there a way to automatically refresh the Hive Partitions
> when using Parquet Sink with Partition? My query looks like below
>
>
>
> val queryCount = windowedCount
>                   .withColumn("hive_partition_persist_date",
> $"persist_date_window_start".cast("date"))
>                   .writeStream.format("parquet")
>                   .partitionBy("hive_partition_persist_date")
>                   .option("path", StatsDestination)
>                   .option("checkpointLocation", CheckPointLocationStats)
>                   .trigger(ProcessingTime(WindowDurationStats))
>                   .outputMode("append")
>                   .start()
>
>
>
> I have an external Parquet table built on top of Destination Dir. Above
> query creates the Partition Dirs but the Hive partition metadata is not
> refreshed and I have to execute  ALTER TABLE …. RECOVER PARTITIONS,
> before querying the Hive table. With legacy Streaming, it was possible to
> use the spark.sql(hiveQL), where hiveQL can be any hive statements, config
> settings etc:. Would this kind of functionality be available in structured
> streaming?
>
>
>
> ·         The Query creates an additional dir “_spark_metadata” under the
> Destination dir and this
> causes the select statements against the Parquet table fail as it is
> expecting only the parquet files under the destination location. Is there a
> config to avoid the creation of this dir?
>
The _spark_metadata directory holds the metadata (a write-ahead log)
recording which files in the directory are the correct, complete files
generated by the streaming query. This is how we get exactly-once
guarantees when writing to a file system, so this directory is very
important. In fact, if you run queries on this directory from Spark, Spark
is aware of this metadata and will read the correct files, ignoring
temporary/incomplete files that may have been generated by failed/duplicate
tasks. Querying that directory from Hive may lead to duplicate data, as it
will read those temp/duplicate files as well.

If you want the data to be available to Hive, you could periodically run a
Spark job to read the files from the directory and write out to a hive
table.
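
A minimal sketch of such a periodic batch job follows. The path and table
name are hypothetical placeholders, and the overwrite-the-whole-table
strategy is an assumption chosen for brevity rather than a recommendation;
whether Hive itself can read the resulting table depends on your Spark
version and table format, so treat this as a starting point only.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object PublishStatsToHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("publish-stats-to-hive")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical values: use the path given to the streaming sink
    // (StatsDestination in the query above) and a table of your own.
    val statsDestination = "s3a://example-bucket/stats"
    val hiveTable = "analytics.stats_counts"

    // Reading through Spark lets it consult _spark_metadata, so only
    // files committed by the streaming query are picked up.
    val committed = spark.read.parquet(statsDestination)

    // Rewrite the table with the committed data on every run.
    committed.write
      .mode(SaveMode.Overwrite)
      .saveAsTable(hiveTable)

    spark.stop()
  }
}
```

Run on a schedule, this keeps the Hive-facing table separate from the
directory that the streaming query owns.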


>
>
> ·         Our use-case does not need to account for late-arriving records
> and so I have set the WaterMark as 0 seconds. Is this needed to flush out
> the data or is that a default setting or is this inappropriate?
>
This should be fine.


>
>
> ·         In the “Append” mode, I have to push at least 3 batches to
> actually see the records written to the Destination, even with the
> watermark = 0 seconds setting. I understand that the statestore has to wait
> for watermark to output records but here watermark is 0 seconds. Curious to
> know what exactly is happening behind the scenes…
>
Probably this is what is going on:
1st batch - No estimate of the max event-time to begin with, so the
watermark value is not set.
2nd batch - The watermark is set based on the max event-time seen in the
1st batch. Say it is W. The window (i.e. [start, end]) for the data
received in the first batch is such that window.start < W < window.end, so
the window is still open and not finalized.
3rd batch - The watermark is updated to W1 such that
earliestWindow.end < W1, and therefore the corresponding window is
finalized.

In general, in aggregation + watermark + append mode, you have to wait at
least (window duration + watermark gap) before the earliest window expires
and emits the finalized aggregates. This is visually shown here.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
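
The three-batch behaviour described above can be replayed with a small
plain-Scala simulation. This is only an illustrative model, not Spark's
implementation: the names (WatermarkSketch, run, Window) are made up, the
windows are tumbling, and late events are not dropped as a real engine would
drop them. It assumes, as in the explanation above, that the watermark used
in a batch comes from the max event-time seen in earlier batches.

```scala
object WatermarkSketch {
  final case class Window(start: Long, end: Long)

  /** Replays batches of event times (in seconds) and returns, per batch,
    * the windows finalized in that batch together with their counts. */
  def run(batches: Seq[Seq[Long]], windowSec: Long, delaySec: Long): Seq[Seq[(Window, Int)]] = {
    var open = Map.empty[Window, Int]      // windows still accumulating
    var maxEventTime: Option[Long] = None  // max event time from earlier batches

    batches.map { batch =>
      // The watermark is unset until some earlier batch has supplied an event time.
      val watermark = maxEventTime.map(_ - delaySec)

      // Assign each event of this batch to its tumbling window.
      batch.foreach { t =>
        val start = (t / windowSec) * windowSec
        val w = Window(start, start + windowSec)
        open = open.updated(w, open.getOrElse(w, 0) + 1)
      }

      // A window is finalized once its end lies before the watermark.
      val (done, still) = open.partition { case (w, _) => watermark.exists(w.end < _) }
      open = still

      // This batch's event times only influence the watermark of later batches.
      if (batch.nonEmpty) {
        maxEventTime = Some(math.max(maxEventTime.getOrElse(Long.MinValue), batch.max))
      }
      done.toSeq.sortBy(_._1.start)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10-second windows, watermark delay of 0 seconds, three batches.
    val out = run(Seq(Seq(1L, 4L, 7L), Seq(12L, 15L), Seq(21L, 25L)), windowSec = 10, delaySec = 0)
    out.zipWithIndex.foreach { case (emitted, i) => println(s"batch ${i + 1}: $emitted") }
  }
}
```

With these inputs the first two batches emit nothing, and the window
[0, 10) with count 3 is emitted only in the third batch, once the watermark
has advanced to 15.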


>
> ·         The “Trigger” duration and “Window” duration in our case are
> the same as we need to just get the count for every batch. Is a “Window”
> really needed in this scenario as I can logically get the batch count by
> just using count? I tried to just get the count from the batch and it said,
> aggregation cannot be done on streaming dataframe in the Append Mode.
>
count() in a streaming DataFrame is not a batch count but a count on all
the data in the stream. As this data arrives, this count is updated by the
streaming query, and depending on the output mode, the updated/finalized
values are written out by the sink.

In general, the whole design of Structured Streaming is that you should
specify your query as if you were running it on a table, and the execution
model (batches, or otherwise) should not factor into the query semantics.
For example, when writing SQL queries in MySQL, we only care about the query
semantics, and don't care about how the query is going to be executed. So
the concept of "batch" does not exist in the streaming DataFrame APIs, and
queries written in this API will be executable either in a micro-batch
engine, or a continuous-processing engine
<https://issues.apache.org/jira/browse/SPARK-20928>.

Hence, the real question here is why do you need the batch count?


>
>
> ·         In our current code, we use the DataBricks’ Spark-Redshift
> library to write output to Redshift. Would this library be available in
> Structured Streaming? Is there a way to do this using the “ForEach”?
>
>
>
This is not yet available from Structured Streaming.


> ·         With Legacy streaming, we checkpoint the Kafka Offsets in
> ZooKeeper. Is using Structured Streaming’s checkpointing resilient enough
> to handle all the failure-restart scenarios?
>
YES. One of the biggest improvements in the checkpointing compared to
DStreams is this: only the offsets are saved in the provided checkpoint
directory, not the binary. Hence you can change the code (within
constraints) and restart. E.g. you can add a filter and restart. But you
cannot add an aggregation and restart successfully, as it does not make
semantic sense to change the query that much and "restart".
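
As a rough illustration of the "add a filter and restart" case, the sketch
below shows a query as it might look in its second deployment. The broker
address, topic, and paths are hypothetical, and the Kafka source is only an
assumption based on the question; the point is that the checkpointLocation
is the same as in the first deployment, so the query resumes from the saved
offsets.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RestartWithFilter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("restart-with-filter").getOrCreate()

    // Hypothetical Kafka source; requires the spark-sql-kafka-0-10 package.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Added in the second deployment; the first one wrote `events` directly.
    val filtered = events.filter(col("value").isNotNull)

    val query = filtered.writeStream
      .format("parquet")
      .option("path", "/data/events")                      // hypothetical output path
      .option("checkpointLocation", "/checkpoints/events") // unchanged across restarts
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```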

> ·         When would the spark 2.2 available for use? I see that the
> programming guide still says 2.1.1.
>
We are voting on 2.2.0 RC5 right now. So "very soon". :)


>
>
> Thanks,
>
> Revin
>