Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/12 07:59:46 UTC

spark git commit: [SPARK-14535][SQL] Remove buildInternalScan from FileFormat

Repository: spark
Updated Branches:
  refs/heads/master 52a801124 -> 678b96e77


[SPARK-14535][SQL] Remove buildInternalScan from FileFormat

## What changes were proposed in this pull request?

Now that `HadoopFsRelation` with all kinds of file formats can be handled by `FileSourceStrategy`, we can remove the branches for `HadoopFsRelation` in `DataSourceStrategy` and the `buildInternalScan` API from `FileFormat`.
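
For readers skimming the diff below, here is a rough sketch of what the `FileFormat` read path looks like once `buildInternalScan` is gone. The types and names are simplified placeholders, not the real trait in `sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala` (which also takes a `SQLContext`, partition schema, and Hadoop plumbing); the point is only that each format now supplies a per-file reader and no longer builds whole `RDD[InternalRow]` scans itself — that orchestration lives once, in `FileSourceStrategy`.

```scala
// Simplified sketch with placeholder types; names below are illustrative only.
object FileFormatSketch {
  // Stand-ins for Spark's internal types, just to keep the sketch self-contained.
  case class PartitionedFile(path: String, start: Long, length: Long)
  type InternalRow = Seq[Any]
  type Filter = String => Boolean
  type StructType = Seq[(String, String)] // (column name, type name)

  trait FileFormat {
    // The remaining read API: build a function that turns one file split into
    // an iterator of rows. Planning (file listing, bucketing, partition values)
    // is handled by FileSourceStrategy, not by the format.
    def buildReader(
        dataSchema: StructType,
        requiredSchema: StructType,
        filters: Seq[Filter],
        options: Map[String, String]): PartitionedFile => Iterator[InternalRow]

    // buildInternalScan(sqlContext, dataSchema, requiredColumns, filters,
    // bucketSet, inputFiles, broadcastedConf, options): RDD[InternalRow]
    // is removed by this commit.
  }
}
```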

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #12300 from cloud-fan/remove.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/678b96e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/678b96e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/678b96e7

Branch: refs/heads/master
Commit: 678b96e77bf77a64b8df14b19db5a3bb18febfe3
Parents: 52a8011
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Apr 11 22:59:42 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Apr 11 22:59:42 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  34 +-
 .../datasources/DataSourceStrategy.scala        | 390 -------------------
 .../datasources/FileSourceStrategy.scala        |  10 +-
 .../datasources/csv/DefaultSource.scala         |  31 --
 .../datasources/json/JSONRelation.scala         |  29 --
 .../datasources/parquet/ParquetRelation.scala   | 110 +-----
 .../datasources/text/DefaultSource.scala        |  39 --
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 -
 .../apache/spark/sql/sources/interfaces.scala   |  10 -
 .../datasources/FileSourceStrategySuite.scala   |  12 -
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  13 -
 11 files changed, 5 insertions(+), 681 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 2e9b6be..4737b6f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -178,39 +178,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    // TODO: This does not handle cases where column pruning has been performed.
-
-    verifySchema(dataSchema)
-    val dataFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
-
-    val path = if (dataFiles.length == 1) dataFiles.head.getPath.toUri.toString
-    else if (dataFiles.isEmpty) throw new IOException("No input path specified for libsvm data")
-    else throw new IOException("Multiple input paths are not supported for libsvm data.")
-
-    val numFeatures = options.getOrElse("numFeatures", "-1").toInt
-    val vectorType = options.getOrElse("vectorType", "sparse")
-
-    val sc = sqlContext.sparkContext
-    val baseRdd = MLUtils.loadLibSVMFile(sc, path, numFeatures)
-    val sparse = vectorType == "sparse"
-    baseRdd.map { pt =>
-      val features = if (sparse) pt.features.toSparse else pt.features.toDense
-      Row(pt.label, features)
-    }.mapPartitions { externalRows =>
-      val converter = RowEncoder(dataSchema)
-      externalRows.map(converter.toRow)
-    }
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
       dataSchema: StructType,
@@ -218,6 +185,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+    verifySchema(dataSchema)
     val numFeatures = options("numFeatures").toInt
     assert(numFeatures > 0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8c18331..c3885a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -110,133 +110,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         filters,
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
 
-    // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _))
-        if t.partitionSchema.nonEmpty =>
-      // We divide the filter expressions into 3 parts
-      val partitionColumns = AttributeSet(
-        t.partitionSchema.map(c => l.output.find(_.name == c.name).get))
-
-      // Only pruning the partition keys
-      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
-
-      // Only pushes down predicates that do not reference partition keys.
-      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
-
-      // Predicates with both partition keys and attributes
-      val partitionAndNormalColumnFilters =
-        filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
-
-      val selectedPartitions = t.location.listFiles(partitionFilters)
-
-      logInfo {
-        val total = t.partitionSpec.partitions.length
-        val selected = selectedPartitions.length
-        val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
-        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
-      }
-
-      // need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty
-      val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
-      val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
-        projects
-      } else {
-        (partitionAndNormalColumnAttrs ++ projects).toSeq
-      }
-
-      // Prune the buckets based on the pushed filters that do not contain partitioning key
-      // since the bucketing key is not allowed to use the columns in partitioning key
-      val bucketSet = getBuckets(pushedFilters, t.bucketSpec)
-      val scan = buildPartitionedTableScan(
-        l,
-        partitionAndNormalColumnProjs,
-        pushedFilters,
-        bucketSet,
-        t.partitionSpec.partitionColumns,
-        selectedPartitions,
-        t.options)
-
-      // Add a Projection to guarantee the original projection:
-      // this is because "partitionAndNormalColumnAttrs" may be different
-      // from the original "projects", in elements or their ordering
-
-      partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
-        if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
-          // if the original projection is empty, no need for the additional Project either
-          execution.Filter(cf, scan)
-        } else {
-          execution.Project(projects, execution.Filter(cf, scan))
-        }
-      ).getOrElse(scan) :: Nil
-
-    // TODO: The code for planning bucketed/unbucketed/partitioned/unpartitioned tables contains
-    // a lot of duplication and produces overly complicated RDDs.
-
-    // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) =>
-      // See buildPartitionedTableScan for the reason that we need to create a shard
-      // broadcast HadoopConf.
-      val sharedHadoopConf = SparkHadoopUtil.get.conf
-      val confBroadcast =
-        t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
-
-      t.bucketSpec match {
-        case Some(spec) if t.sqlContext.conf.bucketingEnabled =>
-          val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
-            (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
-              val bucketed =
-                t.location
-                  .allFiles()
-                  .filterNot(_.getPath.getName startsWith "_")
-                  .groupBy { f =>
-                    BucketingUtils
-                      .getBucketId(f.getPath.getName)
-                      .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}"))
-                  }
-
-              val bucketedDataMap = bucketed.mapValues { bucketFiles =>
-                t.fileFormat.buildInternalScan(
-                  t.sqlContext,
-                  t.dataSchema,
-                  requiredColumns.map(_.name).toArray,
-                  filters,
-                  None,
-                  bucketFiles,
-                  confBroadcast,
-                  t.options).coalesce(1)
-              }
-
-              val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
-                (0 until spec.numBuckets).map { bucketId =>
-                  bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow])
-                })
-              bucketedRDD
-            }
-          }
-
-          pruneFilterProject(
-            l,
-            projects,
-            filters,
-            scanBuilder) :: Nil
-
-        case _ =>
-          pruneFilterProject(
-            l,
-            projects,
-            filters,
-            (a, f) =>
-              t.fileFormat.buildInternalScan(
-                t.sqlContext,
-                t.dataSchema,
-                a.map(_.name).toArray,
-                f,
-                None,
-                t.location.allFiles(),
-                confBroadcast,
-                t.options)) :: Nil
-      }
-
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
       execution.DataSourceScan.create(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
@@ -248,218 +121,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     case _ => Nil
   }
 
-  private def buildPartitionedTableScan(
-      logicalRelation: LogicalRelation,
-      projections: Seq[NamedExpression],
-      filters: Seq[Expression],
-      buckets: Option[BitSet],
-      partitionColumns: StructType,
-      partitions: Seq[Partition],
-      options: Map[String, String]): SparkPlan = {
-    val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
-
-    // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
-    // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
-    val sharedHadoopConf = SparkHadoopUtil.get.conf
-    val confBroadcast =
-      relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
-    val partitionColumnNames = partitionColumns.fieldNames.toSet
-
-    // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
-    // will union all partitions and attach partition values if needed.
-    val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
-      (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
-
-        relation.bucketSpec match {
-          case Some(spec) if relation.sqlContext.conf.bucketingEnabled =>
-            val requiredDataColumns =
-              requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
-
-            // Builds RDD[Row]s for each selected partition.
-            val perPartitionRows: Seq[(Int, RDD[InternalRow])] = partitions.flatMap {
-              case Partition(partitionValues, files) =>
-                val bucketed = files.groupBy { f =>
-                  BucketingUtils
-                    .getBucketId(f.getPath.getName)
-                    .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}"))
-                }
-
-                bucketed.map { bucketFiles =>
-                  // Don't scan any partition columns to save I/O.  Here we are being optimistic and
-                  // assuming partition columns data stored in data files are always consistent with
-                  // those partition values encoded in partition directory paths.
-                  val dataRows = relation.fileFormat.buildInternalScan(
-                    relation.sqlContext,
-                    relation.dataSchema,
-                    requiredDataColumns.map(_.name).toArray,
-                    filters,
-                    buckets,
-                    bucketFiles._2,
-                    confBroadcast,
-                    options)
-
-                  // Merges data values with partition values.
-                  bucketFiles._1 -> mergeWithPartitionValues(
-                    requiredColumns,
-                    requiredDataColumns,
-                    partitionColumns,
-                    partitionValues,
-                    dataRows)
-                }
-            }
-
-            val bucketedDataMap: Map[Int, Seq[RDD[InternalRow]]] =
-              perPartitionRows.groupBy(_._1).mapValues(_.map(_._2))
-
-            val bucketed = new UnionRDD(relation.sqlContext.sparkContext,
-              (0 until spec.numBuckets).map { bucketId =>
-                bucketedDataMap.get(bucketId).map(i => i.reduce(_ ++ _).coalesce(1)).getOrElse {
-                  relation.sqlContext.emptyResult: RDD[InternalRow]
-                }
-              })
-            bucketed
-
-          case _ =>
-            val requiredDataColumns =
-              requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
-
-            // Builds RDD[Row]s for each selected partition.
-            val perPartitionRows = partitions.map {
-              case Partition(partitionValues, files) =>
-                val dataRows = relation.fileFormat.buildInternalScan(
-                  relation.sqlContext,
-                  relation.dataSchema,
-                  requiredDataColumns.map(_.name).toArray,
-                  filters,
-                  buckets,
-                  files,
-                  confBroadcast,
-                  options)
-
-                // Merges data values with partition values.
-                mergeWithPartitionValues(
-                  requiredColumns,
-                  requiredDataColumns,
-                  partitionColumns,
-                  partitionValues,
-                  dataRows)
-            }
-            new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
-        }
-      }
-    }
-
-    // Create the scan operator. If needed, add Filter and/or Project on top of the scan.
-    // The added Filter/Project is on top of the unioned RDD. We do not want to create
-    // one Filter/Project for every partition.
-    val sparkPlan = pruneFilterProject(
-      logicalRelation,
-      projections,
-      filters,
-      scanBuilder)
-
-    sparkPlan
-  }
-
-  /**
-   * Creates a ColumnarBatch that contains the values for `requiredColumns`. These columns can
-   * either come from `input` (columns scanned from the data source) or from the partitioning
-   * values (data from `partitionValues`). This is done *once* per physical partition. When
-   * the column is from `input`, it just references the same underlying column. When using
-   * partition columns, the column is populated once.
-   * TODO: there's probably a cleaner way to do this.
-   */
-  private def projectedColumnBatch(
-      input: ColumnarBatch,
-      requiredColumns: Seq[Attribute],
-      dataColumns: Seq[Attribute],
-      partitionColumnSchema: StructType,
-      partitionValues: InternalRow) : ColumnarBatch = {
-    val result = ColumnarBatch.allocate(StructType.fromAttributes(requiredColumns))
-    var resultIdx = 0
-    var inputIdx = 0
-
-    while (resultIdx < requiredColumns.length) {
-      val attr = requiredColumns(resultIdx)
-      if (inputIdx < dataColumns.length && requiredColumns(resultIdx) == dataColumns(inputIdx)) {
-        result.setColumn(resultIdx, input.column(inputIdx))
-        inputIdx += 1
-      } else {
-        require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
-        var partitionIdx = 0
-        partitionColumnSchema.fields.foreach { f => {
-          if (f.name.equals(attr.name)) {
-            ColumnVectorUtils.populate(result.column(resultIdx), partitionValues, partitionIdx)
-          }
-          partitionIdx += 1
-        }}
-      }
-      resultIdx += 1
-    }
-    result
-  }
-
-  private def mergeWithPartitionValues(
-      requiredColumns: Seq[Attribute],
-      dataColumns: Seq[Attribute],
-      partitionColumnSchema: StructType,
-      partitionValues: InternalRow,
-      dataRows: RDD[InternalRow]): RDD[InternalRow] = {
-    // If output columns contain any partition column(s), we need to merge scanned data
-    // columns and requested partition columns to form the final result.
-    if (requiredColumns != dataColumns) {
-      // Builds `AttributeReference`s for all partition columns so that we can use them to project
-      // required partition columns.  Note that if a partition column appears in `requiredColumns`,
-      // we should use the `AttributeReference` in `requiredColumns`.
-      val partitionColumns = {
-        val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
-        partitionColumnSchema.toAttributes.map { a =>
-          requiredColumnMap.getOrElse(a.name, a)
-        }
-      }
-
-      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Object]) => {
-        // Note that we can't use an `UnsafeRowJoiner` to replace the following `JoinedRow` and
-        // `UnsafeProjection`.  Because the projection may also adjust column order.
-        val mutableJoinedRow = new JoinedRow()
-        val unsafePartitionValues = UnsafeProjection.create(partitionColumnSchema)(partitionValues)
-        val unsafeProjection =
-          UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
-
-        // If we are returning batches directly, we need to augment them with the partitioning
-        // columns. We want to do this without a row by row operation.
-        var columnBatch: ColumnarBatch = null
-        var mergedBatch: ColumnarBatch = null
-
-        iterator.map { input => {
-          if (input.isInstanceOf[InternalRow]) {
-            unsafeProjection(mutableJoinedRow(
-              input.asInstanceOf[InternalRow], unsafePartitionValues))
-          } else {
-            require(input.isInstanceOf[ColumnarBatch])
-            val inputBatch = input.asInstanceOf[ColumnarBatch]
-            if (inputBatch != mergedBatch) {
-              mergedBatch = inputBatch
-              columnBatch = projectedColumnBatch(inputBatch, requiredColumns,
-                dataColumns, partitionColumnSchema, partitionValues)
-            }
-            columnBatch.setNumRows(inputBatch.numRows())
-            columnBatch
-          }
-        }}
-      }
-
-      // This is an internal RDD whose call site the user should not be concerned with
-      // Since we create many of these (one per partition), the time spent on computing
-      // the call site may add up.
-      Utils.withDummyCallSite(dataRows.sparkContext) {
-        new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
-      }.asInstanceOf[RDD[InternalRow]]
-    } else {
-      dataRows
-    }
-  }
-
   // Get the bucket ID based on the bucketing values.
   // Restriction: Bucket pruning works iff the bucketing column has one and only one column.
   def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
@@ -472,57 +133,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     bucketIdGeneration(mutableRow).getInt(0)
   }
 
-  // Get the bucket BitSet by reading the filters that only contains bucketing keys.
-  // Note: When the returned BitSet is None, no pruning is possible.
-  // Restriction: Bucket pruning works iff the bucketing column has one and only one column.
-  private def getBuckets(
-      filters: Seq[Expression],
-      bucketSpec: Option[BucketSpec]): Option[BitSet] = {
-
-    if (bucketSpec.isEmpty ||
-      bucketSpec.get.numBuckets == 1 ||
-      bucketSpec.get.bucketColumnNames.length != 1) {
-      // None means all the buckets need to be scanned
-      return None
-    }
-
-    // Just get the first because bucketing pruning only works when the column has one column
-    val bucketColumnName = bucketSpec.get.bucketColumnNames.head
-    val numBuckets = bucketSpec.get.numBuckets
-    val matchedBuckets = new BitSet(numBuckets)
-    matchedBuckets.clear()
-
-    filters.foreach {
-      case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
-        matchedBuckets.set(getBucketId(a, numBuckets, v))
-      case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == bucketColumnName =>
-        matchedBuckets.set(getBucketId(a, numBuckets, v))
-      case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
-        matchedBuckets.set(getBucketId(a, numBuckets, v))
-      case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if a.name == bucketColumnName =>
-        matchedBuckets.set(getBucketId(a, numBuckets, v))
-      // Because we only convert In to InSet in Optimizer when there are more than certain
-      // items. So it is possible we still get an In expression here that needs to be pushed
-      // down.
-      case expressions.In(a: Attribute, list)
-          if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
-        val hSet = list.map(e => e.eval(EmptyRow))
-        hSet.foreach(e => matchedBuckets.set(getBucketId(a, numBuckets, e)))
-      case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
-        matchedBuckets.set(getBucketId(a, numBuckets, null))
-      case _ =>
-    }
-
-    logInfo {
-      val selected = matchedBuckets.cardinality()
-      val percentPruned = (1 - selected.toDouble / numBuckets.toDouble) * 100
-      s"Selected $selected buckets out of $numBuckets, pruned $percentPruned% partitions."
-    }
-
-    // None means all the buckets need to be scanned
-    if (matchedBuckets.cardinality() == 0) None else Some(matchedBuckets)
-  }
-
   // Based on Public API.
   protected def pruneFilterProject(
       relation: LogicalRelation,

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index aa1f764..bcddf72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -55,15 +55,7 @@ import org.apache.spark.sql.sources._
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
-      if (files.fileFormat.toString == "TestFileFormat" ||
-         files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
-         files.fileFormat.toString == "ORC" ||
-         files.fileFormat.toString == "LibSVM" ||
-         files.fileFormat.isInstanceOf[csv.DefaultSource] ||
-         files.fileFormat.isInstanceOf[text.DefaultSource] ||
-         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
-         files.sqlContext.conf.useFileScan =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 34fcbdf..06a371b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -133,37 +133,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
-  /**
-   * This supports to eliminate unneeded columns before producing an RDD
-   * containing all of its tuples as Row objects. This reads all the tokens of each line
-   * and then drop unneeded tokens without casting and type-checking by mapping
-   * both the indices produced by `requiredColumns` and the ones of tokens.
-   */
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    // TODO: Filter before calling buildInternalScan.
-    val csvFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
-
-    val csvOptions = new CSVOptions(options)
-    val pathsString = csvFiles.map(_.getPath.toUri.toString)
-    val header = dataSchema.fields.map(_.name)
-    val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString)
-    val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions)
-
-    val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get))
-    rows.mapPartitions { iterator =>
-      val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
-      iterator.map(unsafeProjection)
-    }
-  }
-
   private def baseRdd(
       sqlContext: SQLContext,
       options: CSVOptions,

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 42cd25a..f32fea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -93,35 +93,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    // TODO: Filter files for all formats before calling buildInternalScan.
-    val jsonFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
-
-    val parsedOptions: JSONOptions = new JSONOptions(options)
-    val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
-    val columnNameOfCorruptRecord =
-      parsedOptions.columnNameOfCorruptRecord
-        .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
-    val rows = JacksonParser.parse(
-      createBaseRdd(sqlContext, jsonFiles),
-      requiredDataSchema,
-      columnNameOfCorruptRecord,
-      parsedOptions)
-
-    rows.mapPartitions { iterator =>
-      val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
-      iterator.map(unsafeProjection)
-    }
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
       dataSchema: StructType,

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bcb2b2d..dbda094 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -251,12 +251,12 @@ private[sql] class DefaultSource
   }
 
   /**
-   * Returns whether the reader will the rows as batch or not.
+   * Returns whether the reader will return the rows as batch or not.
    */
   override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = {
     val conf = SQLContext.getActive().get.conf
-    conf.useFileScan && conf.parquetVectorizedReaderEnabled &&
-      conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields &&
+    conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
+      schema.length <= conf.wholeStageMaxNumFields &&
       schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
@@ -375,110 +375,6 @@ private[sql] class DefaultSource
       }
     }
   }
-
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      allFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
-    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
-    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
-    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-
-    // Parquet row group size. We will use this value as the value for
-    // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
-    // of these flags are smaller than the parquet row group size.
-    val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
-
-    // Create the function to set variable Parquet confs at both driver and executor side.
-    val initLocalJobFuncOpt =
-      ParquetRelation.initializeLocalJobFunc(
-        requiredColumns,
-        filters,
-        dataSchema,
-        parquetBlockSize,
-        useMetadataCache,
-        parquetFilterPushDown,
-        assumeBinaryIsString,
-        assumeInt96IsTimestamp) _
-
-    val inputFiles = splitFiles(allFiles).data.toArray
-
-    // Create the function to set input paths at the driver side.
-    val setInputPaths =
-      ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
-
-    val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
-    val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled
-      && allPrimitiveTypes) {
-      classOf[VectorizedParquetInputFormat]
-    } else {
-      classOf[ParquetInputFormat[InternalRow]]
-    }
-
-    Utils.withDummyCallSite(sqlContext.sparkContext) {
-      new SqlNewHadoopRDD(
-        sqlContext = sqlContext,
-        broadcastedConf = broadcastedConf,
-        initDriverSideJobFuncOpt = Some(setInputPaths),
-        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-        inputFormatClass = inputFormatCls,
-        valueClass = classOf[InternalRow]) {
-
-        val cacheMetadata = useMetadataCache
-
-        @transient val cachedStatuses = inputFiles.map { f =>
-          // In order to encode the authority of a Path containing special characters such as '/'
-          // (which does happen in some S3N credentials), we need to use the string returned by the
-          // URI of the path to create a new Path.
-          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
-          new FileStatus(
-            f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
-            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
-        }.toSeq
-
-        private def escapePathUserInfo(path: Path): Path = {
-          val uri = path.toUri
-          new Path(new URI(
-            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
-            uri.getQuery, uri.getFragment))
-        }
-
-        // Overridden so we can inject our own cached files statuses.
-        override def getPartitions: Array[SparkPartition] = {
-          val inputFormat = new ParquetInputFormat[InternalRow] {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = {
-              if (cacheMetadata) cachedStatuses.asJava else super.listStatus(jobContext)
-            }
-          }
-
-          val jobContext = new JobContextImpl(getConf(isDriverSide = true), jobId)
-          val rawSplits = inputFormat.getSplits(jobContext)
-
-          Array.tabulate[SparkPartition](rawSplits.size) { i =>
-            new SqlNewHadoopPartition(
-              id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable])
-          }
-        }
-      }
-    }
-  }
-}
-
-/**
- * The ParquetInputFormat that create VectorizedParquetRecordReader.
- */
-final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] {
-  override def createRecordReader(
-    inputSplit: InputSplit,
-    taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
-    new VectorizedParquetRecordReader().asInstanceOf[RecordReader[Void, InternalRow]]
-  }
 }
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 99459ba..28b03ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -88,45 +88,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    verifySchema(dataSchema)
-
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = job.getConfiguration
-    val paths = inputFiles
-        .filterNot(_.getPath.getName startsWith "_")
-        .map(_.getPath)
-        .sortBy(_.toUri)
-
-    if (paths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, paths: _*)
-    }
-
-    sqlContext.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
-        .mapPartitions { iter =>
-          val unsafeRow = new UnsafeRow(1)
-          val bufferHolder = new BufferHolder(unsafeRow)
-          val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
-
-          iter.map { case (_, line) =>
-            // Writes to an UnsafeRow directly
-            bufferHolder.reset()
-            unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
-            unsafeRow.setTotalSize(bufferHolder.totalSize())
-            unsafeRow
-          }
-        }
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
       dataSchema: StructType,

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b58f960..e74fb00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -145,12 +145,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan")
-    .internal()
-    .doc("Use the new FileScanRDD path for reading HDSF based data sources.")
-    .booleanConf
-    .createWithDefault(true)
-
   val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
     .doc("When true, the Parquet data source merges schemas collected from all data files, " +
          "otherwise the schema is picked from the summary file or a random data file " +
@@ -481,8 +475,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
-  def useFileScan: Boolean = getConf(USE_FILE_SCAN)
-
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6acb41d..65b1f61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -458,16 +458,6 @@ trait FileFormat {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory
 
-  def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow]
-
   /**
    * Returns whether this format support returning columnar batch or not.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 41f536f..90d7f53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -365,18 +365,6 @@ class TestFileFormat extends FileFormat {
     throw new NotImplementedError("JUST FOR TESTING")
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    throw new NotImplementedError("JUST FOR TESTING")
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
       dataSchema: StructType,

http://git-wip-us.apache.org/repos/asf/spark/blob/678b96e7/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 43f445e..e915f3d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -111,19 +111,6 @@ private[sql] class DefaultSource
     }
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
-    OrcTableScan(sqlContext, output, filters, inputFiles).execute()
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
       dataSchema: StructType,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org