Posted to user@spark.apache.org by William Briggs <wr...@gmail.com> on 2018/01/04 17:08:16 UTC

Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The
job sources data from a Kafka topic, performs a variety of filters and
transformations, and sinks data back into a different Kafka topic.
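
For reference, the query is essentially a Kafka-to-Kafka pipeline along the
lines of the sketch below (topic names, brokers, and the checkpoint path are
placeholders, not our real configuration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-to-kafka-example")   // placeholder app name
  .getOrCreate()
import spark.implicits._

// Source: read raw records from the input topic.
val source = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "input-topic")                  // placeholder
  .load()

// Stand-in for the real filters and transformations.
val transformed = source
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .filter($"value".isNotNull)

// Sink: write back to a different topic, checkpointing to HDFS.
val query = transformed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")            // placeholder
  .option("topic", "output-topic")                              // placeholder
  .option("checkpointLocation", "hdfs:///checkpoints/my-query") // placeholder
  .start()

query.awaitTermination()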

Once per day, we stop the query in order to merge the NameNode edit logs
with the fsimage, because Structured Streaming creates and destroys a
significant number of HDFS files, and EMR doesn't support a secondary or HA
NameNode for fsimage compaction (AWS support directed us to do this, as the
NameNode edit logs were filling the disk).
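
(The stop itself is nothing special: it is a plain stop of the running
query followed by a later resubmission against the same checkpointLocation,
roughly as sketched below, where "query" is the handle from start() above.)

// Sketch of the daily stop before NameNode maintenance.
if (query.isActive) {
  query.stop()   // stop the streaming execution; progress lives in the checkpoint
}
spark.stop()

// After the fsimage merge the job is resubmitted; because writeStream uses
// the same checkpointLocation, the restarted query resumes from the
// offsets/ and commits/ logs in that directory. That is where the problem
// described below shows up.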

Occasionally, the Structured Streaming query will not restart because the
most recent file in the "commits" or "offsets" checkpoint subdirectory is
empty. This seems like undesirable behavior, as it requires manual
intervention to remove the empty files so that the job falls back onto the
last good values. Has anyone else run into this? The only
similar issue I can find is SPARK-21760
<https://issues.apache.org/jira/browse/SPARK-21760>, which appears to have
no fix or workaround.
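
For anyone who hits the same thing, the manual intervention is essentially
"find the zero-length file under offsets/ or commits/ and delete it" so the
query falls back to the previous batch. Something along these lines works
against the checkpoint directory (sketch using the Hadoop FileSystem API;
the checkpoint path is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Remove zero-length offset/commit files left behind by a failed write so
// the query can fall back to the last complete batch on restart.
val checkpoint = new Path("hdfs:///checkpoints/my-query")   // placeholder
val fs = FileSystem.get(checkpoint.toUri, new Configuration())

for (sub <- Seq("offsets", "commits")) {
  val dir = new Path(checkpoint, sub)
  if (fs.exists(dir)) {
    fs.listStatus(dir)
      .filter(s => s.isFile && s.getLen == 0)   // empty metadata file
      .foreach { s =>
        println(s"Deleting empty checkpoint file: ${s.getPath}")
        fs.delete(s.getPath, false)
      }
  }
}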

Any assistance would be greatly appreciated!

Regards,
Will

Re: Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
The root cause is probably that HDFSMetadataLog ignores exceptions thrown
by "output.close". I think it should be fixed by this line in Spark 2.2.1
and 3.0.0:
https://github.com/apache/spark/commit/6edfff055caea81dc3a98a6b4081313a0c0b0729#diff-aaeb546880508bb771df502318c40a99L126
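
To illustrate what I mean (a sketch of the general pattern only, not the
actual HDFSMetadataLog code): if the exception from close() is swallowed, a
zero-length file can be left in place and then picked up as a valid batch
entry on restart. Letting the close failure propagate, and only renaming a
temp file into place after a successful close, avoids that:

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Write batch metadata to a temp file; let any write/close exception
// propagate; rename into place only after close() succeeded.
def writeBatchFile(fs: FileSystem, target: Path, bytes: Array[Byte]): Unit = {
  val temp = new Path(target.getParent, s".${target.getName}.tmp")
  val out = fs.create(temp, true)
  try {
    out.write(bytes)
  } finally {
    out.close()   // if this throws, the temp file is never renamed
  }
  if (!fs.rename(temp, target)) {
    throw new IOException(s"Failed to rename $temp to $target")
  }
}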

Could you try 2.2.1?
