Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2017/03/23 00:18:42 UTC

[jira] [Updated] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

     [ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-19965:
-------------------------------------
    Target Version/s: 2.2.0

> DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19965
>                 URL: https://issues.apache.org/jira/browse/SPARK-19965
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Shixiong Zhu
>
> Reproducer
> {code}
>   test("partitioned writing and batch reading with 'basePath'") {
>     val inputData = MemoryStream[Int]
>     val ds = inputData.toDS()
>     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
>     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
>     var query: StreamingQuery = null
>     try {
>       query =
>         ds.map(i => (i, i * 1000))
>           .toDF("id", "value")
>           .writeStream
>           .partitionBy("id")
>           .option("checkpointLocation", checkpointDir)
>           .format("parquet")
>           .start(outputDir)
>       inputData.addData(1, 2, 3)
>       failAfter(streamingTimeout) {
>         query.processAllAvailable()
>       }
>       spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show()
>     } finally {
>       if (query != null) {
>         query.stop()
>       }
>     }
>   }
> {code}
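> The sink lays out its output with the metadata log directory next to the partition directories (illustrative listing, not copied from the test run):
> {code}
> stream.output-<uuid>/
>   _spark_metadata/             <- FileStreamSink's metadata log
>   id=1/part-<...>.parquet      <- partition directories from partitionBy("id")
>   id=2/part-<...>.parquet
>   id=3/part-<...>.parquet
> {code}
> The glob outputDir + "/*" therefore matches _spark_metadata as well as the id=... directories, so partition inference sees two inconsistent directory structures and hits the assertion below.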
> Stack trace
> {code}
> [info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
> [info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
> [info] 	***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
> [info] 	***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
> [info] 
> [info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
> [info]   at scala.Predef$.assert(Predef.scala:170)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
> [info]   at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
> [info]   at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
> [info]   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
> [info]   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
> [info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
> [info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
> [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
> [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> {code}
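> A possible workaround (sketch only, not verified against this exact test): read the sink's root output directory directly instead of globbing over it, so the batch reader can use the _spark_metadata log rather than inferring partitions from the directory listing.
> {code}
>   // Sketch of a workaround: point the reader at the output directory itself.
>   // With the FileStreamSink's _spark_metadata log present, Spark reads the
>   // committed files from the log instead of inferring partitions from a glob.
>   spark.read.parquet(outputDir).show()
> {code}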


