Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/09 00:19:29 UTC
spark git commit: [SPARK-13738][SQL] Cleanup Data Source resolution
Repository: spark
Updated Branches:
refs/heads/master 076009b94 -> 1e2884059
[SPARK-13738][SQL] Cleanup Data Source resolution
A follow-up to #11509 that simply refactors the interface we use when resolving a pluggable `DataSource` (a short usage sketch follows the list below).
- Multiple functions shared the same set of arguments, so we capture them in a case class called `DataSource`. Actual resolution is now done by calling a function on this class.
- Instead of having multiple methods named `apply` (some of which do writing, some of which do reading), we now explicitly have `resolveRelation()` and `write(mode, df)`.
- Get rid of `Array[String]` in favor of `Seq[String]`, since this is an internal API and the arrays were forcing us to awkwardly call `toArray` in a bunch of places.
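For context, a minimal sketch of the new call pattern. The `DataSource` signature, `resolveRelation()`, and `write(mode, df)` come from the patch below; the "parquet" provider, the paths, and the partition column are hypothetical placeholders for illustration only.

import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

object DataSourceUsageSketch {
  def read(sqlContext: SQLContext): BaseRelation = {
    // Describe the source once, as a case class of parameters...
    val source = DataSource(
      sqlContext,
      className = "parquet",
      options = Map("path" -> "/tmp/events"))
    // ...then resolve it explicitly instead of going through an overloaded apply().
    source.resolveRelation()
  }

  def write(sqlContext: SQLContext, df: DataFrame): BaseRelation = {
    val sink = DataSource(
      sqlContext,
      className = "parquet",
      partitionColumns = Seq("date"),           // plain Seq[String], no toArray needed
      options = Map("path" -> "/tmp/events_out"))
    // Writing is now an explicit write(mode, df) rather than another apply() overload.
    sink.write(SaveMode.Overwrite, df)
  }
}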
Author: Michael Armbrust <mi...@databricks.com>
Closes #11572 from marmbrus/dataSourceResolution.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e288405
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e288405
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e288405
Branch: refs/heads/master
Commit: 1e28840594b9d972c96d3922ca0bf0f76e313e82
Parents: 076009b
Author: Michael Armbrust <mi...@databricks.com>
Authored: Tue Mar 8 15:19:26 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Mar 8 15:19:26 2016 -0800
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameReader.scala | 34 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 29 +-
.../sql/execution/datasources/DataSource.scala | 338 +++++++++++++++++
.../datasources/PartitioningUtils.scala | 24 +-
.../datasources/ResolvedDataSource.scala | 360 -------------------
.../spark/sql/execution/datasources/ddl.scala | 21 +-
.../spark/sql/execution/datasources/rules.scala | 14 +-
.../apache/spark/sql/sources/interfaces.scala | 2 +-
.../execution/datasources/json/JsonSuite.scala | 14 +-
.../sql/sources/ResolvedDataSourceSuite.scala | 28 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 24 +-
.../spark/sql/hive/execution/commands.scala | 31 +-
.../apache/spark/sql/hive/orc/OrcRelation.scala | 5 +-
13 files changed, 462 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fd92e52..509b299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -122,12 +122,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def load(): DataFrame = {
- val resolved = ResolvedDataSource(
- sqlContext,
- userSpecifiedSchema = userSpecifiedSchema,
- provider = source,
- options = extraOptions.toMap)
- DataFrame(sqlContext, LogicalRelation(resolved.relation))
+ val dataSource =
+ DataSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
}
/**
@@ -152,12 +153,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
sqlContext.emptyDataFrame
} else {
sqlContext.baseRelationToDataFrame(
- ResolvedDataSource.apply(
+ DataSource.apply(
sqlContext,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
- provider = source,
- options = extraOptions.toMap).relation)
+ className = source,
+ options = extraOptions.toMap).resolveRelation())
}
}
@@ -168,12 +169,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 2.0.0
*/
def stream(): DataFrame = {
- val resolved = ResolvedDataSource.createSource(
- sqlContext,
- userSpecifiedSchema = userSpecifiedSchema,
- providerName = source,
- options = extraOptions.toMap)
- DataFrame(sqlContext, StreamingRelation(resolved))
+ val dataSource =
+ DataSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6d8c8f6..78f30f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -195,14 +195,14 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
def save(): Unit = {
assertNotBucketed()
- ResolvedDataSource(
+ val dataSource = DataSource(
df.sqlContext,
- source,
- partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
- getBucketSpec,
- mode,
- extraOptions.toMap,
- df)
+ className = source,
+ partitionColumns = partitioningColumns.getOrElse(Nil),
+ bucketSpec = getBucketSpec,
+ options = extraOptions.toMap)
+
+ dataSource.write(mode, df)
}
/**
@@ -235,14 +235,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
def stream(): ContinuousQuery = {
- val sink = ResolvedDataSource.createSink(
- df.sqlContext,
- source,
- extraOptions.toMap,
- normalizedParCols.getOrElse(Nil))
+ val dataSource =
+ DataSource(
+ df.sqlContext,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
df.sqlContext.continuousQueryManager.startQuery(
- extraOptions.getOrElse("queryName", StreamExecution.nextName), df, sink)
+ extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink())
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
new file mode 100644
index 0000000..e90e72d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -0,0 +1,338 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+import scala.language.{existentials, implicitConversions}
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
+import org.apache.spark.util.Utils
+
+/**
+ * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
+ * acting as the canonical set of parameters that can describe a Data Source, this class is used to
+ * resolve a description to a concrete implementation that can be used in a query plan
+ * (either batch or streaming) or to write out data using an external library.
+ *
+ * From an end user's perspective a DataSource description can be created explicitly using
+ * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL. Additionally, this class is
+ * used when resolving a description from a metastore to a concrete implementation.
+ *
+ * Many of the arguments to this class are optional, though depending on the specific API being used
+ * these optional arguments might be filled in during resolution using either inference or external
+ * metadata. For example, when reading a partitioned table from a file system, partition columns
+ * will be inferred from the directory layout even if they are not specified.
+ *
+ * @param paths A list of file system paths that hold data. These will be globbed before and
+ * qualified. This option only works when reading from a [[FileFormat]].
+ * @param userSpecifiedSchema An optional specification of the schema of the data. When present
+ * we skip attempting to infer the schema.
+ * @param partitionColumns A list of column names that the relation is partitioned by. When this
+ * list is empty, the relation is unpartitioned.
+ * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
+ */
+case class DataSource(
+ sqlContext: SQLContext,
+ className: String,
+ paths: Seq[String] = Nil,
+ userSpecifiedSchema: Option[StructType] = None,
+ partitionColumns: Seq[String] = Seq.empty,
+ bucketSpec: Option[BucketSpec] = None,
+ options: Map[String, String] = Map.empty) extends Logging {
+
+ lazy val providingClass: Class[_] = lookupDataSource(className)
+
+ /** A map to maintain backward compatibility in case we move data sources around. */
+ private val backwardCompatibilityMap = Map(
+ "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
+ )
+
+ /** Given a provider name, look up the data source class definition. */
+ private def lookupDataSource(provider0: String): Class[_] = {
+ val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
+ val provider2 = s"$provider.DefaultSource"
+ val loader = Utils.getContextOrSparkClassLoader
+ val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+ serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
+ // the provider format did not match any given registered aliases
+ case Nil =>
+ Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+ case Success(dataSource) =>
+ // Found the data source using fully qualified path
+ dataSource
+ case Failure(error) =>
+ if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+ throw new ClassNotFoundException(
+ "The ORC data source must be used with Hive support enabled.", error)
+ } else {
+ if (provider == "avro" || provider == "com.databricks.spark.avro") {
+ throw new ClassNotFoundException(
+ s"Failed to find data source: $provider. Please use Spark package " +
+ "http://spark-packages.org/package/databricks/spark-avro",
+ error)
+ } else {
+ throw new ClassNotFoundException(
+ s"Failed to find data source: $provider. Please find packages at " +
+ "http://spark-packages.org",
+ error)
+ }
+ }
+ }
+ case head :: Nil =>
+ // there is exactly one registered alias
+ head.getClass
+ case sources =>
+ // There are multiple registered aliases for the input
+ sys.error(s"Multiple sources found for $provider " +
+ s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+ "please specify the fully qualified class name.")
+ }
+ }
+
+ /** Returns a source that can be used to continually read data. */
+ def createSource(): Source = {
+ providingClass.newInstance() match {
+ case s: StreamSourceProvider =>
+ s.createSource(sqlContext, userSpecifiedSchema, className, options)
+
+ case format: FileFormat =>
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val path = caseInsensitiveOptions.getOrElse("path", {
+ throw new IllegalArgumentException("'path' is not specified")
+ })
+ val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+
+ val allPaths = caseInsensitiveOptions.get("path")
+ val globbedPaths = allPaths.toSeq.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualified)
+ }.toArray
+
+ val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
+ val dataSchema = userSpecifiedSchema.orElse {
+ format.inferSchema(
+ sqlContext,
+ caseInsensitiveOptions,
+ fileCatalog.allFiles())
+ }.getOrElse {
+ throw new AnalysisException("Unable to infer schema. It must be specified manually.")
+ }
+
+ def dataFrameBuilder(files: Array[String]): DataFrame = {
+ new DataFrame(
+ sqlContext,
+ LogicalRelation(
+ DataSource(
+ sqlContext,
+ paths = files,
+ userSpecifiedSchema = Some(dataSchema),
+ className = className,
+ options = options.filterKeys(_ != "path")).resolveRelation()))
+ }
+
+ new FileStreamSource(
+ sqlContext, metadataPath, path, Some(dataSchema), className, dataFrameBuilder)
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Data source $className does not support streamed reading")
+ }
+ }
+
+ /** Returns a sink that can be used to continually write data. */
+ def createSink(): Sink = {
+ val datasourceClass = providingClass.newInstance() match {
+ case s: StreamSinkProvider => s
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Data source $className does not support streamed writing")
+ }
+
+ datasourceClass.createSink(sqlContext, options, partitionColumns)
+ }
+
+ /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
+ def resolveRelation(): BaseRelation = {
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
+ // TODO: Throw when too much is given.
+ case (dataSource: SchemaRelationProvider, Some(schema)) =>
+ dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
+ case (dataSource: RelationProvider, None) =>
+ dataSource.createRelation(sqlContext, caseInsensitiveOptions)
+ case (_: SchemaRelationProvider, None) =>
+ throw new AnalysisException(s"A schema needs to be specified when using $className.")
+ case (_: RelationProvider, Some(_)) =>
+ throw new AnalysisException(s"$className does not allow user-specified schemas.")
+
+ case (format: FileFormat, _) =>
+ val allPaths = caseInsensitiveOptions.get("path") ++ paths
+ val globbedPaths = allPaths.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualified)
+ }.toArray
+
+ val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
+ val dataSchema = userSpecifiedSchema.orElse {
+ format.inferSchema(
+ sqlContext,
+ caseInsensitiveOptions,
+ fileCatalog.allFiles())
+ }.getOrElse {
+ throw new AnalysisException(
+ s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
+ "It must be specified manually")
+ }
+
+ // If they gave a schema, then we try and figure out the types of the partition columns
+ // from that schema.
+ val partitionSchema = userSpecifiedSchema.map { schema =>
+ StructType(
+ partitionColumns.map { c =>
+ // TODO: Case sensitivity.
+ schema
+ .find(_.name.toLowerCase() == c.toLowerCase())
+ .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
+ })
+ }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns)
+
+ HadoopFsRelation(
+ sqlContext,
+ fileCatalog,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema.asNullable,
+ bucketSpec = bucketSpec,
+ format,
+ options)
+
+ case _ =>
+ throw new AnalysisException(
+ s"$className is not a valid Spark SQL Data Source.")
+ }
+
+ relation
+ }
+
+ /** Writes the give [[DataFrame]] out to this [[DataSource]]. */
+ def write(
+ mode: SaveMode,
+ data: DataFrame): BaseRelation = {
+ if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+ throw new AnalysisException("Cannot save interval data type into external storage.")
+ }
+
+ providingClass.newInstance() match {
+ case dataSource: CreatableRelationProvider =>
+ dataSource.createRelation(sqlContext, mode, options, data)
+ case format: FileFormat =>
+ // Don't glob path for the write path. The contracts here are:
+ // 1. Only one output path can be specified on the write path;
+ // 2. Output path must be a legal HDFS style file system path;
+ // 3. It's OK that the output path doesn't exist yet;
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val outputPath = {
+ val path = new Path(caseInsensitiveOptions.getOrElse("path", {
+ throw new IllegalArgumentException("'path' is not specified")
+ }))
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+
+ val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
+ PartitioningUtils.validatePartitionColumnDataTypes(
+ data.schema, partitionColumns, caseSensitive)
+
+ val equality =
+ if (sqlContext.conf.caseSensitiveAnalysis) {
+ org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+ } else {
+ org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+ }
+
+ val dataSchema = StructType(
+ data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
+
+ // If we are appending to a table that already exists, make sure the partitioning matches
+ // up. If we fail to load the table for whatever reason, ignore the check.
+ if (mode == SaveMode.Append) {
+ val existingPartitionColumnSet = try {
+ Some(
+ resolveRelation()
+ .asInstanceOf[HadoopFsRelation]
+ .location
+ .partitionSpec(None)
+ .partitionColumns
+ .fieldNames
+ .toSet)
+ } catch {
+ case e: Exception =>
+ None
+ }
+
+ existingPartitionColumnSet.foreach { ex =>
+ if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
+ throw new AnalysisException(
+ s"Requested partitioning does not equal existing partitioning: " +
+ s"$ex != ${partitionColumns.toSet}.")
+ }
+ }
+ }
+
+ // For partitioned relation r, r.schema's column ordering can be different from the column
+ // ordering of data.logicalPlan (partition columns are all moved after data column). This
+ // will be adjusted within InsertIntoHadoopFsRelation.
+ val plan =
+ InsertIntoHadoopFsRelation(
+ outputPath,
+ partitionColumns.map(UnresolvedAttribute.quoted),
+ bucketSpec,
+ format,
+ () => Unit, // No existing table needs to be refreshed.
+ options,
+ data.logicalPlan,
+ mode)
+ sqlContext.executePlan(plan).toRdd
+
+ case _ =>
+ sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+ }
+
+ // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+ copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index eda3c36..c3f8d7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -335,10 +335,10 @@ private[sql] object PartitioningUtils {
def validatePartitionColumnDataTypes(
schema: StructType,
- partitionColumns: Array[String],
+ partitionColumns: Seq[String],
caseSensitive: Boolean): Unit = {
- ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+ partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
field => field.dataType match {
case _: AtomicType => // OK
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
@@ -346,6 +346,26 @@ private[sql] object PartitioningUtils {
}
}
+ def partitionColumnsSchema(
+ schema: StructType,
+ partitionColumns: Seq[String],
+ caseSensitive: Boolean): StructType = {
+ val equality = columnNameEquality(caseSensitive)
+ StructType(partitionColumns.map { col =>
+ schema.find(f => equality(f.name, col)).getOrElse {
+ throw new RuntimeException(s"Partition column $col not found in schema $schema")
+ }
+ }).asNullable
+ }
+
+ private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
+ if (caseSensitive) {
+ org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+ } else {
+ org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+ }
+ }
+
/**
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
deleted file mode 100644
index 8dd975e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-import scala.language.{existentials, implicitConversions}
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
-import org.apache.spark.util.Utils
-
-case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
-
-/**
- * Responsible for taking a description of a datasource (either from
- * [[org.apache.spark.sql.DataFrameReader]], or a metastore) and converting it into a logical
- * relation that can be used in a query plan.
- */
-object ResolvedDataSource extends Logging {
-
- /** A map to maintain backward compatibility in case we move data sources around. */
- private val backwardCompatibilityMap = Map(
- "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
- "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
- )
-
- /** Given a provider name, look up the data source class definition. */
- def lookupDataSource(provider0: String): Class[_] = {
- val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
- val provider2 = s"$provider.DefaultSource"
- val loader = Utils.getContextOrSparkClassLoader
- val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
- serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
- // the provider format did not match any given registered aliases
- case Nil =>
- Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
- case Success(dataSource) =>
- // Found the data source using fully qualified path
- dataSource
- case Failure(error) =>
- if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
- throw new ClassNotFoundException(
- "The ORC data source must be used with Hive support enabled.", error)
- } else {
- if (provider == "avro" || provider == "com.databricks.spark.avro") {
- throw new ClassNotFoundException(
- s"Failed to find data source: $provider. Please use Spark package " +
- "http://spark-packages.org/package/databricks/spark-avro",
- error)
- } else {
- throw new ClassNotFoundException(
- s"Failed to find data source: $provider. Please find packages at " +
- "http://spark-packages.org",
- error)
- }
- }
- }
- case head :: Nil =>
- // there is exactly one registered alias
- head.getClass
- case sources =>
- // There are multiple registered aliases for the input
- sys.error(s"Multiple sources found for $provider " +
- s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
- "please specify the fully qualified class name.")
- }
- }
-
- // TODO: Combine with apply?
- def createSource(
- sqlContext: SQLContext,
- userSpecifiedSchema: Option[StructType],
- providerName: String,
- options: Map[String, String]): Source = {
- val provider = lookupDataSource(providerName).newInstance() match {
- case s: StreamSourceProvider =>
- s.createSource(sqlContext, userSpecifiedSchema, providerName, options)
-
- case format: FileFormat =>
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val path = caseInsensitiveOptions.getOrElse("path", {
- throw new IllegalArgumentException("'path' is not specified")
- })
- val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
-
- val allPaths = caseInsensitiveOptions.get("path")
- val globbedPaths = allPaths.toSeq.flatMap { path =>
- val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualified)
- }.toArray
-
- val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
- val dataSchema = userSpecifiedSchema.orElse {
- format.inferSchema(
- sqlContext,
- caseInsensitiveOptions,
- fileCatalog.allFiles())
- }.getOrElse {
- throw new AnalysisException("Unable to infer schema. It must be specified manually.")
- }
-
- def dataFrameBuilder(files: Array[String]): DataFrame = {
- new DataFrame(
- sqlContext,
- LogicalRelation(
- apply(
- sqlContext,
- paths = files,
- userSpecifiedSchema = Some(dataSchema),
- provider = providerName,
- options = options.filterKeys(_ != "path")).relation))
- }
-
- new FileStreamSource(
- sqlContext, metadataPath, path, Some(dataSchema), providerName, dataFrameBuilder)
- case _ =>
- throw new UnsupportedOperationException(
- s"Data source $providerName does not support streamed reading")
- }
-
- provider
- }
-
- def createSink(
- sqlContext: SQLContext,
- providerName: String,
- options: Map[String, String],
- partitionColumns: Seq[String]): Sink = {
- val provider = lookupDataSource(providerName).newInstance() match {
- case s: StreamSinkProvider => s
- case _ =>
- throw new UnsupportedOperationException(
- s"Data source $providerName does not support streamed writing")
- }
-
- provider.createSink(sqlContext, options, partitionColumns)
- }
-
- /** Create a [[ResolvedDataSource]] for reading data in. */
- def apply(
- sqlContext: SQLContext,
- paths: Seq[String] = Nil,
- userSpecifiedSchema: Option[StructType] = None,
- partitionColumns: Array[String] = Array.empty,
- bucketSpec: Option[BucketSpec] = None,
- provider: String,
- options: Map[String, String]): ResolvedDataSource = {
- val clazz: Class[_] = lookupDataSource(provider)
- def className: String = clazz.getCanonicalName
-
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val relation = (clazz.newInstance(), userSpecifiedSchema) match {
- // TODO: Throw when too much is given.
- case (dataSource: SchemaRelationProvider, Some(schema)) =>
- dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
- case (dataSource: RelationProvider, None) =>
- dataSource.createRelation(sqlContext, caseInsensitiveOptions)
- case (_: SchemaRelationProvider, None) =>
- throw new AnalysisException(s"A schema needs to be specified when using $className.")
- case (_: RelationProvider, Some(_)) =>
- throw new AnalysisException(s"$className does not allow user-specified schemas.")
-
- case (format: FileFormat, _) =>
- val allPaths = caseInsensitiveOptions.get("path") ++ paths
- val globbedPaths = allPaths.flatMap { path =>
- val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualified)
- }.toArray
-
- val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
- val dataSchema = userSpecifiedSchema.orElse {
- format.inferSchema(
- sqlContext,
- caseInsensitiveOptions,
- fileCatalog.allFiles())
- }.getOrElse {
- throw new AnalysisException(
- s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
- "It must be specified manually")
- }
-
- // If they gave a schema, then we try and figure out the types of the partition columns
- // from that schema.
- val partitionSchema = userSpecifiedSchema.map { schema =>
- StructType(
- partitionColumns.map { c =>
- // TODO: Case sensitivity.
- schema
- .find(_.name.toLowerCase() == c.toLowerCase())
- .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
- })
- }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns)
-
- HadoopFsRelation(
- sqlContext,
- fileCatalog,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema.asNullable,
- bucketSpec = bucketSpec,
- format,
- options)
-
- case _ =>
- throw new AnalysisException(
- s"$className is not a valid Spark SQL Data Source.")
- }
- new ResolvedDataSource(clazz, relation)
- }
-
- def partitionColumnsSchema(
- schema: StructType,
- partitionColumns: Array[String],
- caseSensitive: Boolean): StructType = {
- val equality = columnNameEquality(caseSensitive)
- StructType(partitionColumns.map { col =>
- schema.find(f => equality(f.name, col)).getOrElse {
- throw new RuntimeException(s"Partition column $col not found in schema $schema")
- }
- }).asNullable
- }
-
- private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
- if (caseSensitive) {
- org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
- } else {
- org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
- }
- }
-
- /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
- def apply(
- sqlContext: SQLContext,
- provider: String,
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- mode: SaveMode,
- options: Map[String, String],
- data: DataFrame): ResolvedDataSource = {
- if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
- throw new AnalysisException("Cannot save interval data type into external storage.")
- }
- val clazz: Class[_] = lookupDataSource(provider)
- clazz.newInstance() match {
- case dataSource: CreatableRelationProvider =>
- dataSource.createRelation(sqlContext, mode, options, data)
- case format: FileFormat =>
- // Don't glob path for the write path. The contracts here are:
- // 1. Only one output path can be specified on the write path;
- // 2. Output path must be a legal HDFS style file system path;
- // 3. It's OK that the output path doesn't exist yet;
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val outputPath = {
- val path = new Path(caseInsensitiveOptions.getOrElse("path", {
- throw new IllegalArgumentException("'path' is not specified")
- }))
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
-
- val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
- PartitioningUtils.validatePartitionColumnDataTypes(
- data.schema, partitionColumns, caseSensitive)
-
- val equality = columnNameEquality(caseSensitive)
- val dataSchema = StructType(
- data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
-
- // If we are appending to a table that already exists, make sure the partitioning matches
- // up. If we fail to load the table for whatever reason, ignore the check.
- if (mode == SaveMode.Append) {
- val existingPartitionColumnSet = try {
- val resolved = apply(
- sqlContext,
- userSpecifiedSchema = Some(data.schema.asNullable),
- provider = provider,
- options = options)
-
- Some(resolved.relation
- .asInstanceOf[HadoopFsRelation]
- .location
- .partitionSpec(None)
- .partitionColumns
- .fieldNames
- .toSet)
- } catch {
- case e: Exception =>
- None
- }
-
- existingPartitionColumnSet.foreach { ex =>
- if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
- throw new AnalysisException(
- s"Requested partitioning does not equal existing partitioning: " +
- s"$ex != ${partitionColumns.toSet}.")
- }
- }
- }
-
- // For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). This
- // will be adjusted within InsertIntoHadoopFsRelation.
- val plan =
- InsertIntoHadoopFsRelation(
- outputPath,
- partitionColumns.map(UnresolvedAttribute.quoted),
- bucketSpec,
- format,
- () => Unit, // No existing table needs to be refreshed.
- options,
- data.logicalPlan,
- mode)
- sqlContext.executePlan(plan).toRdd
-
- case _ =>
- sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
- }
-
- apply(
- sqlContext,
- userSpecifiedSchema = Some(data.schema.asNullable),
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- provider = provider,
- options = options)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 3d7c6a6..895794c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types._
/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ *
* @param table The table to be described.
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
@@ -50,6 +51,7 @@ case class DescribeCommand(
/**
* Used to represent the operation of create table using a data source.
+ *
* @param allowExisting If it is true, we will do nothing when the table already exists.
* If it is false, an exception will be thrown
*/
@@ -91,14 +93,14 @@ case class CreateTempTableUsing(
options: Map[String, String]) extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
- provider = provider,
+ className = provider,
options = options)
sqlContext.catalog.registerTable(
tableIdent,
- DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
+ DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
Seq.empty[Row]
}
@@ -114,17 +116,16 @@ case class CreateTempTableUsingAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val df = DataFrame(sqlContext, query)
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- provider,
- partitionColumns,
+ className = provider,
+ partitionColumns = partitionColumns,
bucketSpec = None,
- mode,
- options,
- df)
+ options = options)
+ val result = dataSource.write(mode, df)
sqlContext.catalog.registerTable(
tableIdent,
- DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
+ DataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 0eae346..63f0e4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -32,15 +32,11 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
try {
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- paths = Seq.empty,
- userSpecifiedSchema = None,
- partitionColumns = Array(),
- bucketSpec = None,
- provider = u.tableIdentifier.database.get,
- options = Map("path" -> u.tableIdentifier.table))
- val plan = LogicalRelation(resolved.relation)
+ paths = u.tableIdentifier.table :: Nil,
+ className = u.tableIdentifier.database.get)
+ val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
} catch {
case e: ClassNotFoundException => u
@@ -143,7 +139,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
PartitioningUtils.validatePartitionColumnDataTypes(
- r.schema, part.keySet.toArray, catalog.conf.caseSensitiveAnalysis)
+ r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis)
// Get all input data source relations of the query.
val srcRelations = query.collect {
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 12512a8..60b0c64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ case class HadoopFsRelation(
}
/**
- * Used to read a write data in files to [[InternalRow]] format.
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
*/
trait FileFormat {
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2f17037..02b173d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -32,7 +32,7 @@ import org.scalactic.Tolerance._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -1178,21 +1178,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
sparkContext.parallelize(1 to 100)
.map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
- val d1 = ResolvedDataSource(
+ val d1 = DataSource(
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- provider = classOf[DefaultSource].getCanonicalName,
- options = Map("path" -> path))
+ className = classOf[DefaultSource].getCanonicalName,
+ options = Map("path" -> path)).resolveRelation()
- val d2 = ResolvedDataSource(
+ val d2 = DataSource(
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
- provider = classOf[DefaultSource].getCanonicalName,
- options = Map("path" -> path))
+ className = classOf[DefaultSource].getCanonicalName,
+ options = Map("path" -> path)).resolveRelation()
assert(d1 === d2)
})
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index cb6e517..94d032f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,59 +18,61 @@
package org.apache.spark.sql.sources
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.datasources.DataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
+ private def getProvidingClass(name: String): Class[_] =
+ DataSource(sqlContext = null, className = name).providingClass
test("jdbc") {
assert(
- ResolvedDataSource.lookupDataSource("jdbc") ===
+ getProvidingClass("jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.jdbc") ===
+ getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.jdbc") ===
+ getProvidingClass("org.apache.spark.sql.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
}
test("json") {
assert(
- ResolvedDataSource.lookupDataSource("json") ===
+ getProvidingClass("json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.json") ===
+ getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.json") ===
+ getProvidingClass("org.apache.spark.sql.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
}
test("parquet") {
assert(
- ResolvedDataSource.lookupDataSource("parquet") ===
+ getProvidingClass("parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.parquet") ===
+ getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
assert(
- ResolvedDataSource.lookupDataSource("org.apache.spark.sql.parquet") ===
+ getProvidingClass("org.apache.spark.sql.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
}
test("error message for unknown data sources") {
val error1 = intercept[ClassNotFoundException] {
- ResolvedDataSource.lookupDataSource("avro")
+ getProvidingClass("avro")
}
assert(error1.getMessage.contains("spark-packages"))
val error2 = intercept[ClassNotFoundException] {
- ResolvedDataSource.lookupDataSource("com.databricks.spark.avro")
+ getProvidingClass("com.databricks.spark.avro")
}
assert(error2.getMessage.contains("spark-packages"))
val error3 = intercept[ClassNotFoundException] {
- ResolvedDataSource.lookupDataSource("asfdwefasdfasdf")
+ getProvidingClass("asfdwefasdfasdf")
}
assert(error3.getMessage.contains("spark-packages"))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2887418..8f6cd66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -176,17 +176,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
val options = table.storage.serdeProperties
- val resolvedRelation =
- ResolvedDataSource(
+ val dataSource =
+ DataSource(
hive,
userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = partitionColumns.toArray,
+ partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = table.properties("spark.sql.sources.provider"),
+ className = table.properties("spark.sql.sources.provider"),
options = options)
LogicalRelation(
- resolvedRelation.relation,
+ dataSource.resolveRelation(),
metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
}
}
@@ -283,12 +283,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource =
- ResolvedDataSource(
+ DataSource(
hive,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = provider,
+ className = provider,
options = options)
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
@@ -334,7 +334,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
- val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
case _ if skipHiveMetadata =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
@@ -511,7 +511,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val parquetRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
- val fileCatalog = new HiveFileCatalog(hive, paths, partitionSpec)
+ val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val format = new DefaultSource()
val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
@@ -541,12 +541,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(
- ResolvedDataSource(
+ DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
options = parquetOptions,
- provider = "parquet").relation)
+ className = "parquet").resolveRelation())
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -749,7 +749,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
*/
-class HiveFileCatalog(
+class MetaStoreFileCatalog(
hive: HiveContext,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 37cec6d..7e4fb8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -148,12 +148,12 @@ case class CreateMetastoreDataSource(
}
// Create the relation to validate the arguments before writing the metadata to the metastore.
- ResolvedDataSource(
+ DataSource(
sqlContext = sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
- provider = provider,
+ className = provider,
bucketSpec = None,
- options = optionsWithPath)
+ options = optionsWithPath).resolveRelation()
hiveContext.catalog.createDataSourceTable(
tableIdent,
@@ -220,15 +220,16 @@ case class CreateMetastoreDataSourceAsSelect(
return Seq.empty[Row]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext = sqlContext,
userSpecifiedSchema = Some(query.schema.asNullable),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- provider = provider,
+ className = provider,
options = optionsWithPath)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
+
EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
existingSchema = Some(l.schema)
@@ -248,19 +249,19 @@ case class CreateMetastoreDataSourceAsSelect(
val data = DataFrame(hiveContext, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
- case Some(schema) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, schema)
+ case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
case None => data
}
// Create the relation based on the data of df.
- val resolved = ResolvedDataSource(
+ val dataSource = DataSource(
sqlContext,
- provider,
- partitionColumns,
- bucketSpec,
- mode,
- optionsWithPath,
- df)
+ className = provider,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ options = optionsWithPath)
+
+ val result = dataSource.write(mode, df)
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -268,7 +269,7 @@ case class CreateMetastoreDataSourceAsSelect(
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
hiveContext.catalog.createDataSourceTable(
tableIdent,
- Some(resolved.relation.schema),
+ Some(result.schema),
partitionColumns,
bucketSpec,
provider,
http://git-wip-us.apache.org/repos/asf/spark/blob/1e288405/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index ad832b5..041e0fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
-import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -39,7 +38,7 @@ import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -173,7 +172,7 @@ private[orc] class OrcOutputWriter(
}
override def write(row: Row): Unit =
- throw new UnsupportedOperationException("call writeInternal")
+ throw new UnsupportedOperationException("call writeInternal")
private def wrapOrcStruct(
struct: OrcStruct,