You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by ericl <gi...@git.apache.org> on 2016/07/17 22:20:16 UTC

[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

GitHub user ericl opened a pull request:

    https://github.com/apache/spark/pull/14241

    [SPARK-16596] [SQL] Refactor DataSourceScanExec to do partition discovery at execution instead of planning time

    ## What changes were proposed in this pull request?
    
    Partition discovery is rather expensive, so we should do it at execution time instead of during physical planning. Right now there is not much benefit since ListingFileCatalog will read scan for all partitions at planning time anyways, but this can be optimized in the future. Also, there might be more information for partition pruning not available at planning time.
    
    TODO: In another pr, move DataSourceScanExec to it's own file.
    
    ## How was this patch tested?
    
    Existing tests (it might be worth adding a test that catalog.listFiles() is delayed until execution, but this can be delayed until there is an actual benefit to doing so).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ericl/spark refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14241
    
----
commit d04636474cce217106c1b3bfb60b5da54a53f7e5
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-16T02:18:52Z

    Fri Jul 15 19:18:52 PDT 2016

commit 36d6ef44051a9ac57b9d6d1681aa9b11fa16d259
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-17T21:28:47Z

    Sun Jul 17 14:28:47 PDT 2016

commit 6c0eb0e05238e21c68b3e26c1efad01c2af3e5e8
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-17T21:29:46Z

    Sun Jul 17 14:29:46 PDT 2016

commit 1a4660286496663f6cb3414a22460e4fb24610b1
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-17T21:36:58Z

    Sun Jul 17 14:36:58 PDT 2016

commit 538233499efce05379110d7210a0cdc7e25b699e
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-17T21:42:32Z

    Sun Jul 17 14:42:32 PDT 2016

commit 98d6d74dde496b2256081e5840d56e91031e4db3
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-17T21:55:13Z

    Sun Jul 17 14:55:13 PDT 2016

commit 0d4642a3cef757666fbc72932d3eb78bbaeec530
Author: Eric Liang <ek...@databricks.com>
Date:   2016-07-17T22:12:24Z

    Sun Jul 17 15:12:24 PDT 2016

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #63139 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63139/consoleFull)** for PR 14241 at commit [`a76b432`](https://github.com/apache/spark/commit/a76b432602c1282a89599b8ac19fb15d0ec5f507).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62845/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72430345
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    --- End diff --
    
    Note to self: Comes from DataSourceScanExec.create.
    
    You could also write:
    `relation.bucketSpec.filter(_ => relation.sparkSession.sessionState.conf.bucketingEnabled)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72015104
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning = {
    --- End diff --
    
    put the return type here (we automatically check for defs but unfortunately not checking for vals yet)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r73226713
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from HadoopFsRelations.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.catalogString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    --- End diff --
    
    inputRDDs() may be called multiple times, should we cache the created RDD?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72430434
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    --- End diff --
    
    Note to Self: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62845 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62845/consoleFull)** for PR 14241 at commit [`780fec5`](https://github.com/apache/spark/commit/780fec5b393ae61913bc48992675dbfc67a3841c).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72679240
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72418556
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    --- End diff --
    
    Should we be able to reconstruct the schema from this? If it is then we should use `catalogString`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62691 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62691/consoleFull)** for PR 14241 at commit [`a3d2c69`](https://github.com/apache/spark/commit/a3d2c698bb24b5ac589f8f6a3caebac4efdbe8c9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62438 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62438/consoleFull)** for PR 14241 at commit [`0d4642a`](https://github.com/apache/spark/commit/0d4642a3cef757666fbc72932d3eb78bbaeec530).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72431432
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    --- End diff --
    
    ...from HadoopFsRelations?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72016877
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    --- End diff --
    
    can you add classdoc documenting what partitionFilters and dataFilters do? It's a little bit confusing because they are both filters, but have different types.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62624/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72427356
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    --- End diff --
    
    Is there are reasons why you have not moved this into the parent class? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72427246
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    +        val scan = buildScan()
    +        if (needsUnsafeRowConversion) {
    +          scan.mapPartitionsInternal { iter =>
    +            val proj = UnsafeProjection.create(schema)
    +            iter.map(proj)
    +          }
    +        } else {
    +          scan
    +        }
    +      }
    +      val numOutputRows = longMetric("numOutputRows")
    +      unsafeRows.map { r =>
    +        numOutputRows += 1
    +        r
    +      }
    +    }
       }
     
       override def simpleString: String = {
         val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
           key + ": " + StringUtils.abbreviate(value, 100)
         }
         val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
    -    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    +    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
       }
     
    -  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    -    rdd :: Nil
    -  }
    -
    -  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
    -    dataType: DataType, nullable: Boolean): ExprCode = {
    -    val javaType = ctx.javaType(dataType)
    -    val value = ctx.getValue(columnVar, dataType, ordinal)
    -    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
    -    val valueVar = ctx.freshName("value")
    -    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
    -    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
    -      s"""
    -        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
    -        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
    -      """
    -    } else {
    -      s"$javaType ${valueVar} = $value;"
    -    }).trim
    -    ExprCode(code, isNullVar, valueVar)
    +  override protected def doProduce(ctx: CodegenContext): String = {
    +    if (supportsBatch) {
    +      return doProduceVectorized(ctx)
    +    }
    +    val numOutputRows = metricTerm(ctx, "numOutputRows")
    --- End diff --
    
    Move the code below into the parent? `doProduceIterated`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62625/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/14241


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r71090902
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -205,17 +209,17 @@ private[sql] trait DataSourceScanExec extends LeafExecNode {
     /** Physical plan node for scanning data from a relation. */
     private[sql] case class RowDataSourceScanExec(
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    +    @transient buildScan: () => RDD[InternalRow],
         @transient relation: BaseRelation,
         override val outputPartitioning: Partitioning,
         override val metadata: Map[String, String],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
     
    -  val outputUnsafeRows = relation match {
    +  private val outputUnsafeRows = relation match {
    --- End diff --
    
    TODO: why do we need to output unsafe rows? It's weird since this operator has to inspect the relation implementation to decide whether to do this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    This doesn't actually give us a way to add additional filter constraints in the physical operator, does it?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72154191
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning = {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62619/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r71597567
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning = {
    --- End diff --
    
    Moved from `FileSourceStrategy`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62847 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62847/consoleFull)** for PR 14241 at commit [`ddb202e`](https://github.com/apache/spark/commit/ddb202e44cf072b2b150da32d71d5f3e4a1bcb9c).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62845 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62845/consoleFull)** for PR 14241 at commit [`780fec5`](https://github.com/apache/spark/commit/780fec5b393ae61913bc48992675dbfc67a3841c).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62624 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62624/consoleFull)** for PR 14241 at commit [`bbf89a1`](https://github.com/apache/spark/commit/bbf89a1550204e9ffde89f34a52cf83f4d1c385b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62623 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62623/consoleFull)** for PR 14241 at commit [`ebf2102`](https://github.com/apache/spark/commit/ebf2102d4c0f80367b51dad8af88a1edbb3978c9).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62627/consoleFull)** for PR 14241 at commit [`2d78051`](https://github.com/apache/spark/commit/2d780516052a663f33cd0ab682cfd970939d6863).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62847 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62847/consoleFull)** for PR 14241 at commit [`ddb202e`](https://github.com/apache/spark/commit/ddb202e44cf072b2b150da32d71d5f3e4a1bcb9c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62619 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62619/consoleFull)** for PR 14241 at commit [`b45e253`](https://github.com/apache/spark/commit/b45e25352812e52798938d3e358c4c4d6c2183bb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72430492
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    --- End diff --
    
    Note to self: Comes from `FileSourceStrategy.apply`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62979/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72016917
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    --- End diff --
    
    BTW in order to make this more dynamic, we'd need to make these mutable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72679265
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
    @@ -358,11 +358,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
           df1.write.parquet(tableDir.getAbsolutePath)
     
           val agged = spark.table("bucketed_table").groupBy("i").count()
    -      val error = intercept[RuntimeException] {
    +      val error = intercept[Exception] {
    --- End diff --
    
    It's a nested exception, which is quite hard to match. The following assert checks for the right error message, which is the important bit I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    @hvanhovell Have you finished your round of review?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62627/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r71597470
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -391,48 +477,190 @@ private[sql] case class BatchedDataSourceScanExec(
            |$scanTimeTotalNs = 0;
          """.stripMargin
       }
    -}
     
    -private[sql] object DataSourceScanExec {
    -  // Metadata keys
    -  val INPUT_PATHS = "InputPaths"
    -  val PUSHED_FILTERS = "PushedFilters"
    +  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
    --- End diff --
    
    All these functions below were moved verbatim.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62438/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r73230979
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from HadoopFsRelations.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.catalogString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    --- End diff --
    
    I looked at the callers and this seems ok (and the prior code made this assumption as well). I can change it if you think it is safer though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62627/consoleFull)** for PR 14241 at commit [`2d78051`](https://github.com/apache/spark/commit/2d780516052a663f33cd0ab682cfd970939d6863).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r71597508
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    --- End diff --
    
    Copied from `RowDataSourceExec`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #63139 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63139/consoleFull)** for PR 14241 at commit [`a76b432`](https://github.com/apache/spark/commit/a76b432602c1282a89599b8ac19fb15d0ec5f507).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable `
      * `case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic `
      * `case class SparkPartitionID() extends LeafExpression with Nondeterministic `
      * `case class AggregateExpression(`
      * `case class Least(children: Seq[Expression]) extends Expression `
      * `case class Greatest(children: Seq[Expression]) extends Expression `
      * `case class CurrentDatabase() extends LeafExpression with Unevaluable `
      * `class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow `
      * `class AbstractScalaRowIterator[T] extends Iterator[T] `
      * `  implicit class SchemaAttribute(f: StructField) `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72679259
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    --- End diff --
    
    Good point. I believe this is used for equality checks, so a truncated string might be problematic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62847/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    You should be able to add those filter constraints in FileDataSourceStrategy. I don't think it matters too much whether that code is located within buildScan(), or in the operator itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r73231816
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from HadoopFsRelations.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.catalogString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    --- End diff --
    
    Because the `relation.location.listFiles(partitionFilters)` is expensive, it's better to just call it once. It's fine to be called multiple times in other places.
    
    At least in the testing code, I saw this is called twice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r73228065
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from HadoopFsRelations.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.catalogString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    +        val scan = buildScan()
    +        if (needsUnsafeRowConversion) {
    +          scan.mapPartitionsInternal { iter =>
    +            val proj = UnsafeProjection.create(schema)
    +            iter.map(proj)
    +          }
    +        } else {
    +          scan
    +        }
    +      }
    +      val numOutputRows = longMetric("numOutputRows")
    +      unsafeRows.map { r =>
    +        numOutputRows += 1
    +        r
    +      }
    +    }
       }
     
       override def simpleString: String = {
         val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
           key + ": " + StringUtils.abbreviate(value, 100)
         }
         val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
    -    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    +    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    --- End diff --
    
    Is there a way I can know the FileSourceScanExec is in batch mode or not?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62691/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r71597502
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    +        val scan = buildScan()
    +        if (needsUnsafeRowConversion) {
    +          scan.mapPartitionsInternal { iter =>
    +            val proj = UnsafeProjection.create(schema)
    +            iter.map(proj)
    +          }
    +        } else {
    +          scan
    +        }
    +      }
    +      val numOutputRows = longMetric("numOutputRows")
    +      unsafeRows.map { r =>
    +        numOutputRows += 1
    +        r
    +      }
    +    }
       }
     
       override def simpleString: String = {
         val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
           key + ": " + StringUtils.abbreviate(value, 100)
         }
         val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
    -    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    +    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
       }
     
    -  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    -    rdd :: Nil
    -  }
    -
    -  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
    -    dataType: DataType, nullable: Boolean): ExprCode = {
    -    val javaType = ctx.javaType(dataType)
    -    val value = ctx.getValue(columnVar, dataType, ordinal)
    -    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
    -    val valueVar = ctx.freshName("value")
    -    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
    -    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
    -      s"""
    -        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
    -        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
    -      """
    -    } else {
    -      s"$javaType ${valueVar} = $value;"
    -    }).trim
    -    ExprCode(code, isNullVar, valueVar)
    +  override protected def doProduce(ctx: CodegenContext): String = {
    --- End diff --
    
    Copied from `RowDataSourceExec`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62438/consoleFull)** for PR 14241 at commit [`0d4642a`](https://github.com/apache/spark/commit/0d4642a3cef757666fbc72932d3eb78bbaeec530).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #63141 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63141/consoleFull)** for PR 14241 at commit [`704511e`](https://github.com/apache/spark/commit/704511ebacd8438c52d7e6e9488db86dc481f7fc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62625 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62625/consoleFull)** for PR 14241 at commit [`358eb9f`](https://github.com/apache/spark/commit/358eb9f35e72be9cfdd2350221caa3912943a722).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62623/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72415515
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
    @@ -358,11 +358,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
           df1.write.parquet(tableDir.getAbsolutePath)
     
           val agged = spark.table("bucketed_table").groupBy("i").count()
    -      val error = intercept[RuntimeException] {
    +      val error = intercept[Exception] {
    --- End diff --
    
    NIT: we cannot catch the proper exception?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62691/consoleFull)** for PR 14241 at commit [`a3d2c69`](https://github.com/apache/spark/commit/a3d2c698bb24b5ac589f8f6a3caebac4efdbe8c9).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merging this into master, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #63141 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63141/consoleFull)** for PR 14241 at commit [`704511e`](https://github.com/apache/spark/commit/704511ebacd8438c52d7e6e9488db86dc481f7fc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    @ericl I was talking with @marmbrus -- it'd be better to create an API in the physical scan operator that accepts a list of filters, and then do pruning there. That is to say, we also want to move all the pruning code from physical planning into the physical operators.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72745117
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    --- End diff --
    
    let's make sure we fix this one


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62979 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62979/consoleFull)** for PR 14241 at commit [`18f5543`](https://github.com/apache/spark/commit/18f5543e6b7e56e093e07ec599fe48f3e305dc7b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72679252
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from files.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.simpleString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    +        val scan = buildScan()
    +        if (needsUnsafeRowConversion) {
    +          scan.mapPartitionsInternal { iter =>
    +            val proj = UnsafeProjection.create(schema)
    +            iter.map(proj)
    +          }
    +        } else {
    +          scan
    +        }
    +      }
    +      val numOutputRows = longMetric("numOutputRows")
    +      unsafeRows.map { r =>
    +        numOutputRows += 1
    +        r
    +      }
    +    }
       }
     
       override def simpleString: String = {
         val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
           key + ": " + StringUtils.abbreviate(value, 100)
         }
         val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
    -    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    +    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
       }
     
    -  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    -    rdd :: Nil
    -  }
    -
    -  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
    -    dataType: DataType, nullable: Boolean): ExprCode = {
    -    val javaType = ctx.javaType(dataType)
    -    val value = ctx.getValue(columnVar, dataType, ordinal)
    -    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
    -    val valueVar = ctx.freshName("value")
    -    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
    -    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
    -      s"""
    -        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
    -        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
    -      """
    -    } else {
    -      s"$javaType ${valueVar} = $value;"
    -    }).trim
    -    ExprCode(code, isNullVar, valueVar)
    +  override protected def doProduce(ctx: CodegenContext): String = {
    +    if (supportsBatch) {
    +      return doProduceVectorized(ctx)
    +    }
    +    val numOutputRows = metricTerm(ctx, "numOutputRows")
    --- End diff --
    
    While this would reduce the duplication some, it seems weird to me for DataSourceScan to have that as a sometimes used method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62623 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62623/consoleFull)** for PR 14241 at commit [`ebf2102`](https://github.com/apache/spark/commit/ebf2102d4c0f80367b51dad8af88a1edbb3978c9).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r73261283
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from HadoopFsRelations.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.catalogString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    --- End diff --
    
    Fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    This looks pretty good. I have left a few comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63139/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r73261265
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/**
    + * Physical plan node for scanning data from HadoopFsRelations.
    + *
    + * @param relation The file-based relation to scan.
    + * @param output Output attributes of the scan.
    + * @param outputSchema Output schema of the scan.
    + * @param partitionFilters Predicates to use for partition pruning.
    + * @param dataFilters Data source filters to use for filtering data within partitions.
    + * @param metastoreTableIdentifier
    + */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    +    dataFilters: Seq[Filter],
         override val metastoreTableIdentifier: Option[TableIdentifier])
    -  extends DataSourceScanExec with CodegenSupport {
    +  extends DataSourceScanExec {
    +
    +  val supportsBatch = relation.fileFormat.supportBatch(
    +    relation.sparkSession, StructType.fromAttributes(output))
    +
    +  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    +    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
    +  } else {
    +    false
    +  }
    +
    +  override val outputPartitioning: Partitioning = {
    +    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
    +      relation.bucketSpec
    +    } else {
    +      None
    +    }
    +    bucketSpec.map { spec =>
    +      val numBuckets = spec.numBuckets
    +      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +        output.find(_.name == n)
    +      }
    +      if (bucketColumns.size == spec.bucketColumnNames.size) {
    +        HashPartitioning(bucketColumns, numBuckets)
    +      } else {
    +        UnknownPartitioning(0)
    +      }
    +    }.getOrElse {
    +      UnknownPartitioning(0)
    +    }
    +  }
    +
    +  override val metadata: Map[String, String] = Map(
    +    "Format" -> relation.fileFormat.toString,
    +    "ReadSchema" -> outputSchema.catalogString,
    +    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
    +    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
    +
    +  private def buildScan(): RDD[InternalRow] = {
    +    val selectedPartitions = relation.location.listFiles(partitionFilters)
    +
    +    val readFile: (PartitionedFile) => Iterator[InternalRow] =
    +      relation.fileFormat.buildReaderWithPartitionValues(
    +        sparkSession = relation.sparkSession,
    +        dataSchema = relation.dataSchema,
    +        partitionSchema = relation.partitionSchema,
    +        requiredSchema = outputSchema,
    +        filters = dataFilters,
    +        options = relation.options,
    +        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    +
    +    relation.bucketSpec match {
    +      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
    +        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    +      case _ =>
    +        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    +    }
    +  }
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    buildScan() :: Nil
    +  }
     
       private[sql] override lazy val metrics =
         Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
           "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    // in the case of fallback, this batched scan should never fail because of:
    -    // 1) only primitive types are supported
    -    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    -    WholeStageCodegenExec(this).execute()
    +    if (supportsBatch) {
    +      // in the case of fallback, this batched scan should never fail because of:
    +      // 1) only primitive types are supported
    +      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    +      WholeStageCodegenExec(this).execute()
    +    } else {
    +      val unsafeRows = {
    +        val scan = buildScan()
    +        if (needsUnsafeRowConversion) {
    +          scan.mapPartitionsInternal { iter =>
    +            val proj = UnsafeProjection.create(schema)
    +            iter.map(proj)
    +          }
    +        } else {
    +          scan
    +        }
    +      }
    +      val numOutputRows = longMetric("numOutputRows")
    +      unsafeRows.map { r =>
    +        numOutputRows += 1
    +        r
    +      }
    +    }
       }
     
       override def simpleString: String = {
         val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
           key + ": " + StringUtils.abbreviate(value, 100)
         }
         val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
    -    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    +    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
    --- End diff --
    
    I added Batch to metadata.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62979 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62979/consoleFull)** for PR 14241 at commit [`18f5543`](https://github.com/apache/spark/commit/18f5543e6b7e56e093e07ec599fe48f3e305dc7b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec t...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14241#discussion_r72154195
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -275,62 +271,152 @@ private[sql] case class RowDataSourceScanExec(
            |}
          """.stripMargin
       }
    +
    +  // Ignore rdd when checking results
    +  override def sameResult(plan: SparkPlan): Boolean = plan match {
    +    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
    +    case _ => false
    +  }
     }
     
    -/** Physical plan node for scanning data from a batched relation. */
    -private[sql] case class BatchedDataSourceScanExec(
    +/** Physical plan node for scanning data from files. */
    +private[sql] case class FileSourceScanExec(
    +    @transient relation: HadoopFsRelation,
         output: Seq[Attribute],
    -    rdd: RDD[InternalRow],
    -    @transient relation: BaseRelation,
    -    override val outputPartitioning: Partitioning,
    -    override val metadata: Map[String, String],
    +    outputSchema: StructType,
    +    partitionFilters: Seq[Expression],
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62624 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62624/consoleFull)** for PR 14241 at commit [`bbf89a1`](https://github.com/apache/spark/commit/bbf89a1550204e9ffde89f34a52cf83f4d1c385b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63141/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62625 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62625/consoleFull)** for PR 14241 at commit [`358eb9f`](https://github.com/apache/spark/commit/358eb9f35e72be9cfdd2350221caa3912943a722).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14241
  
    **[Test build #62619 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62619/consoleFull)** for PR 14241 at commit [`b45e253`](https://github.com/apache/spark/commit/b45e25352812e52798938d3e358c4c4d6c2183bb).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org