Posted to user@spark.apache.org by Magnus Nilsson <ma...@gmail.com> on 2018/11/21 12:21:58 UTC

Structured Streaming restart results in illegal state exception

Hello,

I'm evaluating Structured Streaming, trying to understand how resilient the
pipeline is to failures. I ran a small test on Azure Databricks, streaming
data from an Azure Event Hub and saving it as parquet files on the
Databricks filesystem dbfs:/.
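
For reference, the query looked roughly like this (a sketch, not the exact
code; the connection string and checkpoint path are placeholders, and it
assumes the azure-eventhubs-spark connector):

    import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

    // Read from Event Hubs; the connection string is a placeholder.
    val ehConf = EventHubsConf("<event-hub-connection-string>")
      .setStartingPosition(EventPosition.fromEndOfStream)

    val stream = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)
      .load()

    // Write to a parquet file sink on dbfs:/ with a separate checkpoint dir.
    val query = stream.writeStream
      .format("parquet")
      .option("path", "dbfs:/test02")
      .option("checkpointLocation", "dbfs:/test02/_checkpoint")
      .start()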

I did an unclean shutdown by cancelling the query, then tried to restart it
without changing any parameters. This led to an aborted job due to an
illegal state exception.

"Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2
doesn't exist when compacting batch 9 (compactInterval: 10)"
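
As far as I understand the sink log layout, with the default compactInterval
of 10 batches 0 through 8 each write a plain log file and batch 9 writes a
compact file that folds in the nine previous entries, so before the failure
the directory should look something like this:

    dbfs:/test02/_spark_metadata/0
    dbfs:/test02/_spark_metadata/1
    ...
    dbfs:/test02/_spark_metadata/8
    dbfs:/test02/_spark_metadata/9.compact

In my case the file for batch 2 is missing, so the compaction of batch 9
cannot assemble the full history.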

Now I'm trying to reconcile how stream checkpointing works with the commit
logs in the _spark_metadata/ directory in the data output folder.

How does Spark know there ought to be a _spark_metadata/2 file? And why is
the missing file considered an illegal state? How is the commit metadata in
the dbfs:/ file system integrated with Structured Streaming checkpointing?
I haven't found any metadata that links a particular committed file (i.e.
one with a corresponding log file in the _spark_metadata/ folder) to the
batch that created it. As far as I know, checkpointing and commit logs are
separate from the file store's commit metadata. Somewhere Spark needs to
track which files were created by which batch in order to uphold
exactly-once processing to file stores.
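
For what it's worth, the sink log files can be inspected directly; each is a
small text file with a version header followed by one JSON entry per file
the batch committed. For example (batch number hypothetical):

    // Dump the raw entries of one sink log file; each line after the
    // "v1" header is a JSON record describing a committed data file
    // (path, size, modificationTime, action, ...).
    spark.read.text("dbfs:/test02/_spark_metadata/8")
      .show(truncate = false)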

If it does, one would think Spark could clean up the dirty writes in the
sink folder and restart the stream from the last known good offset. That is
what I had hoped would happen; no such luck, though.

I want to start over from the last known good state and resume the stream
from there. Any input from the list on this issue is greatly appreciated.

Is there any good blog post or other documentation on Spark's file sink
metadata handling? I've looked, but have only found high-level
documentation and nothing that explains the mechanics in detail.

Thanks,

Magnus