Posted to commits@spark.apache.org by li...@apache.org on 2015/05/15 10:21:12 UTC
[2/2] spark git commit: [SPARK-7591] [SQL] Partitioning support API tweaks
[SPARK-7591] [SQL] Partitioning support API tweaks
Please see [SPARK-7591] [1] for the details.
/cc rxin marmbrus yhuai
[1]: https://issues.apache.org/jira/browse/SPARK-7591
Author: Cheng Lian <li...@databricks.com>
Closes #6150 from liancheng/spark-7591 and squashes the following commits:
af422e7 [Cheng Lian] Addresses @rxin's comments
37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization
2fc680a [Cheng Lian] Fixes Scala style issue
189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments
522c24e [Cheng Lian] Adds OutputWriterFactory
047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2
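For data source developers, the central API change in this patch is that a HadoopFsRelation now implements prepareJobForWrite(job), which returns an OutputWriterFactory; executors then obtain writers through factory.newInstance(path, dataSchema, context) instead of reflectively instantiating an OutputWriter and calling init(). A minimal sketch of a custom writer under the new contract (ExampleOutputWriter and ExampleOutputWriterFactory are hypothetical illustrations, not part of this patch):

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical writer: the output path, data schema, and task attempt context
// arrive through the factory, so no separate init() step is needed.
class ExampleOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
  override def write(row: Row): Unit = {
    // Write `row` to a file under `path` using the task attempt's configuration.
  }

  override def close(): Unit = {
    // Flush buffered data and release resources held by this task attempt.
  }
}

// Hypothetical factory, the kind of object prepareJobForWrite() would return;
// executors call newInstance() once per task (or per dynamic partition directory).
class ExampleOutputWriterFactory extends OutputWriterFactory {
  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new ExampleOutputWriter(path, context)
}

This mirrors how ParquetOutputWriter is wired up in the diff below, where the relation's other constructor-style arguments were likewise moved into overridable members (e.g. paths and userDefinedPartitionColumns).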
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdf5bba3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdf5bba3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdf5bba3
Branch: refs/heads/master
Commit: fdf5bba35d201fe0de3901b4d47262c485c76569
Parents: 9476148
Author: Cheng Lian <li...@databricks.com>
Authored: Fri May 15 16:20:49 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri May 15 16:20:49 2015 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/SQLContext.scala | 14 +-
.../spark/sql/parquet/fsBasedParquet.scala | 565 ------------------
.../apache/spark/sql/parquet/newParquet.scala | 570 +++++++++++++++++++
.../spark/sql/sources/DataSourceStrategy.scala | 10 +-
.../spark/sql/sources/PartitioningUtils.scala | 4 +
.../org/apache/spark/sql/sources/commands.scala | 23 +-
.../org/apache/spark/sql/sources/ddl.scala | 8 +-
.../apache/spark/sql/sources/interfaces.scala | 140 +++--
.../org/apache/spark/sql/sources/rules.scala | 2 +-
.../spark/sql/parquet/ParquetFilterSuite.scala | 2 +-
.../spark/sql/parquet/ParquetSchemaSuite.scala | 12 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +-
.../spark/sql/hive/execution/commands.scala | 2 +-
.../sql/hive/MetastoreDataSourcesSuite.scala | 6 +-
.../sql/hive/execution/SQLQuerySuite.scala | 8 +-
.../apache/spark/sql/hive/parquetSuites.scala | 20 +-
.../spark/sql/sources/SimpleTextRelation.scala | 47 +-
.../sql/sources/fsBasedRelationSuites.scala | 564 ------------------
.../sql/sources/hadoopFsRelationSuites.scala | 564 ++++++++++++++++++
19 files changed, 1287 insertions(+), 1286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b33a700..9fb355e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
} else if (conf.parquetUseDataSourceApi) {
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
baseRelationToDataFrame(
- new FSBasedParquetRelation(
+ new ParquetRelation2(
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
}
-
+
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}
-
+
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@Experimental
def jdbc(
url: String,
- table: String,
+ table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, properties)
}
-
+
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
jdbc(url, table, parts, properties)
}
-
+
private def jdbc(
url: String,
table: String,
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
deleted file mode 100644
index c83a9c3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop._
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.util.ContextUtil
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-
-private[sql] class DefaultSource extends FSBasedRelationProvider {
- override def createRelation(
- sqlContext: SQLContext,
- paths: Array[String],
- schema: Option[StructType],
- partitionColumns: Option[StructType],
- parameters: Map[String, String]): FSBasedRelation = {
- val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
- new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
- }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter extends OutputWriter {
- private var recordWriter: RecordWriter[Void, Row] = _
- private var taskAttemptContext: TaskAttemptContext = _
-
- override def init(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): Unit = {
- val conf = context.getConfiguration
- val outputFormat = {
- // When appending new Parquet files to an existing Parquet file directory, to avoid
- // overwriting existing data files, we need to find out the max task ID encoded in these data
- // file names.
- // TODO Make this snippet a utility function for other data source developers
- val maxExistingTaskId = {
- // Note that `path` may point to a temporary location. Here we retrieve the real
- // destination path from the configuration
- val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
- val fs = outputPath.getFileSystem(conf)
-
- if (fs.exists(outputPath)) {
- // Pattern used to match task ID in part file names, e.g.:
- //
- // part-r-00001.gz.part
- // ^~~~~
- val partFilePattern = """part-.-(\d{1,}).*""".r
-
- fs.listStatus(outputPath).map(_.getPath.getName).map {
- case partFilePattern(id) => id.toInt
- case name if name.startsWith("_") => 0
- case name if name.startsWith(".") => 0
- case name => sys.error(
- s"""Trying to write Parquet files to directory $outputPath,
- |but found items with illegal name "$name"
- """.stripMargin.replace('\n', ' ').trim)
- }.reduceOption(_ max _).getOrElse(0)
- } else {
- 0
- }
- }
-
- new ParquetOutputFormat[Row]() {
- // Here we override `getDefaultWorkFile` for two reasons:
- //
- // 1. To allow appending. We need to generate output file name based on the max available
- // task ID computed above.
- //
- // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
- // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
- // partitions in the case of dynamic partitioning.
- override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
- new Path(path, f"part-r-$split%05d$extension")
- }
- }
- }
-
- recordWriter = outputFormat.getRecordWriter(context)
- taskAttemptContext = context
- }
-
- override def write(row: Row): Unit = recordWriter.write(null, row)
-
- override def close(): Unit = recordWriter.close(taskAttemptContext)
-}
-
-private[sql] class FSBasedParquetRelation(
- paths: Array[String],
- private val maybeDataSchema: Option[StructType],
- private val maybePartitionSpec: Option[PartitionSpec],
- parameters: Map[String, String])(
- val sqlContext: SQLContext)
- extends FSBasedRelation(paths, maybePartitionSpec)
- with Logging {
-
- // Should we merge schemas from all Parquet part-files?
- private val shouldMergeSchemas =
- parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
-
- private val maybeMetastoreSchema = parameters
- .get(FSBasedParquetRelation.METASTORE_SCHEMA)
- .map(DataType.fromJson(_).asInstanceOf[StructType])
-
- private val metadataCache = new MetadataCache
- metadataCache.refresh()
-
- override def equals(other: scala.Any): Boolean = other match {
- case that: FSBasedParquetRelation =>
- val schemaEquality = if (shouldMergeSchemas) {
- this.shouldMergeSchemas == that.shouldMergeSchemas
- } else {
- this.dataSchema == that.dataSchema &&
- this.schema == that.schema
- }
-
- this.paths.toSet == that.paths.toSet &&
- schemaEquality &&
- this.maybeDataSchema == that.maybeDataSchema &&
- this.partitionColumns == that.partitionColumns
-
- case _ => false
- }
-
- override def hashCode(): Int = {
- if (shouldMergeSchemas) {
- Objects.hashCode(
- Boolean.box(shouldMergeSchemas),
- paths.toSet,
- maybeDataSchema,
- maybePartitionSpec)
- } else {
- Objects.hashCode(
- Boolean.box(shouldMergeSchemas),
- paths.toSet,
- dataSchema,
- schema,
- maybeDataSchema,
- maybePartitionSpec)
- }
- }
-
- override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
-
- override def dataSchema: StructType = metadataCache.dataSchema
-
- override private[sql] def refresh(): Unit = {
- metadataCache.refresh()
- super.refresh()
- }
-
- // Parquet data source always uses Catalyst internal representations.
- override val needConversion: Boolean = false
-
- override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
-
- override def prepareForWrite(job: Job): Unit = {
- val conf = ContextUtil.getConfiguration(job)
-
- val committerClass =
- conf.getClass(
- "spark.sql.parquet.output.committer.class",
- classOf[ParquetOutputCommitter],
- classOf[ParquetOutputCommitter])
-
- conf.setClass(
- "mapred.output.committer.class",
- committerClass,
- classOf[ParquetOutputCommitter])
-
- // TODO There's no need to use two kinds of WriteSupport
- // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
- // complex types.
- val writeSupportClass =
- if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
- classOf[MutableRowWriteSupport]
- } else {
- classOf[RowWriteSupport]
- }
-
- ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
- RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
-
- // Sets compression scheme
- conf.set(
- ParquetOutputFormat.COMPRESSION,
- ParquetRelation
- .shortParquetCompressionCodecNames
- .getOrElse(
- sqlContext.conf.parquetCompressionCodec.toUpperCase,
- CompressionCodecName.UNCOMPRESSED).name())
- }
-
- override def buildScan(
- requiredColumns: Array[String],
- filters: Array[Filter],
- inputPaths: Array[String]): RDD[Row] = {
-
- val job = new Job(SparkHadoopUtil.get.conf)
- val conf = ContextUtil.getConfiguration(job)
-
- ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
- if (inputPaths.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
- }
-
- // Try to push down filters when filter push-down is enabled.
- if (sqlContext.conf.parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
- .reduceOption(FilterApi.and)
- .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
- }
-
- conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
- val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
- ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
- })
-
- conf.set(
- RowWriteSupport.SPARK_ROW_SCHEMA,
- ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
- conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
-
- val inputFileStatuses =
- metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
- val footers = inputFileStatuses.map(metadataCache.footers)
-
- // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
- // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
- // footers. Especially when a global arbitrative schema (either from metastore or data source
- // DDL) is available.
- new NewHadoopRDD(
- sqlContext.sparkContext,
- classOf[FilteringParquetRowInputFormat],
- classOf[Void],
- classOf[Row],
- conf) {
-
- val cacheMetadata = useMetadataCache
-
- @transient val cachedStatuses = inputFileStatuses.map { f =>
- // In order to encode the authority of a Path containing special characters such as /,
- // we need to use the string returned by the URI of the path to create a new Path.
- val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
- new FileStatus(
- f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
- f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
- }.toSeq
-
- @transient val cachedFooters = footers.map { f =>
- // In order to encode the authority of a Path containing special characters such as /,
- // we need to use the string returned by the URI of the path to create a new Path.
- new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
- }.toSeq
-
- // Overridden so we can inject our own cached files statuses.
- override def getPartitions: Array[SparkPartition] = {
- val inputFormat = if (cacheMetadata) {
- new FilteringParquetRowInputFormat {
- override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
- override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
- }
- } else {
- new FilteringParquetRowInputFormat
- }
-
- val jobContext = newJobContext(getConf, jobId)
- val rawSplits = inputFormat.getSplits(jobContext)
-
- Array.tabulate[SparkPartition](rawSplits.size) { i =>
- new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
- }
- }
- }.values
- }
-
- private class MetadataCache {
- // `FileStatus` objects of all "_metadata" files.
- private var metadataStatuses: Array[FileStatus] = _
-
- // `FileStatus` objects of all "_common_metadata" files.
- private var commonMetadataStatuses: Array[FileStatus] = _
-
- // Parquet footer cache.
- var footers: Map[FileStatus, Footer] = _
-
- // `FileStatus` objects of all data files (Parquet part-files).
- var dataStatuses: Array[FileStatus] = _
-
- // Schema of the actual Parquet files, without partition columns discovered from partition
- // directory paths.
- var dataSchema: StructType = _
-
- // Schema of the whole table, including partition columns.
- var schema: StructType = _
-
- /**
- * Refreshes `FileStatus`es, footers, partition spec, and table schema.
- */
- def refresh(): Unit = {
- // Support either reading a collection of raw Parquet part-files, or a collection of folders
- // containing Parquet files (e.g. partitioned Parquet table).
- val baseStatuses = paths.distinct.flatMap { p =>
- val path = new Path(p)
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- Try(fs.getFileStatus(qualified)).toOption
- }
- assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
- // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
- val leaves = baseStatuses.flatMap { f =>
- val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
- SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
- isSummaryFile(f.getPath) ||
- !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }
- }
-
- dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
- metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
- commonMetadataStatuses =
- leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
- footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
- val parquetMetadata = ParquetFileReader.readFooter(
- SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
- f -> new Footer(f.getPath, parquetMetadata)
- }.seq.toMap
-
- dataSchema = {
- val dataSchema0 =
- maybeDataSchema
- .orElse(readSchema())
- .orElse(maybeMetastoreSchema)
- .getOrElse(sys.error("Failed to get the schema."))
-
- // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
- // case insensitivity issue and possible schema mismatch (probably caused by schema
- // evolution).
- maybeMetastoreSchema
- .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
- .getOrElse(dataSchema0)
- }
- }
-
- private def isSummaryFile(file: Path): Boolean = {
- file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
- file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
- }
-
- private def readSchema(): Option[StructType] = {
- // Sees which file(s) we need to touch in order to figure out the schema.
- //
- // Always tries the summary files first if users don't require a merged schema. In this case,
- // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
- // groups information, and could be much smaller for large Parquet files with lots of row
- // groups.
- //
- // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
- // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
- // how to merge them correctly if some key is associated with different values in different
- // part-files. When this happens, Parquet simply gives up generating the summary file. This
- // implies that if a summary file presents, then:
- //
- // 1. Either all part-files have exactly the same Spark SQL schema, or
- // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
- // their schemas may differ from each other).
- //
- // Here we tend to be pessimistic and take the second case into account. Basically this means
- // we can't trust the summary files if users require a merged schema, and must touch all part-
- // files to do the merge.
- val filesToTouch =
- if (shouldMergeSchemas) {
- // Also includes summary files, 'cause there might be empty partition directories.
- (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
- } else {
- // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
- // don't have this.
- commonMetadataStatuses.headOption
- // Falls back to "_metadata"
- .orElse(metadataStatuses.headOption)
- // Summary file(s) not found, the Parquet file is either corrupted, or different part-
- // files contain conflicting user defined metadata (two or more values are associated
- // with a same key in different files). In either case, we fall back to any of the
- // first part-file, and just assume all schemas are consistent.
- .orElse(dataStatuses.headOption)
- .toSeq
- }
-
- assert(
- filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
- "No schema defined, " +
- s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
-
- FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
- }
- }
-}
-
-private[sql] object FSBasedParquetRelation extends Logging {
- // Whether we should merge schemas collected from all Parquet part-files.
- private[sql] val MERGE_SCHEMA = "mergeSchema"
-
- // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
- // internally.
- private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
- private[parquet] def readSchema(
- footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
- footers.map { footer =>
- val metadata = footer.getParquetMetadata.getFileMetaData
- val parquetSchema = metadata.getSchema
- val maybeSparkSchema = metadata
- .getKeyValueMetaData
- .toMap
- .get(RowReadSupport.SPARK_METADATA_KEY)
- .flatMap { serializedSchema =>
- // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
- // whatever is available.
- Try(DataType.fromJson(serializedSchema))
- .recover { case _: Throwable =>
- logInfo(
- s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
- "falling back to the deprecated DataType.fromCaseClassString parser.")
- DataType.fromCaseClassString(serializedSchema)
- }
- .recover { case cause: Throwable =>
- logWarning(
- s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
- |\t$serializedSchema
- """.stripMargin,
- cause)
- }
- .map(_.asInstanceOf[StructType])
- .toOption
- }
-
- maybeSparkSchema.getOrElse {
- // Falls back to Parquet schema if Spark SQL schema is absent.
- StructType.fromAttributes(
- // TODO Really no need to use `Attribute` here, we only need to know the data type.
- ParquetTypesConverter.convertToAttributes(
- parquetSchema,
- sqlContext.conf.isParquetBinaryAsString,
- sqlContext.conf.isParquetINT96AsTimestamp))
- }
- }.reduceOption { (left, right) =>
- try left.merge(right) catch { case e: Throwable =>
- throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
- }
- }
- }
-
- /**
- * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
- * schema and Parquet schema.
- *
- * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
- * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
- * distinguish binary and string). This method generates a correct schema by merging Metastore
- * schema data types and Parquet schema field names.
- */
- private[parquet] def mergeMetastoreParquetSchema(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- def schemaConflictMessage: String =
- s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
- |${metastoreSchema.prettyJson}
- |
- |Parquet schema:
- |${parquetSchema.prettyJson}
- """.stripMargin
-
- val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
- assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
- val ordinalMap = metastoreSchema.zipWithIndex.map {
- case (field, index) => field.name.toLowerCase -> index
- }.toMap
-
- val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
- ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
- StructType(metastoreSchema.zip(reorderedParquetSchema).map {
- // Uses Parquet field names but retains Metastore data types.
- case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
- mSchema.copy(name = pSchema.name)
- case _ =>
- throw new SparkException(schemaConflictMessage)
- })
- }
-
- /**
- * Returns the original schema from the Parquet file with any missing nullable fields from the
- * Hive Metastore schema merged in.
- *
- * When constructing a DataFrame from a collection of structured data, the resulting object has
- * a schema corresponding to the union of the fields present in each element of the collection.
- * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
- * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
- * contain a particular nullable field in its schema despite that field being present in the
- * table schema obtained from the Hive Metastore. This method returns a schema representing the
- * Parquet file schema along with any additional nullable fields from the Metastore schema
- * merged in.
- */
- private[parquet] def mergeMissingNullableFields(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
- val missingFields = metastoreSchema
- .map(_.name.toLowerCase)
- .diff(parquetSchema.map(_.name.toLowerCase))
- .map(fieldMap(_))
- .filter(_.nullable)
- StructType(parquetSchema ++ missingFields)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
new file mode 100644
index 0000000..946062f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import parquet.filter2.predicate.FilterApi
+import parquet.format.converter.ParquetMetadataConverter
+import parquet.hadoop._
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ paths: Array[String],
+ schema: Option[StructType],
+ partitionColumns: Option[StructType],
+ parameters: Map[String, String]): HadoopFsRelation = {
+ val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
+ new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+ }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
+ extends OutputWriter {
+
+ private val recordWriter: RecordWriter[Void, Row] = {
+ val conf = context.getConfiguration
+ val outputFormat = {
+ // When appending new Parquet files to an existing Parquet file directory, to avoid
+ // overwriting existing data files, we need to find out the max task ID encoded in these data
+ // file names.
+ // TODO Make this snippet a utility function for other data source developers
+ val maxExistingTaskId = {
+ // Note that `path` may point to a temporary location. Here we retrieve the real
+ // destination path from the configuration
+ val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
+ val fs = outputPath.getFileSystem(conf)
+
+ if (fs.exists(outputPath)) {
+ // Pattern used to match task ID in part file names, e.g.:
+ //
+ // part-r-00001.gz.parquet
+ // ^~~~~
+ val partFilePattern = """part-.-(\d{1,}).*""".r
+
+ fs.listStatus(outputPath).map(_.getPath.getName).map {
+ case partFilePattern(id) => id.toInt
+ case name if name.startsWith("_") => 0
+ case name if name.startsWith(".") => 0
+ case name => sys.error(
+ s"Trying to write Parquet files to directory $outputPath, " +
+ s"but found items with illegal name '$name'.")
+ }.reduceOption(_ max _).getOrElse(0)
+ } else {
+ 0
+ }
+ }
+
+ new ParquetOutputFormat[Row]() {
+ // Here we override `getDefaultWorkFile` for two reasons:
+ //
+ // 1. To allow appending. We need to generate output file name based on the max available
+ // task ID computed above.
+ //
+ // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
+ // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+ // partitions in the case of dynamic partitioning.
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
+ new Path(path, f"part-r-$split%05d$extension")
+ }
+ }
+ }
+
+ outputFormat.getRecordWriter(context)
+ }
+
+ override def write(row: Row): Unit = recordWriter.write(null, row)
+
+ override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation2(
+ override val paths: Array[String],
+ private val maybeDataSchema: Option[StructType],
+ private val maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])(
+ val sqlContext: SQLContext)
+ extends HadoopFsRelation(maybePartitionSpec)
+ with Logging {
+
+ // Should we merge schemas from all Parquet part-files?
+ private val shouldMergeSchemas =
+ parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+
+ private val maybeMetastoreSchema = parameters
+ .get(ParquetRelation2.METASTORE_SCHEMA)
+ .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+ private lazy val metadataCache: MetadataCache = {
+ val meta = new MetadataCache
+ meta.refresh()
+ meta
+ }
+
+ override def equals(other: scala.Any): Boolean = other match {
+ case that: ParquetRelation2 =>
+ val schemaEquality = if (shouldMergeSchemas) {
+ this.shouldMergeSchemas == that.shouldMergeSchemas
+ } else {
+ this.dataSchema == that.dataSchema &&
+ this.schema == that.schema
+ }
+
+ this.paths.toSet == that.paths.toSet &&
+ schemaEquality &&
+ this.maybeDataSchema == that.maybeDataSchema &&
+ this.partitionColumns == that.partitionColumns
+
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ if (shouldMergeSchemas) {
+ Objects.hashCode(
+ Boolean.box(shouldMergeSchemas),
+ paths.toSet,
+ maybeDataSchema,
+ maybePartitionSpec)
+ } else {
+ Objects.hashCode(
+ Boolean.box(shouldMergeSchemas),
+ paths.toSet,
+ dataSchema,
+ schema,
+ maybeDataSchema,
+ maybePartitionSpec)
+ }
+ }
+
+ override def dataSchema: StructType = metadataCache.dataSchema
+
+ override private[sql] def refresh(): Unit = {
+ metadataCache.refresh()
+ super.refresh()
+ }
+
+ // Parquet data source always uses Catalyst internal representations.
+ override val needConversion: Boolean = false
+
+ override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+ override def userDefinedPartitionColumns: Option[StructType] =
+ maybePartitionSpec.map(_.partitionColumns)
+
+ override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+ val conf = ContextUtil.getConfiguration(job)
+
+ val committerClass =
+ conf.getClass(
+ "spark.sql.parquet.output.committer.class",
+ classOf[ParquetOutputCommitter],
+ classOf[ParquetOutputCommitter])
+
+ conf.setClass(
+ "mapred.output.committer.class",
+ committerClass,
+ classOf[ParquetOutputCommitter])
+
+ // TODO There's no need to use two kinds of WriteSupport
+ // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
+ // complex types.
+ val writeSupportClass =
+ if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
+ classOf[MutableRowWriteSupport]
+ } else {
+ classOf[RowWriteSupport]
+ }
+
+ ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+ RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+
+ // Sets compression scheme
+ conf.set(
+ ParquetOutputFormat.COMPRESSION,
+ ParquetRelation
+ .shortParquetCompressionCodecNames
+ .getOrElse(
+ sqlContext.conf.parquetCompressionCodec.toUpperCase,
+ CompressionCodecName.UNCOMPRESSED).name())
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
+ new ParquetOutputWriter(path, context)
+ }
+ }
+ }
+
+ override def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputPaths: Array[String]): RDD[Row] = {
+
+ val job = new Job(SparkHadoopUtil.get.conf)
+ val conf = ContextUtil.getConfiguration(job)
+
+ ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
+ if (inputPaths.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+ }
+
+ // Try to push down filters when filter push-down is enabled.
+ if (sqlContext.conf.parquetFilterPushDown) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
+ .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+ }
+
+ conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+ val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+ ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+ })
+
+ conf.set(
+ RowWriteSupport.SPARK_ROW_SCHEMA,
+ ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+ conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+
+ val inputFileStatuses =
+ metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
+
+ val footers = inputFileStatuses.map(metadataCache.footers)
+
+ // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+ // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+ // footers. Especially when a global arbitrative schema (either from metastore or data source
+ // DDL) is available.
+ new NewHadoopRDD(
+ sqlContext.sparkContext,
+ classOf[FilteringParquetRowInputFormat],
+ classOf[Void],
+ classOf[Row],
+ conf) {
+
+ val cacheMetadata = useMetadataCache
+
+ @transient val cachedStatuses = inputFileStatuses.map { f =>
+ // In order to encode the authority of a Path containing special characters such as /,
+ // we need to use the string returned by the URI of the path to create a new Path.
+ val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+ new FileStatus(
+ f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+ f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+ }.toSeq
+
+ @transient val cachedFooters = footers.map { f =>
+ // In order to encode the authority of a Path containing special characters such as /,
+ // we need to use the string returned by the URI of the path to create a new Path.
+ new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+ }.toSeq
+
+ // Overridden so we can inject our own cached files statuses.
+ override def getPartitions: Array[SparkPartition] = {
+ val inputFormat = if (cacheMetadata) {
+ new FilteringParquetRowInputFormat {
+ override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+
+ override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+ }
+ } else {
+ new FilteringParquetRowInputFormat
+ }
+
+ val jobContext = newJobContext(getConf, jobId)
+ val rawSplits = inputFormat.getSplits(jobContext)
+
+ Array.tabulate[SparkPartition](rawSplits.size) { i =>
+ new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ }
+ }.values
+ }
+
+ private class MetadataCache {
+ // `FileStatus` objects of all "_metadata" files.
+ private var metadataStatuses: Array[FileStatus] = _
+
+ // `FileStatus` objects of all "_common_metadata" files.
+ private var commonMetadataStatuses: Array[FileStatus] = _
+
+ // Parquet footer cache.
+ var footers: Map[FileStatus, Footer] = _
+
+ // `FileStatus` objects of all data files (Parquet part-files).
+ var dataStatuses: Array[FileStatus] = _
+
+ // Schema of the actual Parquet files, without partition columns discovered from partition
+ // directory paths.
+ var dataSchema: StructType = _
+
+ // Schema of the whole table, including partition columns.
+ var schema: StructType = _
+
+ /**
+ * Refreshes `FileStatus`es, footers, partition spec, and table schema.
+ */
+ def refresh(): Unit = {
+ // Support either reading a collection of raw Parquet part-files, or a collection of folders
+ // containing Parquet files (e.g. partitioned Parquet table).
+ val baseStatuses = paths.distinct.flatMap { p =>
+ val path = new Path(p)
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ Try(fs.getFileStatus(qualified)).toOption
+ }
+ assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
+
+ // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+ val leaves = baseStatuses.flatMap { f =>
+ val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
+ SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+ isSummaryFile(f.getPath) ||
+ !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+ }
+ }
+
+ dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+ metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+ commonMetadataStatuses =
+ leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+ footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
+ val parquetMetadata = ParquetFileReader.readFooter(
+ SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
+ f -> new Footer(f.getPath, parquetMetadata)
+ }.seq.toMap
+
+ dataSchema = {
+ val dataSchema0 =
+ maybeDataSchema
+ .orElse(readSchema())
+ .orElse(maybeMetastoreSchema)
+ .getOrElse(sys.error("Failed to get the schema."))
+
+ // If this Parquet relation is converted from a Hive Metastore table, we must reconcile
+ // case insensitivity issues and possible schema mismatch (probably caused by schema
+ // evolution).
+ maybeMetastoreSchema
+ .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
+ .getOrElse(dataSchema0)
+ }
+ }
+
+ private def isSummaryFile(file: Path): Boolean = {
+ file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+ file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+ }
+
+ private def readSchema(): Option[StructType] = {
+ // Sees which file(s) we need to touch in order to figure out the schema.
+ //
+ // Always tries the summary files first if users don't require a merged schema. In this case,
+ // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+ // groups information, and could be much smaller for large Parquet files with lots of row
+ // groups.
+ //
+ // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
+ // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+ // how to merge them correctly if some key is associated with different values in different
+ // part-files. When this happens, Parquet simply gives up generating the summary file. This
+ // implies that if a summary file presents, then:
+ //
+ // 1. Either all part-files have exactly the same Spark SQL schema, or
+ // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+ // their schemas may differ from each other).
+ //
+ // Here we tend to be pessimistic and take the second case into account. Basically this means
+ // we can't trust the summary files if users require a merged schema, and must touch all part-
+ // files to do the merge.
+ val filesToTouch =
+ if (shouldMergeSchemas) {
+ // Also includes summary files, 'cause there might be empty partition directories.
+ (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+ } else {
+ // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
+ // don't have this.
+ commonMetadataStatuses.headOption
+ // Falls back to "_metadata"
+ .orElse(metadataStatuses.headOption)
+ // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+ // files contain conflicting user defined metadata (two or more values are associated
+ // with a same key in different files). In either case, we fall back to any of the
+ // first part-file, and just assume all schemas are consistent.
+ .orElse(dataStatuses.headOption)
+ .toSeq
+ }
+
+ assert(
+ filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
+ "No schema defined, " +
+ s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+
+ ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+ }
+ }
+}
+
+private[sql] object ParquetRelation2 extends Logging {
+ // Whether we should merge schemas collected from all Parquet part-files.
+ private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+ // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
+ // internally.
+ private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+ private[parquet] def readSchema(
+ footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+ footers.map { footer =>
+ val metadata = footer.getParquetMetadata.getFileMetaData
+ val parquetSchema = metadata.getSchema
+ val maybeSparkSchema = metadata
+ .getKeyValueMetaData
+ .toMap
+ .get(RowReadSupport.SPARK_METADATA_KEY)
+ .flatMap { serializedSchema =>
+ // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+ // whatever is available.
+ Try(DataType.fromJson(serializedSchema))
+ .recover { case _: Throwable =>
+ logInfo(
+ s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "falling back to the deprecated DataType.fromCaseClassString parser.")
+ DataType.fromCaseClassString(serializedSchema)
+ }
+ .recover { case cause: Throwable =>
+ logWarning(
+ s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+ |\t$serializedSchema
+ """.stripMargin,
+ cause)
+ }
+ .map(_.asInstanceOf[StructType])
+ .toOption
+ }
+
+ maybeSparkSchema.getOrElse {
+ // Falls back to Parquet schema if Spark SQL schema is absent.
+ StructType.fromAttributes(
+ // TODO Really no need to use `Attribute` here, we only need to know the data type.
+ ParquetTypesConverter.convertToAttributes(
+ parquetSchema,
+ sqlContext.conf.isParquetBinaryAsString,
+ sqlContext.conf.isParquetINT96AsTimestamp))
+ }
+ }.reduceOption { (left, right) =>
+ try left.merge(right) catch { case e: Throwable =>
+ throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+ }
+ }
+ }
+
+ /**
+ * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
+ * schema and Parquet schema.
+ *
+ * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
+ * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
+ * distinguish binary and string). This method generates a correct schema by merging Metastore
+ * schema data types and Parquet schema field names.
+ */
+ private[parquet] def mergeMetastoreParquetSchema(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ def schemaConflictMessage: String =
+ s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+ |${metastoreSchema.prettyJson}
+ |
+ |Parquet schema:
+ |${parquetSchema.prettyJson}
+ """.stripMargin
+
+ val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+ assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
+
+ val ordinalMap = metastoreSchema.zipWithIndex.map {
+ case (field, index) => field.name.toLowerCase -> index
+ }.toMap
+
+ val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
+ ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
+
+ StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+ // Uses Parquet field names but retains Metastore data types.
+ case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
+ mSchema.copy(name = pSchema.name)
+ case _ =>
+ throw new SparkException(schemaConflictMessage)
+ })
+ }
+
+ /**
+ * Returns the original schema from the Parquet file with any missing nullable fields from the
+ * Hive Metastore schema merged in.
+ *
+ * When constructing a DataFrame from a collection of structured data, the resulting object has
+ * a schema corresponding to the union of the fields present in each element of the collection.
+ * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+ * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+ * contain a particular nullable field in its schema despite that field being present in the
+ * table schema obtained from the Hive Metastore. This method returns a schema representing the
+ * Parquet file schema along with any additional nullable fields from the Metastore schema
+ * merged in.
+ */
+ private[parquet] def mergeMissingNullableFields(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+ val missingFields = metastoreSchema
+ .map(_.name.toLowerCase)
+ .diff(parquetSchema.map(_.name.toLowerCase))
+ .map(fieldMap(_))
+ .filter(_.nullable)
+ StructType(parquetSchema ++ missingFields)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index ee099ab..e6324b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => t.buildScan(a)) :: Nil
// Scanning partitioned FSBasedRelation
- case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation))
+ case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
selectedPartitions) :: Nil
// Scanning non-partitioned FSBasedRelation
- case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
+ case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty =>
+ l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(
- InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil
+ InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil
case _ => Nil
}
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
partitionColumns: StructType,
partitions: Array[Partition]) = {
val output = projections.map(_.toAttribute)
- val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation]
+ val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d30f7f6..d1f0cda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String)
private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
private[sql] object PartitioningUtils {
+ // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
+ // depend on Hive.
+ private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
require(columnNames.size == literals.size)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 7879328..a09bb08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource(
}
}
-private[sql] case class InsertIntoFSBasedRelation(
- @transient relation: FSBasedRelation,
+private[sql] case class InsertIntoHadoopFsRelation(
+ @transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
partitionColumns: Array[String],
mode: SaveMode)
@@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation(
insert(new DefaultWriterContainer(relation, job), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
+ relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
@@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation(
}
private[sql] abstract class BaseWriterContainer(
- @transient val relation: FSBasedRelation,
+ @transient val relation: HadoopFsRelation,
@transient job: Job)
extends SparkHadoopMapReduceUtil
with Logging
@@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
protected val dataSchema = relation.dataSchema
- protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
+ protected var outputWriterFactory: OutputWriterFactory = _
private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
@@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
- relation.prepareForWrite(job)
+ outputWriterFactory = relation.prepareJobForWrite(job)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupJob(jobContext)
@@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer(
}
private[sql] class DefaultWriterContainer(
- @transient relation: FSBasedRelation,
+ @transient relation: HadoopFsRelation,
@transient job: Job)
extends BaseWriterContainer(relation, job) {
@transient private var writer: OutputWriter = _
override protected def initWriters(): Unit = {
- writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
- writer.init(getWorkPath, dataSchema, taskAttemptContext)
+ writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}
override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer(
}
private[sql] class DynamicPartitionWriterContainer(
- @transient relation: FSBasedRelation,
+ @transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
defaultPartitionName: String)
@@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer(
outputWriters.getOrElseUpdate(partitionPath, {
val path = new Path(getWorkPath, partitionPath)
- val writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
- writer.init(path.toString, dataSchema, taskAttemptContext)
- writer
+ outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
})
}
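To spell out the flow the two containers above now share: prepareJobForWrite runs once on the driver and returns a serializable OutputWriterFactory, and each task then calls newInstance once per output file. The helper below is hypothetical glue that compresses this into a single method purely for illustration, assuming the writer's write(row)/close() methods documented in interfaces.scala further down; in the patch itself the factory is obtained during the container's driver-side setup and newInstance is called from initWriters / outputWriterForRow as shown above.

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.HadoopFsRelation

// Hypothetical illustration of the driver/executor split introduced by this patch.
def writeWithFactory(
    relation: HadoopFsRelation,
    job: Job,
    workPath: String,
    context: TaskAttemptContext,
    rows: Iterator[Row]): Unit = {
  val factory = relation.prepareJobForWrite(job)                            // driver side, once per job
  val writer = factory.newInstance(workPath, relation.dataSchema, context)  // executor side
  try rows.foreach(writer.write) finally writer.close()                     // one writer per output file
}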
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 595c5eb..37a569d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
- case dataSource: FSBasedRelationProvider =>
+ case dataSource: HadoopFsRelationProvider =>
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
None
} else {
@@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource {
case None => clazz.newInstance() match {
case dataSource: RelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
- case dataSource: FSBasedRelationProvider =>
+ case dataSource: HadoopFsRelationProvider =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
@@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource {
val relation = clazz.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sqlContext, mode, options, data)
- case dataSource: FSBasedRelationProvider =>
+ case dataSource: HadoopFsRelationProvider =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
@@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource {
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
sqlContext.executePlan(
- InsertIntoFSBasedRelation(
+ InsertIntoHadoopFsRelation(
r,
data.logicalPlan,
partitionColumns.toArray,
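For orientation (this is not part of the diff), the HadoopFsRelationProvider branches above are what a data source DDL statement ends up exercising. With a hypothetical provider class on the classpath, a statement along these lines is resolved through ResolvedDataSource as shown:

// Hypothetical example; table name, provider class, and path are illustrative only.
sqlContext.sql(
  """CREATE TEMPORARY TABLE simple_text
    |USING com.example.SimpleTextSource
    |OPTIONS (path '/tmp/simple-text')
  """.stripMargin)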
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6f31530..274ab44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.{StructField, StructType}
@@ -94,7 +94,7 @@ trait SchemaRelationProvider {
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source
 * with a given schema and partition columns. When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
+ * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined
* schema, and an optional list of partition columns, this interface is used to pass in the
* parameters specified by a user.
*
@@ -105,15 +105,15 @@ trait SchemaRelationProvider {
*
 * A new instance of this class will be instantiated each time a DDL call is made.
*
- * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is
+ * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
* that users need to provide a schema and a (possibly empty) list of partition columns when
 * using a [[HadoopFsRelationProvider]]. A relation provider can inherit both [[RelationProvider]]
- * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified
+ * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified
* schemas, and accessing partitioned relations.
*
* @since 1.4.0
*/
-trait FSBasedRelationProvider {
+trait HadoopFsRelationProvider {
/**
* Returns a new base relation with the given parameters, a user defined schema, and a list of
* partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
@@ -124,7 +124,7 @@ trait FSBasedRelationProvider {
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
- parameters: Map[String, String]): FSBasedRelation
+ parameters: Map[String, String]): HadoopFsRelation
}
/**
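As a hedged sketch of a third-party implementation of the renamed trait (the sqlContext parameter is assumed from the part of the createRelation signature this hunk does not show; SimpleTextSource and SimpleTextRelation are hypothetical names, with the relation class sketched after the end of this file's diff):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{HadoopFsRelation, HadoopFsRelationProvider}
import org.apache.spark.sql.types.StructType

class SimpleTextSource extends HadoopFsRelationProvider {
  // A new provider instance is created for every DDL call; option keys are case insensitive.
  override def createRelation(
      sqlContext: SQLContext,
      paths: Array[String],
      schema: Option[StructType],
      partitionColumns: Option[StructType],
      parameters: Map[String, String]): HadoopFsRelation =
    new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext)
}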
@@ -280,33 +280,42 @@ trait CatalystScan {
/**
* ::Experimental::
- * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
- * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
- * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
- * executor side. This instance is used to persist rows to this single output file.
+ * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
+ * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
+ * to executor side to create actual [[OutputWriter]]s on the fly.
*
* @since 1.4.0
*/
@Experimental
-abstract class OutputWriter {
+abstract class OutputWriterFactory extends Serializable {
/**
- * Initializes this [[OutputWriter]] before any rows are persisted.
+ * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
+ * to instantiate new [[OutputWriter]]s.
*
* @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
* this may not point to the final output file. For example, `FileOutputFormat` writes to
 * temporary directories and then merges written files back to the final destination. In
* this case, `path` points to a temporary output file under the temporary directory.
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
- * schema if the corresponding relation is partitioned.
+ * schema if the relation being written is partitioned.
* @param context The Hadoop MapReduce task context.
*
* @since 1.4.0
*/
- def init(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): Unit = ()
+ def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
+}
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side. This instance is used to persist rows to this single output file.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriter {
/**
* Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
* tables, dynamic partition columns are not included in rows to be written.
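A hedged sketch of the pair of classes documented above: the factory is the driver-created, serializable side, and the writer does the per-file work on executors. It assumes the writer's write(row)/close() methods documented in the remainder of this class, and it takes the path argument at face value as the file to create, per the newInstance scaladoc; the class names and the comma-separated text encoding are illustrative.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

class SimpleTextOutputWriterFactory extends OutputWriterFactory {
  // Instantiated on the driver, serialized to executors; keep it stateless.
  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new SimpleTextOutputWriter(path, context)
}

class SimpleTextOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  // One stream per writer, i.e. per output file opened on an executor.
  private val stream = {
    val fs = FileSystem.get(context.getConfiguration)
    fs.create(new Path(path), false)
  }

  // Dynamic partition columns are already stripped from rows of partitioned tables.
  override def write(row: Row): Unit =
    stream.writeBytes(row.toSeq.mkString(",") + "\n")

  override def close(): Unit = stream.close()
}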
@@ -333,74 +342,71 @@ abstract class OutputWriter {
* filter using selected predicates before producing an RDD containing all matching tuples as
* [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
* systems, it's able to discover partitioning information from the paths of input directories, and
- * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must
- * override one of the three `buildScan` methods to implement the read path.
+ * perform partition pruning before starting to read the data. Subclasses of [[HadoopFsRelation()]]
+ * must override one of the three `buildScan` methods to implement the read path.
*
* For the write path, it provides the ability to write to both non-partitioned and partitioned
* tables. Directory layout of the partitioned tables is compatible with Hive.
*
* @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
* implementing metastore table conversion.
- * @param paths Base paths of this relation. For partitioned relations, it should be the root
- * directories of all partition directories.
- * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
+ *
+ * @param maybePartitionSpec A [[HadoopFsRelation]] can be created with an optional
* [[PartitionSpec]], so that partition discovery can be skipped.
*
* @since 1.4.0
*/
@Experimental
-abstract class FSBasedRelation private[sql](
- val paths: Array[String],
- maybePartitionSpec: Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
extends BaseRelation {
+ def this() = this(None)
+
+ private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+ private val codegenEnabled = sqlContext.conf.codegenEnabled
+
+ private var _partitionSpec: PartitionSpec = _
+
+ final private[sql] def partitionSpec: PartitionSpec = {
+ if (_partitionSpec == null) {
+ _partitionSpec = maybePartitionSpec
+ .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+ .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+ .getOrElse {
+ if (sqlContext.conf.partitionDiscoveryEnabled()) {
+ discoverPartitions()
+ } else {
+ PartitionSpec(StructType(Nil), Array.empty[Partition])
+ }
+ }
+ }
+ _partitionSpec
+ }
+
/**
- * Constructs an [[FSBasedRelation]].
- *
- * @param paths Base paths of this relation. For partitioned relations, it should be either root
- * directories of all partition directories.
- * @param partitionColumns Partition columns of this relation.
+ * Base paths of this relation. For partitioned relations, it should be the root directories
+ * of all partition directories.
*
* @since 1.4.0
*/
- def this(paths: Array[String], partitionColumns: StructType) =
- this(paths, {
- if (partitionColumns.isEmpty) None
- else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
- })
+ def paths: Array[String]
/**
- * Constructs an [[FSBasedRelation]].
- *
- * @param paths Base paths of this relation. For partitioned relations, it should be root
- * directories of all partition directories.
+ * Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically
+ * discovered. Note that they should always be nullable.
*
* @since 1.4.0
*/
- def this(paths: Array[String]) = this(paths, None)
-
- private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-
- private val codegenEnabled = sqlContext.conf.codegenEnabled
-
- private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
- spec.copy(partitionColumns = spec.partitionColumns.asNullable)
- }.getOrElse {
- if (sqlContext.conf.partitionDiscoveryEnabled()) {
- discoverPartitions()
- } else {
- PartitionSpec(StructType(Nil), Array.empty[Partition])
- }
- }
-
- private[sql] def partitionSpec: PartitionSpec = _partitionSpec
+ final def partitionColumns: StructType =
+ userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns)
/**
- * Partition columns. Note that they are always nullable.
+ * Optional user defined partition columns.
*
* @since 1.4.0
*/
- def partitionColumns: StructType = partitionSpec.partitionColumns
+ def userDefinedPartitionColumns: Option[StructType] = None
private[sql] def refresh(): Unit = {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
@@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql](
}.map(_.getPath)
if (leafDirs.nonEmpty) {
- PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
+ PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
} else {
PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
}
@@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql](
* @since 1.4.0
*/
def buildScan(inputPaths: Array[String]): RDD[Row] = {
- throw new RuntimeException(
+ throw new UnsupportedOperationException(
"At least one buildScan() method should be overridden to read the relation.")
}
@@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql](
}
/**
- * Client side preparation for data writing can be put here. For example, user defined output
- * committer can be configured here.
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
+ * be put here. For example, user defined output committer can be configured here.
*
* Note that the only side effect expected here is mutating `job` via its setters. Especially,
* Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
@@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql](
*
* @since 1.4.0
*/
- def prepareForWrite(job: Job): Unit = ()
-
- /**
- * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
- * file on the executor side.
- *
- * @since 1.4.0
- */
- def outputWriterClass: Class[_ <: OutputWriter]
+ def prepareJobForWrite(job: Job): OutputWriterFactory
}
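Pulling the pieces of this file together, a minimal relation built on the new API might look roughly like the sketch below. It assumes dataSchema is an abstract member of HadoopFsRelation (the writer containers read relation.dataSchema) and that the base class combines dataSchema with the partition columns to produce the final schema; the class name and the line-per-row format are hypothetical, and SimpleTextOutputWriterFactory is the factory sketched earlier.

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriterFactory}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class SimpleTextRelation(
    override val paths: Array[String],
    maybeSchema: Option[StructType],
    override val userDefinedPartitionColumns: Option[StructType],
    parameters: Map[String, String])(
    @transient val sqlContext: SQLContext)
  extends HadoopFsRelation {

  // Schema of the data files themselves; partition columns are not included here.
  def dataSchema: StructType =
    maybeSchema.getOrElse(StructType(StructField("value", StringType) :: Nil))

  // Read path: at least one buildScan overload must be overridden, otherwise the
  // default implementation throws UnsupportedOperationException (see above).
  override def buildScan(inputPaths: Array[String]): RDD[Row] =
    sqlContext.sparkContext.textFile(inputPaths.mkString(",")).map(Row(_))

  // Write path: driver-side job preparation returning the serializable writer factory.
  override def prepareJobForWrite(job: Job): OutputWriterFactory =
    new SimpleTextOutputWriterFactory
}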
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index aad1d24..1eacdde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
- case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
+ case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK
case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 3bbc5b0..5ad4395 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}.flatten.reduceOption(_ && _)
val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
+ case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
}.flatten.reduceOption(_ && _)
forParquetTableScan.orElse(forParquetDataSource)
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index fc90e3e..c964b6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Conflicting non-nullable field names
intercept[Throwable] {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0e82c8..2aa80b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
- FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+ ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new FSBasedParquetRelation(
+ new ParquetRelation2(
paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
+ new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 8e405e0..6609763 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
- case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+ case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index da5d203..1bf1c1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: FSBasedParquetRelation) => // OK
+ case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+ s"${classOf[ParquetRelation2].getCanonicalName}")
}
// Clean up and reset confs.