You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/20 20:23:44 UTC

spark git commit: [SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned table scan.

Repository: spark
Updated Branches:
  refs/heads/master 98a46f9df -> b631bf73b


[SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned table scan.

https://issues.apache.org/jira/browse/SPARK-7713

I tested the performance with the following code:
```scala
import sqlContext._
import sqlContext.implicits._

(1 to 5000).foreach { i =>
  val df = (1 to 1000).map(j => (j, s"str$j")).toDF("a", "b").save(s"/tmp/partitioned/i=$i")
}

sqlContext.sql("""
CREATE TEMPORARY TABLE partitionedParquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path '/tmp/partitioned'
)""")

table("partitionedParquet").explain(true)
```

On current master, `explain` takes 40s on my laptop. With this PR, `explain` takes 14s.

Author: Yin Huai <yh...@databricks.com>

Closes #6252 from yhuai/broadcastHadoopConf and squashes the following commits:

6fa73df [Yin Huai] Address comments of Josh and Andrew.
807fbf9 [Yin Huai] Make the new buildScan and SqlNewHadoopRDD private sql.
e393555 [Yin Huai] Cheng's comments.
2eb53bb [Yin Huai] Use a shared broadcast Hadoop Configuration for partitioned HadoopFsRelations.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b631bf73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b631bf73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b631bf73

Branch: refs/heads/master
Commit: b631bf73b9f288f37c98b806be430b22485880e5
Parents: 98a46f9
Author: Yin Huai <yh...@databricks.com>
Authored: Wed May 20 11:23:40 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 20 11:23:40 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 113 +++++---
 .../spark/sql/sources/DataSourceStrategy.scala  |  19 +-
 .../spark/sql/sources/SqlNewHadoopRDD.scala     | 268 +++++++++++++++++++
 .../apache/spark/sql/sources/interfaces.scala   |  35 ++-
 4 files changed, 387 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b631bf73/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 7ca44f7..c35b7ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
 import scala.util.Try
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -32,13 +33,14 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -233,40 +235,20 @@ private[sql] class ParquetRelation2(
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputFiles: Array[FileStatus]): RDD[Row] = {
-
-    val job = new Job(SparkHadoopUtil.get.conf)
-    val conf = ContextUtil.getConfiguration(job)
-
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    // Try to push down filters when filter push-down is enabled.
-    if (sqlContext.conf.parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+    // Create the function to set variable Parquet confs at both driver and executor side.
+    val initLocalJobFuncOpt =
+      ParquetRelation2.initializeLocalJobFunc(
+        requiredColumns,
+        filters,
+        dataSchema,
+        useMetadataCache,
+        parquetFilterPushDown) _
+    // Create the function to set input paths at the driver side.
+    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
@@ -274,12 +256,14 @@ private[sql] class ParquetRelation2(
     // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
     // footers.  Especially when a global arbitrative schema (either from metastore or data source
     // DDL) is available.
-    new NewHadoopRDD(
-      sqlContext.sparkContext,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row],
-      conf) {
+    new SqlNewHadoopRDD(
+      sc = sqlContext.sparkContext,
+      broadcastedConf = broadcastedConf,
+      initDriverSideJobFuncOpt = Some(setInputPaths),
+      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+      inputFormatClass = classOf[FilteringParquetRowInputFormat],
+      keyClass = classOf[Void],
+      valueClass = classOf[Row]) {
 
       val cacheMetadata = useMetadataCache
 
@@ -311,11 +295,11 @@ private[sql] class ParquetRelation2(
           new FilteringParquetRowInputFormat
         }
 
-        val jobContext = newJobContext(getConf, jobId)
+        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
         val rawSplits = inputFormat.getSplits(jobContext)
 
         Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
         }
       }
     }.values
@@ -452,6 +436,49 @@ private[sql] object ParquetRelation2 extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
+  /** This closure sets various Parquet configurations at both driver side and executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName())
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus])(job: Job): Unit = {
+    // We set the input paths at the driver side.
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+    }
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
     footers.map { footer =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b631bf73/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 1615a6d..550090d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.Logging
+import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
@@ -84,11 +85,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+      // See buildPartitionedTableScan for the reason that we need to create a shared
+      // broadcast HadoopConf.
+      val sharedHadoopConf = SparkHadoopUtil.get.conf
+      val confBroadcast =
+        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
       pruneFilterProject(
         l,
         projectList,
         filters,
-        (a, f) => t.buildScan(a, f, t.paths)) :: Nil
+        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
 
     case l @ LogicalRelation(t: TableScan) =>
       createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -115,6 +121,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val output = projections.map(_.toAttribute)
     val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
 
+    // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
+    // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
+    val sharedHadoopConf = SparkHadoopUtil.get.conf
+    val confBroadcast =
+      relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+
     // Builds RDD[Row]s for each selected partition.
     val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
       // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
@@ -132,7 +144,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
             // assuming partition columns data stored in data files are always consistent with those
             // partition values encoded in partition directory paths.
             val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
-            val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
+            val dataRows =
+              relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast)
 
             // Merges data values with partition values.
             mergeWithPartitionValues(

http://git-wip-us.apache.org/repos/asf/spark/blob/b631bf73/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
new file mode 100644
index 0000000..0c7bb6e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.spark.broadcast.Broadcast
+
+import org.apache.spark.{Partition => SparkPartition, _}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.rdd.{RDD, HadoopRDD}
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
+
+import scala.reflect.ClassTag
+
+private[spark] class SqlNewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
+  extends SparkPartition {
+
+  val serializableHadoopSplit = new SerializableWritable(rawSplit)
+
+  override def hashCode(): Int = 41 * (41 + rddId) + index
+}
+
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
+ * 1. A shared broadcast Hadoop Configuration.
+ * 2. An optional closure `initDriverSideJobFuncOpt` that sets configurations at the driver side
+ *    to the shared Hadoop Configuration.
+ * 3. An optional closure `initLocalJobFuncOpt` that sets configurations at both the driver side
+ *    and the executor side to the shared Hadoop Configuration.
+ *
+ * Note: This RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
+ * folded into core.
+ */
+private[sql] class SqlNewHadoopRDD[K, V](
+    @transient sc : SparkContext,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+    initLocalJobFuncOpt: Option[Job => Unit],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V])
+  extends RDD[(K, V)](sc, Nil)
+  with SparkHadoopMapReduceUtil
+  with Logging {
+
+  if (initLocalJobFuncOpt.isDefined) {
+    sc.clean(initLocalJobFuncOpt.get)
+  }
+
+  protected def getJob(): Job = {
+    val conf: Configuration = broadcastedConf.value.value
+    // "new Job" will make a copy of the conf. Then, it is
+    // safe to mutate conf properties with initLocalJobFuncOpt
+    // and initDriverSideJobFuncOpt.
+    val newJob = new Job(conf)
+    initLocalJobFuncOpt.map(f => f(newJob))
+    newJob
+  }
+
+  def getConf(isDriverSide: Boolean): Configuration = {
+    val job = getJob()
+    if (isDriverSide) {
+      initDriverSideJobFuncOpt.map(f => f(job))
+    }
+    job.getConfiguration
+  }
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  @transient protected val jobId = new JobID(jobTrackerId, id)
+
+  override def getPartitions: Array[SparkPartition] = {
+    val conf = getConf(isDriverSide = true)
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[SparkPartition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) =
+        new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+
+  override def compute(
+      theSplit: SparkPartition,
+      context: TaskContext): InterruptibleIterator[(K, V)] = {
+    val iter = new Iterator[(K, V)] {
+      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit)
+      val conf = getConf(isDriverSide = false)
+
+      val inputMetrics = context.taskMetrics
+        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
+        split.serializableHadoopSplit.value match {
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          case _ => None
+        }
+      }
+      inputMetrics.setBytesReadCallback(bytesReadCallback)
+
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      val reader = format.createRecordReader(
+        split.serializableHadoopSplit.value, hadoopAttemptContext)
+      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener(context => close())
+      var havePair = false
+      var finished = false
+      var recordsSinceMetricsUpdate = 0
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = !reader.nextKeyValue
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        if (!finished) {
+          inputMetrics.incRecordsRead(1)
+        }
+        (reader.getCurrentKey, reader.getCurrentValue)
+      }
+
+      private def close() {
+        try {
+          reader.close()
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
+            }
+          }
+        } catch {
+          case e: Exception => {
+            if (!Utils.inShutdown()) {
+              logWarning("Exception in RecordReader.close()", e)
+            }
+          }
+        }
+      }
+    }
+    new InterruptibleIterator(context, iter)
+  }
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
+  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
+    val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
+    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
+      case Some(c) => 
+        try {
+          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
+        } catch {
+          case e : Exception =>
+            logDebug("Failed to use InputSplit#getLocationInfo.", e)
+            None
+        }
+      case None => None
+    }
+    locs.getOrElse(split.getLocations.filter(_ != "localhost"))
+  }
+
+  override def persist(storageLevel: StorageLevel): this.type = {
+    if (storageLevel.deserialized) {
+      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+        " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+        " Use a map transformation to make copies of the records.")
+    }
+    super.persist(storageLevel)
+  }
+}
+
+private[spark] object SqlNewHadoopRDD {
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+    override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
+
+    override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
+      val partition = split.asInstanceOf[SqlNewHadoopPartition]
+      val inputSplit = partition.serializableHadoopSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b631bf73/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9b52d1b..6a917bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SerializableWritable
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
@@ -484,7 +486,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   private[sources] final def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
+      inputPaths: Array[String],
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
     val inputStatuses = inputPaths.flatMap { input =>
       val path = new Path(input)
 
@@ -499,7 +502,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }
     }
 
-    buildScan(requiredColumns, filters, inputStatuses)
+    buildScan(requiredColumns, filters, inputStatuses, broadcastedConf)
   }
 
   /**
@@ -584,6 +587,34 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   /**
+   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
+   * this relation. For partitioned relations, this method is called for each selected partition,
+   * and builds an `RDD[Row]` containing all rows within that single partition.
+   *
+   * Note: This interface is subject to change in future.
+   *
+   * @param requiredColumns Required columns.
+   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
+   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
+   *        will all be evaluated again. This means it is safe to use them with methods that produce
+   *        false positives such as filtering partitions based on a bloom filter.
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
+   *        relation. For a partitioned relation, it contains paths of all data files in a single
+   *        selected partition.
+   * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
+   *                        overhead of broadcasting the Configuration for every Hadoop RDD.
+   *
+   * @since 1.4.0
+   */
+  private[sql] def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+    buildScan(requiredColumns, filters, inputFiles)
+  }
+
+  /**
    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
    * be put here.  For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org