You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/31 16:56:27 UTC

spark git commit: [SPARK-20160][SQL] Move ParquetConversions and OrcConversions Out Of HiveSessionCatalog

Repository: spark
Updated Branches:
  refs/heads/master c4c03eed6 -> b2349e6a0


[SPARK-20160][SQL] Move ParquetConversions and OrcConversions Out Of HiveSessionCatalog

### What changes were proposed in this pull request?
`ParquetConversions` and `OrcConversions` should be treated as regular `Analyzer` rules; it is not reasonable for them to be part of `HiveSessionCatalog`. This PR also combines the two rules `ParquetConversions` and `OrcConversions` into a single new rule, `RelationConversions`.

After moving these two rules out of `HiveSessionCatalog`, the next step is to clean up, rename, and move `HiveMetastoreCatalog`, because it is no longer related to the hive package.

### How was this patch tested?
Covered by the existing test cases.

Author: Xiao Li <ga...@gmail.com>

Closes #17484 from gatorsmile/cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2349e6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2349e6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2349e6a

Branch: refs/heads/master
Commit: b2349e6a00d569851f0ca91a60e9299306208e92
Parents: c4c03ee
Author: Xiao Li <ga...@gmail.com>
Authored: Sat Apr 1 00:56:18 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Apr 1 00:56:18 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 96 ++------------------
 .../spark/sql/hive/HiveSessionCatalog.scala     | 25 +----
 .../sql/hive/HiveSessionStateBuilder.scala      |  3 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 56 +++++++++++-
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |  3 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |  4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  5 +-
 7 files changed, 70 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 305bd00..10f4325 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
   import HiveMetastoreCatalog._
 
-  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
-
-  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
-    QualifiedTableName(
-      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
-      tableIdent.table.toLowerCase)
-  }
-
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
   private val tableCreationLocks = Striped.lazyWeakLock(100)
 
@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
-    // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
-    new Path(new Path(dbLocation), tblName).toString
+  // For testing only
+  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    val key = QualifiedTableName(
+      table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
+      table.table.toLowerCase)
+    tableRelationCache.getIfPresent(key)
   }
 
   private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  private def convertToLogicalRelation(
+  def convertToLogicalRelation(
       relation: CatalogRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     case NonFatal(ex) =>
       logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
   }
-
-  /**
-   * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
-   * data source relations for better performance.
-   */
-  object ParquetConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-    }
-
-    private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[ParquetFileFormat]
-      val mergeSchema = sessionState.conf.getConf(
-        HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-      val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-            // Inserting into partitioned table is not supported in Parquet data source (yet).
-            if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-              !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-            shouldConvertMetastoreParquet(relation) =>
-          convertToParquetRelation(relation)
-      }
-    }
-  }
-
-  /**
-   * When scanning Metastore ORC tables, convert them to ORC data source relations
-   * for better performance.
-   */
-  object OrcConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-    }
-
-    private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[OrcFileFormat]
-      val options = Map[String, String]()
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "orc")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-            // Inserting into partitioned table is not supported in Orc data source (yet).
-            if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-              !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-            shouldConvertMetastoreOrc(relation) =>
-          convertToOrcRelation(relation)
-      }
-    }
-  }
 }
 
+
 private[hive] object HiveMetastoreCatalog {
   def mergeWithMetastoreSchema(
       metastoreSchema: StructType,

http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 2cc20a7..9e3eb2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
@@ -43,7 +41,7 @@ import org.apache.spark.util.Utils
 private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
-    private val metastoreCatalog: HiveMetastoreCatalog,
+    val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
       parser,
       functionResourceLoader) {
 
-  // ----------------------------------------------------------------
-  // | Methods and fields for interacting with HiveMetastoreCatalog |
-  // ----------------------------------------------------------------
-
-  // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
-  // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
-  val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
-  val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-
-  def hiveDefaultTableFilePath(name: TableIdentifier): String = {
-    metastoreCatalog.hiveDefaultTableFilePath(name)
-  }
-
-  // For testing only
-  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
-    val key = metastoreCatalog.getQualifiedTableName(table)
-    tableRelationCache.getIfPresent(key)
-  }
-
   override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
     makeFunctionBuilder(funcName, Utils.classForName(className))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 2f3dfa0..9d3b31f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -75,8 +75,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
       new DetermineTableStats(session) +:
-      catalog.ParquetConversions +:
-      catalog.OrcConversions +:
+      RelationConversions(conf, catalog) +:
       PreprocessTableCreation(session) +:
       PreprocessTableInsertion(conf) +:
       DataSourceAnalysis(conf) +:

http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b5ce027..0465e9c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
-import java.net.URI
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -31,9 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
 
 /**
@@ -170,6 +171,55 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Relation conversion from metastore relations to data source relations for better performance
+ *
+ * - When writing to non-partitioned Hive-serde Parquet/Orc tables
+ * - When scanning Hive-serde Parquet/ORC tables
+ *
+ * This rule must be run before all other DDL post-hoc resolution rules, i.e.
+ * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
+ */
+case class RelationConversions(
+    conf: SQLConf,
+    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
+  private def isConvertible(relation: CatalogRelation): Boolean = {
+    (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
+      conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
+      (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
+        conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
+  }
+
+  private def convert(relation: CatalogRelation): LogicalRelation = {
+    if (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet")) {
+      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+    } else {
+      val options = Map[String, String]()
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformUp {
+      // Write path
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+        // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
+        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+          !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+
+      // Read path
+      case relation: CatalogRelation
+          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+        convert(relation)
+    }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 0b157a4..25bd4d0 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -72,8 +72,7 @@ public class JavaMetastoreDataSourcesSuite {
       path.delete();
     }
     HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
-    hiveManagedPath = new Path(
-      catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
+    hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     fs.delete(hiveManagedPath, true);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 962998e..3191b99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -413,7 +413,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
       // Table lookup will make the table cached.
       spark.table(tableIndent)
-      statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsBeforeUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
 
       sql(s"INSERT INTO $tableName SELECT 2")
@@ -423,7 +423,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       spark.table(tableIndent)
-      statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsAfterUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }
     (statsBeforeUpdate, statsAfterUpdate)

http://git-wip-us.apache.org/repos/asf/spark/blob/b2349e6a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc2923..23f21e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -449,8 +449,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
-  private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
-    sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+  private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+      .getCachedDataSourceTable(table)
   }
 
   test("Caching converted data source Parquet Relations") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org