Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/18 21:45:42 UTC

spark git commit: [SPARK-7673] [SQL] WIP: HadoopFsRelation and ParquetRelation2 performance optimizations

Repository: spark
Updated Branches:
  refs/heads/master 530397ba2 -> 9dadf019b


[SPARK-7673] [SQL] WIP: HadoopFsRelation and ParquetRelation2 performance optimizations

This PR introduces several performance optimizations to `HadoopFsRelation` and `ParquetRelation2`:

1.  Moving `FileStatus` listing from `DataSourceStrategy` into a cache within `HadoopFsRelation`.

    This new cache generalizes and replaces the one used in `ParquetRelation2`.

    This also introduces an interface change: to reuse cached `FileStatus` objects, `HadoopFsRelation.buildScan` methods now receive `Array[FileStatus]` instead of `Array[String]` (see the sketch after this list).

2.  When Parquet task-side metadata reading is enabled, skip reading row group information when reading Parquet footers.

    This is essentially what PR #5334 does. In addition, footers are now read in parallel via `ParquetFileReader.readAllFootersInParallel`.
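
For data source implementers, the visible effect of item 1 is the new `buildScan` signature. Below is a minimal, illustrative sketch of such an override, adapted from the `SimpleTextRelation` change in the diff; the surrounding `HadoopFsRelation` subclass and its schema are assumed rather than shown:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Inside a HadoopFsRelation subclass: the planner now passes cached FileStatus
// objects instead of plain path strings, so the relation no longer has to list
// the file system itself.
override def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
  // Recover path strings only where an API still requires them.
  val pathString = inputFiles.map(_.getPath).mkString(",")
  sqlContext.sparkContext.textFile(pathString).map(line => Row(line))
}
```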

Another optimization under consideration: instead of asking `HadoopFsRelation.buildScan` to return an `RDD[Row]` for each selected partition and then unioning them all, we could ask it to return a single `RDD[Row]` covering all selected partitions. This is motivated by the fact that the Hadoop configuration broadcasting done in `NewHadoopRDD` accounts for 34% of the time in the microbenchmark below. However, it complicates data source user code, which would then have to merge partition values manually.

To quantify the cost of broadcasting in `NewHadoopRDD`, I also ran the microbenchmark after removing the `broadcast` call from `NewHadoopRDD`. All results are shown below.
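
To make the trade-off concrete, the strategy kept in this PR still builds one `RDD[Row]` per selected partition and unions the results, which implies one Hadoop configuration broadcast per underlying `NewHadoopRDD`. The following is a simplified sketch of that shape; the names follow `DataSourceStrategy`, and `relation`, `selectedPartitions`, `nonPartitionColumns`, and `filters` are assumed to come from the surrounding planner code, so this is illustrative rather than the exact implementation:

```scala
// One buildScan call, and hence one NewHadoopRDD with its own broadcast Hadoop
// configuration, per selected partition; the per-partition RDDs are unioned.
val perPartitionRows: Seq[RDD[Row]] = selectedPartitions.map {
  case Partition(partitionValues, dir) =>
    relation.buildScan(nonPartitionColumns, filters, Array(dir))
}
val allRows: RDD[Row] = new UnionRDD(sqlContext.sparkContext, perPartitionRows)

// The optimization in question would instead hand all selected partition
// directories to a single buildScan call (one broadcast in total), at the cost
// of requiring user code to merge partition values manually.
```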

### Microbenchmark

#### Preparation code

Generating a partitioned table with 5k partitions, 1k rows per partition:

```scala
import sqlContext._
import sqlContext.implicits._

// Same output location as used by the benchmarking code below.
val path = "hdfs://localhost:9000/user/lian/5k"

// 500 iterations x 10 partition values = 5,000 partitions, 1,000 rows each.
for (n <- 0 until 500) {
  val data = for {
    p <- (n * 10) until ((n + 1) * 10)
    i <- 0 until 1000
  } yield (i, f"val_$i%04d", f"$p%04d")

  data.
    toDF("a", "b", "p").
    write.
    partitionBy("p").
    mode("append").
    parquet(path)
}
```

#### Benchmarking code

```scala
import sqlContext._
import sqlContext.implicits._

import org.apache.spark.sql.types._
import com.google.common.base.Stopwatch

val path = "hdfs://localhost:9000/user/lian/5k"

// Runs `f` n times and prints per-round and average wall-clock time.
def benchmark(n: Int)(f: => Unit) {
  val stopwatch = new Stopwatch()

  def run() = {
    stopwatch.reset()
    stopwatch.start()
    f
    stopwatch.stop()
    stopwatch.elapsedMillis()
  }

  val records = (0 until n).map(_ => run())

  (0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
  println(s"Average: ${records.sum / n.toDouble} ms")
}

benchmark(3) { read.parquet(path).explain(extended = true) }
```

#### Results

Before:

```
Round 0: 72528 ms
Round 1: 68938 ms
Round 2: 65372 ms
Average: 68946.0 ms
```

After:

```
Round 0: 59499 ms
Round 1: 53645 ms
Round 2: 53844 ms
Round 3: 49093 ms
Round 4: 50555 ms
Average: 53327.2 ms
```

With Hadoop configuration broadcasting additionally removed:

(Note that I was testing on a local laptop, so network cost is low.)

```
Round 0: 15806 ms
Round 1: 14394 ms
Round 2: 14699 ms
Round 3: 15334 ms
Round 4: 14123 ms
Average: 14871.2 ms
```

Author: Cheng Lian <li...@databricks.com>

Closes #6225 from liancheng/spark-7673 and squashes the following commits:

2d58a2b [Cheng Lian] Skips reading row group information when using task side metadata reading
7aa3748 [Cheng Lian] Optimizes FileStatusCache by introducing a map from parent directories to child files
ba41250 [Cheng Lian] Reuses HadoopFsRelation FileStatusCache in ParquetRelation2
3d278f7 [Cheng Lian] Fixes a bug when reading a single Parquet data file
b84612a [Cheng Lian] Fixes Scala style issue
6a08b02 [Cheng Lian] WIP: Moves file status cache into HadoopFSRelation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9dadf019
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9dadf019
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9dadf019

Branch: refs/heads/master
Commit: 9dadf019b93038e1e18336ccd06c5eecb4bae32f
Parents: 530397b
Author: Cheng Lian <li...@databricks.com>
Authored: Mon May 18 12:45:37 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon May 18 12:45:37 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   |  61 +++++------
 .../spark/sql/sources/DataSourceStrategy.scala  |  37 ++-----
 .../apache/spark/sql/sources/interfaces.scala   | 104 +++++++++++++++----
 .../spark/sql/sources/SimpleTextRelation.scala  |   6 +-
 4 files changed, 117 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9dadf019/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index fea54a2..7ca44f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
 import scala.util.Try
 
 import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
 import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
@@ -175,8 +174,8 @@ private[sql] class ParquetRelation2(
   override def dataSchema: StructType = metadataCache.dataSchema
 
   override private[sql] def refresh(): Unit = {
-    metadataCache.refresh()
     super.refresh()
+    metadataCache.refresh()
   }
 
   // Parquet data source always uses Catalyst internal representations.
@@ -234,15 +233,15 @@ private[sql] class ParquetRelation2(
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
+      inputFiles: Array[FileStatus]): RDD[Row] = {
 
     val job = new Job(SparkHadoopUtil.get.conf)
     val conf = ContextUtil.getConfiguration(job)
 
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
 
-    if (inputPaths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
     }
 
     // Try to push down filters when filter push-down is enabled.
@@ -269,10 +268,7 @@ private[sql] class ParquetRelation2(
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
     conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
 
-    val inputFileStatuses =
-      metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
-    val footers = inputFileStatuses.map(metadataCache.footers)
+    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
     // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
     // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -287,7 +283,7 @@ private[sql] class ParquetRelation2(
 
       val cacheMetadata = useMetadataCache
 
-      @transient val cachedStatuses = inputFileStatuses.map { f =>
+      @transient val cachedStatuses = inputFiles.map { f =>
         // In order to encode the authority of a Path containing special characters such as /,
         // we need to use the string returned by the URI of the path to create a new Path.
         val pathWithAuthority = new Path(f.getPath.toUri.toString)
@@ -333,7 +329,7 @@ private[sql] class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
+    var footers: Map[Path, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -349,35 +345,30 @@ private[sql] class ParquetRelation2(
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
     def refresh(): Unit = {
-      // Support either reading a collection of raw Parquet part-files, or a collection of folders
-      // containing Parquet files (e.g. partitioned Parquet table).
-      val baseStatuses = paths.distinct.flatMap { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption
-      }
-      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
       // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = baseStatuses.flatMap { f =>
-        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
-        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }
-      }
+      val leaves = cachedLeafStatuses().filter { f =>
+        isSummaryFile(f.getPath) ||
+          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+      }.toArray
 
       dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
       metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
+      footers = {
+        val conf = SparkHadoopUtil.get.conf
+        val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+        val rawFooters = if (shouldMergeSchemas) {
+          ParquetFileReader.readAllFootersInParallel(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
+        } else {
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
+        }
+
+        rawFooters.map(footer => footer.getFile -> footer).toMap
+      }
 
       // If we already get the schema, don't need to re-compute it since the schema merging is
       // time-consuming.
@@ -448,7 +439,7 @@ private[sql] class ParquetRelation2(
         "No schema defined, " +
           s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9dadf019/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index e6324b2..1615a6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,20 +17,16 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.sql.Row
+import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
-import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -58,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         filters,
         (a, _) => t.buildScan(a)) :: Nil
 
-    // Scanning partitioned FSBasedRelation
+    // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -86,22 +82,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         t.partitionSpec.partitionColumns,
         selectedPartitions) :: Nil
 
-    // Scanning non-partitioned FSBasedRelation
+    // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
-      val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
-        val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
-        val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path =>
-          val name = path.getName
-          name.startsWith("_") || name.startsWith(".")
-        }.map(fs.makeQualified(_).toString)
-      }
-
       pruneFilterProject(
         l,
         projectList,
         filters,
-        (a, f) => t.buildScan(a, f, inputPaths)) :: Nil
+        (a, f) => t.buildScan(a, f, t.paths)) :: Nil
 
     case l @ LogicalRelation(t: TableScan) =>
       createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -130,16 +117,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     // Builds RDD[Row]s for each selected partition.
     val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
-      // Paths to all data files within this partition
-      val dataFilePaths = {
-        val dirPath = new Path(dir)
-        val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
-        fs.listStatus(dirPath).map(_.getPath).filterNot { path =>
-          val name = path.getName
-          name.startsWith("_") || name.startsWith(".")
-        }.map(fs.makeQualified(_).toString)
-      }
-
       // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
       // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
       // some partition column(s).
@@ -155,7 +132,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
             // assuming partition columns data stored in data files are always consistent with those
             // partition values encoded in partition directory paths.
             val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
-            val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
+            val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
 
             // Merges data values with partition values.
             mergeWithPartitionValues(

http://git-wip-us.apache.org/repos/asf/spark/blob/9dadf019/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a82a675..9b52d1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.sources
 
+import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
@@ -368,18 +368,61 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   private var _partitionSpec: PartitionSpec = _
 
+  private class FileStatusCache {
+    var leafFiles = mutable.Map.empty[Path, FileStatus]
+
+    var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+
+    var leafDirs = mutable.Map.empty[Path, FileStatus]
+
+    def refresh(): Unit = {
+      def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
+        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+      }
+
+      leafDirs.clear()
+      leafFiles.clear()
+
+      // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
+      // may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
+      val statuses = paths.flatMap { path =>
+        val hdfsPath = new Path(path)
+        val fs = hdfsPath.getFileSystem(hadoopConf)
+        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+      }
+
+      val (dirs, files) = statuses.partition(_.isDir)
+      leafDirs ++= dirs.map(d => d.getPath -> d).toMap
+      leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
+    }
+  }
+
+  private lazy val fileStatusCache = {
+    val cache = new FileStatusCache
+    cache.refresh()
+    cache
+  }
+
+  protected def cachedLeafStatuses(): Set[FileStatus] = {
+    fileStatusCache.leafFiles.values.toSet
+  }
+
   final private[sql] def partitionSpec: PartitionSpec = {
     if (_partitionSpec == null) {
       _partitionSpec = maybePartitionSpec
         .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
         .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
         .getOrElse {
-        if (sqlContext.conf.partitionDiscoveryEnabled()) {
-          discoverPartitions()
-        } else {
-          PartitionSpec(StructType(Nil), Array.empty[Partition])
+          if (sqlContext.conf.partitionDiscoveryEnabled()) {
+            discoverPartitions()
+          } else {
+            PartitionSpec(StructType(Nil), Array.empty[Partition])
+          }
         }
-      }
     }
     _partitionSpec
   }
@@ -409,20 +452,14 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   def userDefinedPartitionColumns: Option[StructType] = None
 
   private[sql] def refresh(): Unit = {
+    fileStatusCache.refresh()
     if (sqlContext.conf.partitionDiscoveryEnabled()) {
       _partitionSpec = discoverPartitions()
     }
   }
 
   private def discoverPartitions(): PartitionSpec = {
-    val basePaths = paths.map(new Path(_))
-    val leafDirs = basePaths.flatMap { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
-        .filter(_.isDir)
-        .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
-        .getOrElse(Seq.empty[FileStatus])
-    }.map(_.getPath)
+    val leafDirs = fileStatusCache.leafDirs.keys.toSeq
 
     if (leafDirs.nonEmpty) {
       PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
@@ -444,6 +481,27 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
+  private[sources] final def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row] = {
+    val inputStatuses = inputPaths.flatMap { input =>
+      val path = new Path(input)
+
+      // First assumes `input` is a directory path, and tries to get all files contained in it.
+      fileStatusCache.leafDirToChildrenFiles.getOrElse(
+        path,
+        // Otherwise, `input` might be a file path
+        fileStatusCache.leafFiles.get(path).toArray
+      ).filter { status =>
+        val name = status.getPath.getName
+        !name.startsWith("_") && !name.startsWith(".")
+      }
+    }
+
+    buildScan(requiredColumns, filters, inputStatuses)
+  }
+
   /**
    * Specifies schema of actual data files.  For partitioned relations, if one or more partitioned
    * columns are contained in the data files, they should also appear in `dataSchema`.
@@ -457,13 +515,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    * this relation. For partitioned relations, this method is called for each selected partition,
    * and builds an `RDD[Row]` containing all rows within that single partition.
    *
-   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
    *        relation. For a partitioned relation, it contains paths of all data files in a single
    *        selected partition.
    *
    * @since 1.4.0
    */
-  def buildScan(inputPaths: Array[String]): RDD[Row] = {
+  def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
     throw new UnsupportedOperationException(
       "At least one buildScan() method should be overridden to read the relation.")
   }
@@ -474,13 +532,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    * and builds an `RDD[Row]` containing all rows within that single partition.
    *
    * @param requiredColumns Required columns.
-   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
    *        relation. For a partitioned relation, it contains paths of all data files in a single
    *        selected partition.
    *
    * @since 1.4.0
    */
-  def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
+  def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
     // Yeah, to workaround serialization...
     val dataSchema = this.dataSchema
     val codegenEnabled = this.codegenEnabled
@@ -490,7 +548,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
     }.toSeq
 
-    buildScan(inputPaths).mapPartitions { rows =>
+    buildScan(inputFiles).mapPartitions { rows =>
       val buildProjection = if (codegenEnabled) {
         GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
       } else {
@@ -512,7 +570,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    *        of all `filters`.  The pushed down filters are currently purely an optimization as they
    *        will all be evaluated again. This means it is safe to use them with methods that produce
    *        false positives such as filtering partitions based on a bloom filter.
-   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
    *        relation. For a partitioned relation, it contains paths of all data files in a single
    *        selected partition.
    *
@@ -521,8 +579,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
-    buildScan(requiredColumns, inputPaths)
+      inputFiles: Array[FileStatus]): RDD[Row] = {
+    buildScan(requiredColumns, inputFiles)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9dadf019/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 29b2158..09eed66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -21,7 +21,7 @@ import java.text.NumberFormat
 import java.util.UUID
 
 import com.google.common.base.Objects
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
@@ -101,10 +101,10 @@ class SimpleTextRelation(
   override def hashCode(): Int =
     Objects.hashCode(paths, maybeDataSchema, dataSchema)
 
-  override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+  override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = {
     val fields = dataSchema.map(_.dataType)
 
-    sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+    sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
       Row(record.split(",").zip(fields).map { case (value, dataType) =>
         Cast(Literal(value), dataType).eval()
       }: _*)

