Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/09 00:19:29 UTC

spark git commit: [SPARK-13738][SQL] Cleanup Data Source resolution

Repository: spark
Updated Branches:
  refs/heads/master 076009b94 -> 1e2884059


[SPARK-13738][SQL] Cleanup Data Source resolution

Follow-up to #11509 that simply refactors the interface we use when resolving a pluggable `DataSource`.
 - Multiple functions share the same set of arguments, so we combine them into a case class called `DataSource`.  Actual resolution is now done by calling a method on this class.
 - Instead of having multiple methods named `apply` (some of which do writing, some of which do reading), we now explicitly have `resolveRelation()` and `write(mode, df)` (see the sketch after this list).
 - Get rid of `Array[String]` in favor of `Seq[String]`, since this is an internal API and the arrays forced us to awkwardly call `toArray` in a bunch of places.
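
A rough sketch of the new interface, using only names that appear in the diff below (`sqlContext` and `someDataFrame` are assumed to be in scope, and the path is hypothetical):

  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.execution.datasources.DataSource

  // One description of the data source...
  val dataSource = DataSource(
    sqlContext,
    className = "parquet",                   // short name or fully qualified provider class
    options = Map("path" -> "/tmp/events"))  // hypothetical location

  // ...resolved for reading...
  val relation = dataSource.resolveRelation()

  // ...or written to with an explicit mode.
  dataSource.write(SaveMode.Append, someDataFrame)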

Author: Michael Armbrust <mi...@databricks.com>

Closes #11572 from marmbrus/dataSourceResolution.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e288405
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e288405
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e288405

Branch: refs/heads/master
Commit: 1e28840594b9d972c96d3922ca0bf0f76e313e82
Parents: 076009b
Author: Michael Armbrust <mi...@databricks.com>
Authored: Tue Mar 8 15:19:26 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Mar 8 15:19:26 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  34 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  29 +-
 .../sql/execution/datasources/DataSource.scala  | 338 +++++++++++++++++
 .../datasources/PartitioningUtils.scala         |  24 +-
 .../datasources/ResolvedDataSource.scala        | 360 -------------------
 .../spark/sql/execution/datasources/ddl.scala   |  21 +-
 .../spark/sql/execution/datasources/rules.scala |  14 +-
 .../apache/spark/sql/sources/interfaces.scala   |   2 +-
 .../execution/datasources/json/JsonSuite.scala  |  14 +-
 .../sql/sources/ResolvedDataSourceSuite.scala   |  28 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  24 +-
 .../spark/sql/hive/execution/commands.scala     |  31 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala |   5 +-
 13 files changed, 462 insertions(+), 462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fd92e52..509b299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -122,12 +122,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def load(): DataFrame = {
-    val resolved = ResolvedDataSource(
-      sqlContext,
-      userSpecifiedSchema = userSpecifiedSchema,
-      provider = source,
-      options = extraOptions.toMap)
-    DataFrame(sqlContext, LogicalRelation(resolved.relation))
+    val dataSource =
+      DataSource(
+        sqlContext,
+        userSpecifiedSchema = userSpecifiedSchema,
+        className = source,
+        options = extraOptions.toMap)
+    DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
   }
 
   /**
@@ -152,12 +153,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       sqlContext.emptyDataFrame
     } else {
       sqlContext.baseRelationToDataFrame(
-        ResolvedDataSource.apply(
+        DataSource.apply(
           sqlContext,
           paths = paths,
           userSpecifiedSchema = userSpecifiedSchema,
-          provider = source,
-          options = extraOptions.toMap).relation)
+          className = source,
+          options = extraOptions.toMap).resolveRelation())
     }
   }
 
@@ -168,12 +169,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 2.0.0
    */
   def stream(): DataFrame = {
-    val resolved = ResolvedDataSource.createSource(
-      sqlContext,
-      userSpecifiedSchema = userSpecifiedSchema,
-      providerName = source,
-      options = extraOptions.toMap)
-    DataFrame(sqlContext, StreamingRelation(resolved))
+    val dataSource =
+      DataSource(
+        sqlContext,
+        userSpecifiedSchema = userSpecifiedSchema,
+        className = source,
+        options = extraOptions.toMap)
+    DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6d8c8f6..78f30f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -195,14 +195,14 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   def save(): Unit = {
     assertNotBucketed()
-    ResolvedDataSource(
+    val dataSource = DataSource(
       df.sqlContext,
-      source,
-      partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-      getBucketSpec,
-      mode,
-      extraOptions.toMap,
-      df)
+      className = source,
+      partitionColumns = partitioningColumns.getOrElse(Nil),
+      bucketSpec = getBucketSpec,
+      options = extraOptions.toMap)
+
+    dataSource.write(mode, df)
   }
 
   /**
@@ -235,14 +235,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 2.0.0
    */
   def stream(): ContinuousQuery = {
-    val sink = ResolvedDataSource.createSink(
-      df.sqlContext,
-      source,
-      extraOptions.toMap,
-      normalizedParCols.getOrElse(Nil))
+    val dataSource =
+      DataSource(
+        df.sqlContext,
+        className = source,
+        options = extraOptions.toMap,
+        partitionColumns = normalizedParCols.getOrElse(Nil))
 
     df.sqlContext.continuousQueryManager.startQuery(
-      extraOptions.getOrElse("queryName", StreamExecution.nextName), df, sink)
+      extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink())
   }
 
   /**

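The streaming entry points above (`DataFrameReader.stream()` and `DataFrameWriter.stream()`) now build the same `DataSource` description and call `createSource()` / `createSink()` on it. A minimal sketch of those two calls, with an assumed `sqlContext` and hypothetical paths and class names:

  import org.apache.spark.sql.execution.datasources.DataSource

  // FileFormat providers (json, parquet, ...) are wrapped in a FileStreamSource;
  // the schema comes from userSpecifiedSchema or is inferred from existing files.
  val source = DataSource(
    sqlContext,
    className = "json",
    options = Map("path" -> "/tmp/in")).createSource()

  // Sinks are only supported for providers that implement StreamSinkProvider;
  // anything else throws UnsupportedOperationException.
  val sink = DataSource(
    sqlContext,
    className = "com.example.StreamingSink",   // hypothetical StreamSinkProvider
    options = Map("path" -> "/tmp/out")).createSink()
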
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
new file mode 100644
index 0000000..e90e72d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -0,0 +1,338 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+import scala.language.{existentials, implicitConversions}
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
+import org.apache.spark.util.Utils
+
+/**
+ * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
+ * acting as the canonical set of parameters that can describe a Data Source, this class is used to
+ * resolve a description to a concrete implementation that can be used in a query plan
+ * (either batch or streaming) or to write out data using an external library.
+ *
+ * From an end user's perspective a DataSource description can be created explicitly using
+ * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL.  Additionally, this class is
+ * used when resolving a description from a metastore to a concrete implementation.
+ *
+ * Many of the arguments to this class are optional, though depending on the specific API being used
+ * these optional arguments might be filled in during resolution using either inference or external
+ * metadata.  For example, when reading a partitioned table from a file system, partition columns
+ * will be inferred from the directory layout even if they are not specified.
+ *
+ * @param paths A list of file system paths that hold data.  These will be globbed and
+ *              qualified before use. This option only works when reading from a [[FileFormat]].
+ * @param userSpecifiedSchema An optional specification of the schema of the data. When present
+ *                            we skip attempting to infer the schema.
+ * @param partitionColumns A list of column names that the relation is partitioned by.  When this
+ *                         list is empty, the relation is unpartitioned.
+ * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
+ */
+case class DataSource(
+    sqlContext: SQLContext,
+    className: String,
+    paths: Seq[String] = Nil,
+    userSpecifiedSchema: Option[StructType] = None,
+    partitionColumns: Seq[String] = Seq.empty,
+    bucketSpec: Option[BucketSpec] = None,
+    options: Map[String, String] = Map.empty) extends Logging {
+
+  lazy val providingClass: Class[_] = lookupDataSource(className)
+
+  /** A map to maintain backward compatibility in case we move data sources around. */
+  private val backwardCompatibilityMap = Map(
+    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
+  )
+
+  /** Given a provider name, look up the data source class definition. */
+  private def lookupDataSource(provider0: String): Class[_] = {
+    val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
+    val provider2 = s"$provider.DefaultSource"
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
+      // the provider format did not match any given registered aliases
+      case Nil =>
+        Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+          case Success(dataSource) =>
+            // Found the data source using fully qualified path
+            dataSource
+          case Failure(error) =>
+            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+              throw new ClassNotFoundException(
+                "The ORC data source must be used with Hive support enabled.", error)
+            } else {
+              if (provider == "avro" || provider == "com.databricks.spark.avro") {
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please use Spark package " +
+                  "http://spark-packages.org/package/databricks/spark-avro",
+                  error)
+              } else {
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please find packages at " +
+                  "http://spark-packages.org",
+                  error)
+              }
+            }
+        }
+      case head :: Nil =>
+        // there is exactly one registered alias
+        head.getClass
+      case sources =>
+        // There are multiple registered aliases for the input
+        sys.error(s"Multiple sources found for $provider " +
+          s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+          "please specify the fully qualified class name.")
+    }
+  }
+
+  /** Returns a source that can be used to continually read data. */
+  def createSource(): Source = {
+    providingClass.newInstance() match {
+      case s: StreamSourceProvider =>
+        s.createSource(sqlContext, userSpecifiedSchema, className, options)
+
+      case format: FileFormat =>
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val path = caseInsensitiveOptions.getOrElse("path", {
+          throw new IllegalArgumentException("'path' is not specified")
+        })
+        val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+
+        val allPaths = caseInsensitiveOptions.get("path")
+        val globbedPaths = allPaths.toSeq.flatMap { path =>
+          val hdfsPath = new Path(path)
+          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualified)
+        }.toArray
+
+        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
+        val dataSchema = userSpecifiedSchema.orElse {
+          format.inferSchema(
+            sqlContext,
+            caseInsensitiveOptions,
+            fileCatalog.allFiles())
+        }.getOrElse {
+          throw new AnalysisException("Unable to infer schema.  It must be specified manually.")
+        }
+
+        def dataFrameBuilder(files: Array[String]): DataFrame = {
+          new DataFrame(
+            sqlContext,
+            LogicalRelation(
+              DataSource(
+                sqlContext,
+                paths = files,
+                userSpecifiedSchema = Some(dataSchema),
+                className = className,
+                options = options.filterKeys(_ != "path")).resolveRelation()))
+        }
+
+        new FileStreamSource(
+          sqlContext, metadataPath, path, Some(dataSchema), className, dataFrameBuilder)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $className does not support streamed reading")
+    }
+  }
+
+  /** Returns a sink that can be used to continually write data. */
+  def createSink(): Sink = {
+    val datasourceClass = providingClass.newInstance() match {
+      case s: StreamSinkProvider => s
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $className does not support streamed writing")
+    }
+
+    datasourceClass.createSink(sqlContext, options, partitionColumns)
+  }
+
+  /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
+  def resolveRelation(): BaseRelation = {
+    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
+      // TODO: Throw when too much is given.
+      case (dataSource: SchemaRelationProvider, Some(schema)) =>
+        dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
+      case (dataSource: RelationProvider, None) =>
+        dataSource.createRelation(sqlContext, caseInsensitiveOptions)
+      case (_: SchemaRelationProvider, None) =>
+        throw new AnalysisException(s"A schema needs to be specified when using $className.")
+      case (_: RelationProvider, Some(_)) =>
+        throw new AnalysisException(s"$className does not allow user-specified schemas.")
+
+      case (format: FileFormat, _) =>
+        val allPaths = caseInsensitiveOptions.get("path") ++ paths
+        val globbedPaths = allPaths.flatMap { path =>
+          val hdfsPath = new Path(path)
+          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualified)
+        }.toArray
+
+        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
+        val dataSchema = userSpecifiedSchema.orElse {
+          format.inferSchema(
+            sqlContext,
+            caseInsensitiveOptions,
+            fileCatalog.allFiles())
+        }.getOrElse {
+          throw new AnalysisException(
+            s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
+            "It must be specified manually")
+        }
+
+        // If they gave a schema, then we try and figure out the types of the partition columns
+        // from that schema.
+        val partitionSchema = userSpecifiedSchema.map { schema =>
+          StructType(
+            partitionColumns.map { c =>
+              // TODO: Case sensitivity.
+              schema
+                .find(_.name.toLowerCase() == c.toLowerCase())
+                .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
+          })
+        }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns)
+
+        HadoopFsRelation(
+          sqlContext,
+          fileCatalog,
+          partitionSchema = partitionSchema,
+          dataSchema = dataSchema.asNullable,
+          bucketSpec = bucketSpec,
+          format,
+          options)
+
+      case _ =>
+        throw new AnalysisException(
+          s"$className is not a valid Spark SQL Data Source.")
+    }
+
+    relation
+  }
+
+  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  def write(
+      mode: SaveMode,
+      data: DataFrame): BaseRelation = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
+
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sqlContext, mode, options, data)
+      case format: FileFormat =>
+        // Don't glob path for the write path.  The contracts here are:
+        //  1. Only one output path can be specified on the write path;
+        //  2. Output path must be a legal HDFS style file system path;
+        //  3. It's OK that the output path doesn't exist yet;
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val outputPath = {
+          val path = new Path(caseInsensitiveOptions.getOrElse("path", {
+            throw new IllegalArgumentException("'path' is not specified")
+          }))
+          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        }
+
+        val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
+        PartitioningUtils.validatePartitionColumnDataTypes(
+          data.schema, partitionColumns, caseSensitive)
+
+        val equality =
+          if (sqlContext.conf.caseSensitiveAnalysis) {
+            org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+          } else {
+            org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+          }
+
+        val dataSchema = StructType(
+          data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
+
+        // If we are appending to a table that already exists, make sure the partitioning matches
+        // up.  If we fail to load the table for whatever reason, ignore the check.
+        if (mode == SaveMode.Append) {
+          val existingPartitionColumnSet = try {
+            Some(
+              resolveRelation()
+                .asInstanceOf[HadoopFsRelation]
+                .location
+                .partitionSpec(None)
+                .partitionColumns
+                .fieldNames
+                .toSet)
+          } catch {
+            case e: Exception =>
+              None
+          }
+
+          existingPartitionColumnSet.foreach { ex =>
+            if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
+              throw new AnalysisException(
+                s"Requested partitioning does not equal existing partitioning: " +
+                s"$ex != ${partitionColumns.toSet}.")
+            }
+          }
+        }
+
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
+        // will be adjusted within InsertIntoHadoopFsRelation.
+        val plan =
+          InsertIntoHadoopFsRelation(
+            outputPath,
+            partitionColumns.map(UnresolvedAttribute.quoted),
+            bucketSpec,
+            format,
+            () => Unit, // No existing table needs to be refreshed.
+            options,
+            data.logicalPlan,
+            mode)
+        sqlContext.executePlan(plan).toRdd
+
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+
+    // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+  }
+}

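For context on the `lookupDataSource` logic in the new file above: a short name is resolved through `ServiceLoader` against registered `DataSourceRegister` implementations, and anything else is tried as a class name, falling back to `"<name>.DefaultSource"`. A hedged sketch of a third-party provider that would be picked up this way (the package and class names are hypothetical, and ServiceLoader additionally requires a `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` entry listing the class):

  package com.example.myformat

  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

  class DefaultSource extends RelationProvider with DataSourceRegister {
    // Lets callers pass className = "myformat" instead of the fully qualified name.
    override def shortName(): String = "myformat"

    override def createRelation(
        sqlContext: SQLContext,
        parameters: Map[String, String]): BaseRelation = {
      // Build and return the format's BaseRelation; elided in this sketch.
      ???
    }
  }

With such a provider on the classpath, `className = "myformat"` resolves via the registered alias, `className = "com.example.myformat.DefaultSource"` resolves directly, and `className = "com.example.myformat"` resolves through the `DefaultSource` fallback.
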
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index eda3c36..c3f8d7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -335,10 +335,10 @@ private[sql] object PartitioningUtils {
 
   def validatePartitionColumnDataTypes(
       schema: StructType,
-      partitionColumns: Array[String],
+      partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
 
-    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+    partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
       field => field.dataType match {
         case _: AtomicType => // OK
         case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
@@ -346,6 +346,26 @@ private[sql] object PartitioningUtils {
     }
   }
 
+  def partitionColumnsSchema(
+      schema: StructType,
+      partitionColumns: Seq[String],
+      caseSensitive: Boolean): StructType = {
+    val equality = columnNameEquality(caseSensitive)
+    StructType(partitionColumns.map { col =>
+      schema.find(f => equality(f.name, col)).getOrElse {
+        throw new RuntimeException(s"Partition column $col not found in schema $schema")
+      }
+    }).asNullable
+  }
+
+  private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
+    if (caseSensitive) {
+      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+    } else {
+      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    }
+  }
+
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.

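The `partitionColumnsSchema` helper that moved here simply projects the named partition columns out of the full schema. Since `PartitioningUtils` is `private[sql]`, the following is only an illustrative sketch (callable from within Spark's sql packages); the schema and column names are made up:

  import org.apache.spark.sql.execution.datasources.PartitioningUtils
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("country", StringType),
    StructField("day", StringType)))

  // Picks out the named columns (case-insensitively here) and marks them nullable;
  // an unknown column name raises a RuntimeException.
  val partSchema = PartitioningUtils.partitionColumnsSchema(
    schema, Seq("country", "day"), caseSensitive = false)
  // partSchema: StructType(StructField(country,StringType,true), StructField(day,StringType,true))
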
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
deleted file mode 100644
index 8dd975e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-import scala.language.{existentials, implicitConversions}
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
-import org.apache.spark.util.Utils
-
-case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
-
-/**
- * Responsible for taking a description of a datasource (either from
- * [[org.apache.spark.sql.DataFrameReader]], or a metastore) and converting it into a logical
- * relation that can be used in a query plan.
- */
-object ResolvedDataSource extends Logging {
-
-  /** A map to maintain backward compatibility in case we move data sources around. */
-  private val backwardCompatibilityMap = Map(
-    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
-  )
-
-  /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider0: String): Class[_] = {
-    val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
-    val provider2 = s"$provider.DefaultSource"
-    val loader = Utils.getContextOrSparkClassLoader
-    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
-    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-      // the provider format did not match any given registered aliases
-      case Nil =>
-        Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-          case Success(dataSource) =>
-            // Found the data source using fully qualified path
-            dataSource
-          case Failure(error) =>
-            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-              throw new ClassNotFoundException(
-                "The ORC data source must be used with Hive support enabled.", error)
-            } else {
-              if (provider == "avro" || provider == "com.databricks.spark.avro") {
-                throw new ClassNotFoundException(
-                  s"Failed to find data source: $provider. Please use Spark package " +
-                  "http://spark-packages.org/package/databricks/spark-avro",
-                  error)
-              } else {
-                throw new ClassNotFoundException(
-                  s"Failed to find data source: $provider. Please find packages at " +
-                  "http://spark-packages.org",
-                  error)
-              }
-            }
-        }
-      case head :: Nil =>
-        // there is exactly one registered alias
-        head.getClass
-      case sources =>
-        // There are multiple registered aliases for the input
-        sys.error(s"Multiple sources found for $provider " +
-          s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-          "please specify the fully qualified class name.")
-    }
-  }
-
-  // TODO: Combine with apply?
-  def createSource(
-      sqlContext: SQLContext,
-      userSpecifiedSchema: Option[StructType],
-      providerName: String,
-      options: Map[String, String]): Source = {
-    val provider = lookupDataSource(providerName).newInstance() match {
-      case s: StreamSourceProvider =>
-        s.createSource(sqlContext, userSpecifiedSchema, providerName, options)
-
-      case format: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val path = caseInsensitiveOptions.getOrElse("path", {
-          throw new IllegalArgumentException("'path' is not specified")
-        })
-        val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
-
-        val allPaths = caseInsensitiveOptions.get("path")
-        val globbedPaths = allPaths.toSeq.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          SparkHadoopUtil.get.globPathIfNecessary(qualified)
-        }.toArray
-
-        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
-        val dataSchema = userSpecifiedSchema.orElse {
-          format.inferSchema(
-            sqlContext,
-            caseInsensitiveOptions,
-            fileCatalog.allFiles())
-        }.getOrElse {
-          throw new AnalysisException("Unable to infer schema.  It must be specified manually.")
-        }
-
-        def dataFrameBuilder(files: Array[String]): DataFrame = {
-          new DataFrame(
-            sqlContext,
-            LogicalRelation(
-              apply(
-                sqlContext,
-                paths = files,
-                userSpecifiedSchema = Some(dataSchema),
-                provider = providerName,
-                options = options.filterKeys(_ != "path")).relation))
-        }
-
-        new FileStreamSource(
-          sqlContext, metadataPath, path, Some(dataSchema), providerName, dataFrameBuilder)
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"Data source $providerName does not support streamed reading")
-    }
-
-    provider
-  }
-
-  def createSink(
-      sqlContext: SQLContext,
-      providerName: String,
-      options: Map[String, String],
-      partitionColumns: Seq[String]): Sink = {
-    val provider = lookupDataSource(providerName).newInstance() match {
-      case s: StreamSinkProvider => s
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"Data source $providerName does not support streamed writing")
-    }
-
-    provider.createSink(sqlContext, options, partitionColumns)
-  }
-
-  /** Create a [[ResolvedDataSource]] for reading data in. */
-  def apply(
-      sqlContext: SQLContext,
-      paths: Seq[String] = Nil,
-      userSpecifiedSchema: Option[StructType] = None,
-      partitionColumns: Array[String] = Array.empty,
-      bucketSpec: Option[BucketSpec] = None,
-      provider: String,
-      options: Map[String, String]): ResolvedDataSource = {
-    val clazz: Class[_] = lookupDataSource(provider)
-    def className: String = clazz.getCanonicalName
-
-    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-    val relation = (clazz.newInstance(), userSpecifiedSchema) match {
-      // TODO: Throw when too much is given.
-      case (dataSource: SchemaRelationProvider, Some(schema)) =>
-        dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
-      case (dataSource: RelationProvider, None) =>
-        dataSource.createRelation(sqlContext, caseInsensitiveOptions)
-      case (_: SchemaRelationProvider, None) =>
-        throw new AnalysisException(s"A schema needs to be specified when using $className.")
-      case (_: RelationProvider, Some(_)) =>
-        throw new AnalysisException(s"$className does not allow user-specified schemas.")
-
-      case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val globbedPaths = allPaths.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          SparkHadoopUtil.get.globPathIfNecessary(qualified)
-        }.toArray
-
-        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
-        val dataSchema = userSpecifiedSchema.orElse {
-          format.inferSchema(
-            sqlContext,
-            caseInsensitiveOptions,
-            fileCatalog.allFiles())
-        }.getOrElse {
-          throw new AnalysisException(
-            s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
-            "It must be specified manually")
-        }
-
-        // If they gave a schema, then we try and figure out the types of the partition columns
-        // from that schema.
-        val partitionSchema = userSpecifiedSchema.map { schema =>
-          StructType(
-            partitionColumns.map { c =>
-              // TODO: Case sensitivity.
-              schema
-                .find(_.name.toLowerCase() == c.toLowerCase())
-                .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
-          })
-        }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns)
-
-        HadoopFsRelation(
-          sqlContext,
-          fileCatalog,
-          partitionSchema = partitionSchema,
-          dataSchema = dataSchema.asNullable,
-          bucketSpec = bucketSpec,
-          format,
-          options)
-
-      case _ =>
-        throw new AnalysisException(
-          s"$className is not a valid Spark SQL Data Source.")
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-
-  def partitionColumnsSchema(
-      schema: StructType,
-      partitionColumns: Array[String],
-      caseSensitive: Boolean): StructType = {
-    val equality = columnNameEquality(caseSensitive)
-    StructType(partitionColumns.map { col =>
-      schema.find(f => equality(f.name, col)).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema $schema")
-      }
-    }).asNullable
-  }
-
-  private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
-    if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
-  }
-
-  /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
-  def apply(
-      sqlContext: SQLContext,
-      provider: String,
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      mode: SaveMode,
-      options: Map[String, String],
-      data: DataFrame): ResolvedDataSource = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
-      throw new AnalysisException("Cannot save interval data type into external storage.")
-    }
-    val clazz: Class[_] = lookupDataSource(provider)
-    clazz.newInstance() match {
-      case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sqlContext, mode, options, data)
-      case format: FileFormat =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val outputPath = {
-          val path = new Path(caseInsensitiveOptions.getOrElse("path", {
-            throw new IllegalArgumentException("'path' is not specified")
-          }))
-          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        }
-
-        val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumnDataTypes(
-          data.schema, partitionColumns, caseSensitive)
-
-        val equality = columnNameEquality(caseSensitive)
-        val dataSchema = StructType(
-          data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
-
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up.  If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumnSet = try {
-            val resolved = apply(
-              sqlContext,
-              userSpecifiedSchema = Some(data.schema.asNullable),
-              provider = provider,
-              options = options)
-
-            Some(resolved.relation
-              .asInstanceOf[HadoopFsRelation]
-              .location
-              .partitionSpec(None)
-              .partitionColumns
-              .fieldNames
-              .toSet)
-          } catch {
-            case e: Exception =>
-              None
-          }
-
-          existingPartitionColumnSet.foreach { ex =>
-            if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-              throw new AnalysisException(
-                s"Requested partitioning does not equal existing partitioning: " +
-                s"$ex != ${partitionColumns.toSet}.")
-            }
-          }
-        }
-
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelation(
-            outputPath,
-            partitionColumns.map(UnresolvedAttribute.quoted),
-            bucketSpec,
-            format,
-            () => Unit, // No existing table needs to be refreshed.
-            options,
-            data.logicalPlan,
-            mode)
-        sqlContext.executePlan(plan).toRdd
-
-      case _ =>
-        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
-    }
-
-    apply(
-      sqlContext,
-      userSpecifiedSchema = Some(data.schema.asNullable),
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      provider = provider,
-      options = options)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 3d7c6a6..895794c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types._
 
 /**
  * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ *
  * @param table The table to be described.
  * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
  *                   It is effective only when the table is a Hive table.
@@ -50,6 +51,7 @@ case class DescribeCommand(
 
 /**
   * Used to represent the operation of create table using a data source.
+ *
   * @param allowExisting If it is true, we will do nothing when the table already exists.
   *                      If it is false, an exception will be thrown
   */
@@ -91,14 +93,14 @@ case class CreateTempTableUsing(
     options: Map[String, String]) extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
-    val resolved = ResolvedDataSource(
+    val dataSource = DataSource(
       sqlContext,
       userSpecifiedSchema = userSpecifiedSchema,
-      provider = provider,
+      className = provider,
       options = options)
     sqlContext.catalog.registerTable(
       tableIdent,
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
+      DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
 
     Seq.empty[Row]
   }
@@ -114,17 +116,16 @@ case class CreateTempTableUsingAsSelect(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val df = DataFrame(sqlContext, query)
-    val resolved = ResolvedDataSource(
+    val dataSource = DataSource(
       sqlContext,
-      provider,
-      partitionColumns,
+      className = provider,
+      partitionColumns = partitionColumns,
       bucketSpec = None,
-      mode,
-      options,
-      df)
+      options = options)
+    val result = dataSource.write(mode, df)
     sqlContext.catalog.registerTable(
       tableIdent,
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
+      DataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
 
     Seq.empty[Row]
   }

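The commands above back the `CREATE TEMPORARY TABLE ... USING` DDL, which now goes through the same `DataSource` class. A small usage sketch with hypothetical paths (`sqlContext` assumed to be in scope):

  // Handled by CreateTempTableUsing -> DataSource.resolveRelation().
  sqlContext.sql(
    """CREATE TEMPORARY TABLE events
      |USING json
      |OPTIONS (path '/tmp/events.json')""".stripMargin)

  // The AS SELECT variant is handled by CreateTempTableUsingAsSelect,
  // which now calls dataSource.write(mode, df) before registering the table.
  sqlContext.sql(
    """CREATE TEMPORARY TABLE events_copy
      |USING parquet
      |OPTIONS (path '/tmp/events_copy')
      |AS SELECT * FROM events""".stripMargin)
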
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 0eae346..63f0e4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -32,15 +32,11 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
       try {
-        val resolved = ResolvedDataSource(
+        val dataSource = DataSource(
           sqlContext,
-          paths = Seq.empty,
-          userSpecifiedSchema = None,
-          partitionColumns = Array(),
-          bucketSpec = None,
-          provider = u.tableIdentifier.database.get,
-          options = Map("path" -> u.tableIdentifier.table))
-        val plan = LogicalRelation(resolved.relation)
+          paths = u.tableIdentifier.table :: Nil,
+          className = u.tableIdentifier.database.get)
+        val plan = LogicalRelation(dataSource.resolveRelation())
         u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
       } catch {
         case e: ClassNotFoundException => u
@@ -143,7 +139,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
         PartitioningUtils.validatePartitionColumnDataTypes(
-          r.schema, part.keySet.toArray, catalog.conf.caseSensitiveAnalysis)
+          r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis)
 
         // Get all input data source relations of the query.
         val srcRelations = query.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 12512a8..60b0c64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ case class HadoopFsRelation(
 }
 
 /**
- * Used to read a write data in files to [[InternalRow]] format.
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
  */
 trait FileFormat {
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2f17037..02b173d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -32,7 +32,7 @@ import org.scalactic.Tolerance._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -1178,21 +1178,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       sparkContext.parallelize(1 to 100)
         .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
 
-      val d1 = ResolvedDataSource(
+      val d1 = DataSource(
         sqlContext,
         userSpecifiedSchema = None,
         partitionColumns = Array.empty[String],
         bucketSpec = None,
-        provider = classOf[DefaultSource].getCanonicalName,
-        options = Map("path" -> path))
+        className = classOf[DefaultSource].getCanonicalName,
+        options = Map("path" -> path)).resolveRelation()
 
-      val d2 = ResolvedDataSource(
+      val d2 = DataSource(
         sqlContext,
         userSpecifiedSchema = None,
         partitionColumns = Array.empty[String],
         bucketSpec = None,
-        provider = classOf[DefaultSource].getCanonicalName,
-        options = Map("path" -> path))
+        className = classOf[DefaultSource].getCanonicalName,
+        options = Map("path" -> path)).resolveRelation()
       assert(d1 === d2)
     })
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index cb6e517..94d032f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,59 +18,61 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.datasources.DataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
+  private def getProvidingClass(name: String): Class[_] =
+    DataSource(sqlContext = null, className = name).providingClass
 
   test("jdbc") {
     assert(
-      ResolvedDataSource.lookupDataSource("jdbc") ===
+      getProvidingClass("jdbc") ===
       classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
     assert(
-      ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.jdbc") ===
+      getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
       classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
     assert(
-      ResolvedDataSource.lookupDataSource("org.apache.spark.sql.jdbc") ===
+      getProvidingClass("org.apache.spark.sql.jdbc") ===
         classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
   }
 
   test("json") {
     assert(
-      ResolvedDataSource.lookupDataSource("json") ===
+      getProvidingClass("json") ===
       classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
     assert(
-      ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.json") ===
+      getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
         classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
     assert(
-      ResolvedDataSource.lookupDataSource("org.apache.spark.sql.json") ===
+      getProvidingClass("org.apache.spark.sql.json") ===
         classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
   }
 
   test("parquet") {
     assert(
-      ResolvedDataSource.lookupDataSource("parquet") ===
+      getProvidingClass("parquet") ===
       classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
     assert(
-      ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.parquet") ===
+      getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
         classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
     assert(
-      ResolvedDataSource.lookupDataSource("org.apache.spark.sql.parquet") ===
+      getProvidingClass("org.apache.spark.sql.parquet") ===
         classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
   }
 
   test("error message for unknown data sources") {
     val error1 = intercept[ClassNotFoundException] {
-      ResolvedDataSource.lookupDataSource("avro")
+      getProvidingClass("avro")
     }
     assert(error1.getMessage.contains("spark-packages"))
 
     val error2 = intercept[ClassNotFoundException] {
-      ResolvedDataSource.lookupDataSource("com.databricks.spark.avro")
+      getProvidingClass("com.databricks.spark.avro")
     }
     assert(error2.getMessage.contains("spark-packages"))
 
     val error3 = intercept[ClassNotFoundException] {
-      ResolvedDataSource.lookupDataSource("asfdwefasdfasdf")
+      getProvidingClass("asfdwefasdfasdf")
     }
     assert(error3.getMessage.contains("spark-packages"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2887418..8f6cd66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -176,17 +176,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         }
 
         val options = table.storage.serdeProperties
-        val resolvedRelation =
-          ResolvedDataSource(
+        val dataSource =
+          DataSource(
             hive,
             userSpecifiedSchema = userSpecifiedSchema,
-            partitionColumns = partitionColumns.toArray,
+            partitionColumns = partitionColumns,
             bucketSpec = bucketSpec,
-            provider = table.properties("spark.sql.sources.provider"),
+            className = table.properties("spark.sql.sources.provider"),
             options = options)
 
         LogicalRelation(
-          resolvedRelation.relation,
+          dataSource.resolveRelation(),
           metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
       }
     }
@@ -283,12 +283,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
     val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
     val dataSource =
-      ResolvedDataSource(
+      DataSource(
         hive,
         userSpecifiedSchema = userSpecifiedSchema,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
-        provider = provider,
+        className = provider,
         options = options)
 
     def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
@@ -334,7 +334,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
     val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
       case _ if skipHiveMetadata =>
         val message =
           s"Persisting partitioned data source relation $qualifiedTableName into " +
@@ -511,7 +511,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
       val parquetRelation = cached.getOrElse {
         val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
-        val fileCatalog = new HiveFileCatalog(hive, paths, partitionSpec)
+        val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
         val format = new DefaultSource()
         val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
 
@@ -541,12 +541,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       val parquetRelation = cached.getOrElse {
         val created =
           LogicalRelation(
-            ResolvedDataSource(
+            DataSource(
               sqlContext = hive,
               paths = paths,
               userSpecifiedSchema = Some(metastoreRelation.schema),
               options = parquetOptions,
-              provider = "parquet").relation)
+              className = "parquet").resolveRelation())
 
         cachedDataSourceTables.put(tableIdentifier, created)
         created
@@ -749,7 +749,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
  * An override of the standard HDFS listing based catalog, that overrides the partition spec with
  * the information from the metastore.
  */
-class HiveFileCatalog(
+class MetaStoreFileCatalog(
     hive: HiveContext,
     paths: Seq[Path],
     partitionSpecFromHive: PartitionSpec)

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 37cec6d..7e4fb8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -148,12 +148,12 @@ case class CreateMetastoreDataSource(
       }
 
     // Create the relation to validate the arguments before writing the metadata to the metastore.
-    ResolvedDataSource(
+    DataSource(
       sqlContext = sqlContext,
       userSpecifiedSchema = userSpecifiedSchema,
-      provider = provider,
+      className = provider,
       bucketSpec = None,
-      options = optionsWithPath)
+      options = optionsWithPath).resolveRelation()
 
     hiveContext.catalog.createDataSourceTable(
       tableIdent,
@@ -220,15 +220,16 @@ case class CreateMetastoreDataSourceAsSelect(
           return Seq.empty[Row]
         case SaveMode.Append =>
           // Check if the specified data source match the data source of the existing table.
-          val resolved = ResolvedDataSource(
+          val dataSource = DataSource(
             sqlContext = sqlContext,
             userSpecifiedSchema = Some(query.schema.asNullable),
             partitionColumns = partitionColumns,
             bucketSpec = bucketSpec,
-            provider = provider,
+            className = provider,
             options = optionsWithPath)
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
+
           EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               existingSchema = Some(l.schema)
@@ -248,19 +249,19 @@ case class CreateMetastoreDataSourceAsSelect(
     val data = DataFrame(hiveContext, query)
     val df = existingSchema match {
       // If we are inserting into an existing table, just use the existing schema.
-      case Some(schema) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, schema)
+      case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
       case None => data
     }
 
     // Create the relation based on the data of df.
-    val resolved = ResolvedDataSource(
+    val dataSource = DataSource(
       sqlContext,
-      provider,
-      partitionColumns,
-      bucketSpec,
-      mode,
-      optionsWithPath,
-      df)
+      className = provider,
+      partitionColumns = partitionColumns,
+      bucketSpec = bucketSpec,
+      options = optionsWithPath)
+
+    val result = dataSource.write(mode, df)
 
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -268,7 +269,7 @@ case class CreateMetastoreDataSourceAsSelect(
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       hiveContext.catalog.createDataSourceTable(
         tableIdent,
-        Some(resolved.relation.schema),
+        Some(result.schema),
         partitionColumns,
         bucketSpec,
         provider,

http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index ad832b5..041e0fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.orc
 
 import java.util.Properties
 
-import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -39,7 +38,7 @@ import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -173,7 +172,7 @@ private[orc] class OrcOutputWriter(
   }
 
   override def write(row: Row): Unit =
-    throw new UnsupportedOperationException("call  writeInternal")
+    throw new UnsupportedOperationException("call writeInternal")
 
   private def wrapOrcStruct(
       struct: OrcStruct,

