Posted to user@spark.apache.org by Magnus Nilsson <ma...@kth.se> on 2018/11/21 09:02:48 UTC

Structured Streaming to file sink results in illegal state exception

I'm evaluating Structured Streaming trying to understand how resilient the
pipeline is. I ran a small test streaming data from an Azure Event Hub
using Azure Databricks saving the data into a parquet file on the
Databricks filesystem dbfs:/.

I did an unclean shutdown by cancelling the query, then tried to restart
the query without changing any parameters. This led to an illegal state
exception:

"Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2
doesn't exist when compacting batch 9 (compactInterval: 10)"

Now I'm trying to reconcile how checkpointing works with the commit logs in
the _spark_metadata/ directory in the data output folder.

There isn't any _spark_metadata/2 file, that is correct. How does Spark
know there ought to be one? The streaming query created 9 offset log files
and 8 commit log files in the checkpoint directory. The data folder's
_spark_metadata/ folder contains two log files, each listing two data
files. The data directory itself contains 10 parquet files.

If I understand correctly, on the input side this means that nine trigger
batches have been started and eight have been finished. On the output side
10 files have been started and four have been finished (committed). Six of
them are "uncommitted", i.e. dirty or in-progress writes as far as Spark is
concerned.
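
A rough sketch of how these counts could be cross-checked, in case it helps
anyone reproduce this. It assumes the checkpoint and output directories are
reachable as ordinary local paths, that the checkpoint has offsets/ and
commits/ subdirectories with files named by batch id, and that each
_spark_metadata/ log file is a version line followed by one JSON object per
line with a "path" field. The function names are made up for illustration,
and entries in compacted logs that mark files as deleted are not handled.

```python
import json
from pathlib import Path


def batch_ids(log_dir):
    """Return sorted batch ids taken from file names such as "0" or "9.compact"."""
    ids = set()
    for entry in Path(log_dir).iterdir():
        stem = entry.name.split(".")[0]  # hidden files like ".0.crc" give ""
        if entry.is_file() and stem.isdigit():
            ids.add(int(stem))
    return sorted(ids)


def listed_file_names(metadata_dir):
    """Collect the data file names listed in the sink log files.

    Assumed format: a version line, then one JSON object per line
    with a "path" field.
    """
    names = set()
    for entry in Path(metadata_dir).iterdir():
        if not entry.is_file() or entry.name.startswith("."):
            continue
        for line in entry.read_text().splitlines():
            line = line.strip()
            if line.startswith("{"):
                path = json.loads(line)["path"]
                names.add(path.rsplit("/", 1)[-1])
    return names


def summarize(checkpoint_dir, output_dir):
    """Compare checkpoint logs, sink log, and the data files on disk."""
    checkpoint_dir, output_dir = Path(checkpoint_dir), Path(output_dir)
    offsets = batch_ids(checkpoint_dir / "offsets")
    commits = batch_ids(checkpoint_dir / "commits")
    metadata_dir = output_dir / "_spark_metadata"
    data_files = {p.name for p in output_dir.glob("*.parquet")}
    return {
        # batches with an offset log entry but no commit log entry
        "started_not_finished": sorted(set(offsets) - set(commits)),
        # batch ids present in the sink's own log
        "sink_batches": batch_ids(metadata_dir),
        # parquet files on disk that no sink log file lists
        "unlisted_files": sorted(data_files - listed_file_names(metadata_dir)),
    }
```

Calling summarize(checkpoint_dir, output_dir) on a copy of the two
directories returns the batches that were started but not finished, the
batch ids in the sink log, and the parquet files that no log file lists.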

I have yet to find where the translation from batch to output files is
logged. If the pipeline is capable of exactly-once delivery semantics to a
file store, shouldn't the translation from batch per partition to the
resulting committed file in the data folder be logged somewhere?

I.e., in this scenario shouldn't Spark look up which batches are saved in
the committed output files, clean up the dirty writes, then replay the
stream from the last known good position?

I want to go back to the last known good state and resume the stream from
there. Any input from the list is greatly appreciated.

Is there a good blog post or other documentation on the metadata handling
in Spark? I've looked but only found high-level overviews.

Thanks,

Magnus