Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:03:15 UTC

[jira] [Updated] (SPARK-20325) Spark Structured Streaming documentation Update: checkpoint configuration

     [ https://issues.apache.org/jira/browse/SPARK-20325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-20325:
---------------------------------
    Labels: bulk-closed  (was: )

> Spark Structured Streaming documentation Update: checkpoint configuration
> -------------------------------------------------------------------------
>
>                 Key: SPARK-20325
>                 URL: https://issues.apache.org/jira/browse/SPARK-20325
>             Project: Spark
>          Issue Type: Documentation
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Kate Eri
>            Priority: Minor
>              Labels: bulk-closed
>
> I have configured the following streaming queries, one per metric, which output to Kafka:
> {code}
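> import org.apache.spark.sql.functions.count
>
> // One streaming query per metric; every query is given the same checkpointDir
> map.foreach { metric =>
>   streamToProcess
>     .groupBy(metric)
>     .agg(count(metric))
>     .writeStream
>     .outputMode("complete")
>     .option("checkpointLocation", checkpointDir)
>     .foreach(kafkaWriter)
>     .start()
> }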
> {code}
> I set the checkpoint directory on each output sink with .option("checkpointLocation", checkpointDir), following the documentation: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
> As a result, I got the following exception:
> {code}
> java.lang.IllegalStateException: Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:291)
> {code}
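The exception follows from how query ids are kept: a query's id persists across restarts from its checkpoint data. The first query in the loop creates an id and stores it under checkpointDir. Every later query pointed at the same directory picks up that same id, so its start() is rejected while the first query is still running. Below is a toy, plain-Scala model of that interaction. It is not Spark code: `ToyQueryManager` is a hypothetical class, and its map merely stands in for the id stored in the checkpoint.

```scala
import java.util.UUID
import scala.collection.mutable

// Toy model (hypothetical, not Spark's implementation): a checkpoint directory
// remembers the query id assigned on first start, and the manager refuses to
// start a query whose id is already active.
final class ToyQueryManager {
  // Stands in for the query id persisted in each checkpoint directory.
  private val persistedIds = mutable.Map.empty[String, UUID]
  // Ids of queries that are currently running.
  private val activeIds = mutable.Set.empty[UUID]

  def start(checkpointDir: String): Either[String, UUID] = {
    // Reuse the id already stored for this checkpoint, or create and store a new one.
    val id = persistedIds.getOrElseUpdate(checkpointDir, UUID.randomUUID())
    if (activeIds.contains(id))
      Left(s"Cannot start query with id $id as another query with same id is already active.")
    else {
      activeIds += id
      Right(id)
    }
  }
}
```

Starting two queries against "/ckpt" yields a `Right` and then a `Left`, while a second directory such as "/ckpt/other" starts normally. That is the same pattern as the loop in the report.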
> So, according to the current Spark logic, the checkpoint location for the “foreach” sink is resolved as follows:
> {code:title=StreamingQueryManager.scala}
>     // 1. An explicit option("checkpointLocation", ...) on the query takes precedence.
>     val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
>       new Path(userSpecified).toUri.toString
>     }.orElse {
>       // 2. Otherwise use spark.sql.streaming.checkpointLocation plus the query name
>       //    (or a random UUID if the query has no name).
>       df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
>         new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
>       }
>     }.getOrElse {
>       // 3. Otherwise use a temporary directory if the sink allows it, else fail.
>       if (useTempCheckpointLocation) {
>         Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
>       } else {
>         throw new AnalysisException(
>           "checkpointLocation must be specified either " +
>             """through option("checkpointLocation", ...) or """ +
>             s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
>       }
>     }
> {code}
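> So Spark first takes the checkpoint directory from the query's own option. It then falls back to the SparkSession configuration (spark.sql.streaming.checkpointLocation), with the query name or a random UUID appended. Finally, if the sink allows it, it uses a temporary directory; otherwise it throws an AnalysisException.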
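The lookup order in the excerpt can be restated as a small dependency-free function, which makes it easy to check which directory a given combination of settings would produce. This is a sketch, not Spark's implementation. `CheckpointResolution.resolve` and its parameter names are hypothetical. Paths are joined with a plain "/" instead of Hadoop's `Path`, and no URI normalization is done. The error case is returned as a `Left` rather than thrown as an `AnalysisException`.

```scala
import java.util.UUID

object CheckpointResolution {
  // Simplified stand-in for new Path(parent, child): join with a single "/".
  private def join(parent: String, child: String): String =
    parent.stripSuffix("/") + "/" + child

  // Returns Right(checkpointPath), or Left(message) where Spark would throw.
  def resolve(
      userSpecified: Option[String],       // option("checkpointLocation", ...)
      sessionDefault: Option[String],      // spark.sql.streaming.checkpointLocation
      queryName: Option[String],           // queryName(...), if any
      useTempCheckpointLocation: Boolean,  // whether the sink tolerates a temp dir
      tempDir: () => String,               // only called when step 3 is reached
      newId: () => String = () => UUID.randomUUID().toString): Either[String, String] =
    userSpecified                                                                     // step 1
      .orElse(sessionDefault.map(base => join(base, queryName.getOrElse(newId()))))   // step 2
      .orElse(if (useTempCheckpointLocation) Some(tempDir()) else None)               // step 3
      .toRight("""checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)""")
}
```

Step 2 is what lets a single session-wide default serve several queries at once. If each query is given a distinct name via `queryName(...)`, each one resolves to its own subdirectory. An unnamed query gets a fresh UUID directory on every start, so it cannot resume from an earlier run.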
> However, this behavior is not documented, which raises two questions:
> 1) Could we update the Structured Streaming documentation to describe this behavior?
> 2) Do we really need to specify the checkpoint directory per query? What is the reason for this? In the end, we will be forced to write some checkpoint directory name generator, for example one that associates each directory with a particular named query.
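Until the documentation covers this, a per-query directory can be derived in user code, which is roughly the name generator that question 2 anticipates. The following is a minimal sketch, assuming the metric names are plain column names. `PerQueryCheckpoint`, `sanitize` and `checkpointDirs` are hypothetical helpers, not part of Spark.

```scala
object PerQueryCheckpoint {
  // Hypothetical helper: replace characters that are awkward in directory names.
  def sanitize(metric: String): String = metric.replaceAll("[^A-Za-z0-9_-]", "_")

  // One checkpoint directory per metric under baseDir. Fails fast if two metrics
  // would map to the same directory (e.g. "a.b" and "a_b"), because two running
  // queries sharing a checkpoint would hit the "same id is already active" error.
  def checkpointDirs(baseDir: String, metrics: Seq[String]): Map[String, String] = {
    val base = baseDir.stripSuffix("/")
    val dirs = metrics.map(m => m -> s"$base/${sanitize(m)}")
    val collisions = dirs.groupBy(_._2).collect { case (dir, ms) if ms.size > 1 => dir }
    require(collisions.isEmpty, s"checkpoint directories collide: ${collisions.mkString(", ")}")
    dirs.toMap
  }
}
```

In the loop from the report, one would compute the map once from the metric names (for example `val dirs = PerQueryCheckpoint.checkpointDirs(checkpointDir, metricNames)`, where `metricNames` is a hypothetical `Seq[String]`). Each query would then pass `dirs(metric)` to `.option("checkpointLocation", ...)` instead of the shared `checkpointDir`. The directories stay stable across restarts because they depend only on the metric name.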



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
