Posted to issues@spark.apache.org by "Max Kießling (JIRA)" <ji...@apache.org> on 2018/11/16 13:23:00 UTC

[jira] [Commented] (SPARK-25799) DataSourceApiV2 scan reuse does not respect options

    [ https://issues.apache.org/jira/browse/SPARK-25799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689409#comment-16689409 ] 

Max Kießling commented on SPARK-25799:
--------------------------------------

Apparently this issue is already resolved with the new 2.4.0 release. Thanks!

> DataSourceApiV2 scan reuse does not respect options
> ---------------------------------------------------
>
>                 Key: SPARK-25799
>                 URL: https://issues.apache.org/jira/browse/SPARK-25799
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2
>            Reporter: Max Kießling
>            Priority: Major
>
> When creating a custom data source with the Data Source API V2, it seems that the computation of possible scan reuses is broken when the same data source is used twice but configured with different options. When both scans produce the same schema (which is always the case for count queries with column pruning enabled), the optimizer will reuse the scan produced by one of the data source instances for both branches of the query.
> This can lead to wrong results if the configuration option somehow influences the returned data.
> The behavior can be reproduced with the following example:
> {code:scala}
> import org.apache.spark.sql.sources.v2.reader._
> import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.sql.{Row, SparkSession}
> import java.util.{List => JList}
> class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
>   class Reader(rowCount: Int) extends DataSourceReader
>     with SupportsPushDownRequiredColumns {
>     var requiredSchema = new StructType().add("i", "int").add("j", "int")
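>     // For a count() query, column pruning removes every column, so both reader
>     // instances end up reporting the same (empty) required schema.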
>     override def pruneColumns(requiredSchema: StructType): Unit = {
>       this.requiredSchema = requiredSchema
>     }
>     override def readSchema(): StructType = {
>       requiredSchema
>     }
>     override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
>       val res = new java.util.ArrayList[DataReaderFactory[Row]]
>       res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
>       res.add(new AdvancedDataReaderFactory(5, rowCount, requiredSchema))
>       res
>     }
>   }
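>   // The "rows" option controls how many rows this source produces, so scans
>   // created with different option values must not be shared.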
>   override def createReader(options: DataSourceOptions): DataSourceReader =
>     new Reader(options.get("rows").orElse("10").toInt)
> }
> class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
>   extends DataReaderFactory[Row] with DataReader[Row] {
>   private var current = start - 1
>   override def createDataReader(): DataReader[Row] = {
>     new AdvancedDataReaderFactory(start, end, requiredSchema)
>   }
>   override def close(): Unit = {}
>   override def next(): Boolean = {
>     current += 1
>     current < end
>   }
>   override def get(): Row = {
>     val values = requiredSchema.map(_.name).map {
>       case "i" => current
>       case "j" => -current
>     }
>     Row.fromSeq(values)
>   }
> }
> object DataSourceTest extends App {
>   val spark = SparkSession.builder().master("local[*]").getOrCreate()
>   val cls = classOf[AdvancedDataSourceV2]
>   val with100 = spark.read.format(cls.getName).option("rows", 100).load()
>   val with10 = spark.read.format(cls.getName).option("rows", 10).load()
>   assert(with100.union(with10).count == 110)
> }
> {code}
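> One way to see the effect (a minimal sketch building on the example above) is to print the physical plan of the union and look at the scan nodes; on the affected versions both branches can end up pointing at the same reused scan, even though the two readers were created with different "rows" options:
> {code:scala}
> // Sketch: inspect the physical plan of the union built in DataSourceTest above.
> val unioned = with100.union(with10)
> // Check whether both branches of the union share a single scan node.
> unioned.explain()
> // Expected 110; a wrongly reused scan makes this come out wrong.
> println(unioned.count())
> {code}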



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org