You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/07/28 02:32:39 UTC

spark git commit: [SPARK-9386] [SQL] Feature flag for metastore partition pruning

Repository: spark
Updated Branches:
  refs/heads/master 8ddfa52c2 -> ce89ff477


[SPARK-9386] [SQL] Feature flag for metastore partition pruning

Since we have been seeing a lot of failures related to this new feature, let's put it behind a flag (`spark.sql.hive.metastorePartitionPruning`) and turn it off by default.

Author: Michael Armbrust <mi...@databricks.com>

Closes #7703 from marmbrus/optionalMetastorePruning and squashes the following commits:

6ad128c [Michael Armbrust] style
8447835 [Michael Armbrust] [SPARK-9386][SQL] Feature flag for metastore partition pruning
fd37b87 [Michael Armbrust] add config flag


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce89ff47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce89ff47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce89ff47

Branch: refs/heads/master
Commit: ce89ff477aea6def68265ed218f6105680755c9a
Parents: 8ddfa52
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Jul 27 17:32:34 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jul 27 17:32:34 2015 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala   |  7 +++++++
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala    | 12 +++++++++++-
 .../apache/spark/sql/hive/client/ClientInterface.scala  | 10 ++++------
 3 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce89ff47/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9b2dbd7..40eba33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -301,6 +301,11 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")
 
+  val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
+    defaultValue = Some(false),
+    doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
+          "unmatching partitions can be eliminated earlier.")
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
     defaultValue = Some("_corrupt_record"),
     doc = "<TODO>")
@@ -456,6 +461,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
 
+  private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
+
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
 
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)

http://git-wip-us.apache.org/repos/asf/spark/blob/ce89ff47/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9c707a7..3180c05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -678,8 +678,18 @@ private[hive] case class MetastoreRelation
     }
   )
 
+  // When metastore partition pruning is turned off, we cache the list of all partitions to
+  // mimic the behavior of Spark < 1.5
+  lazy val allPartitions = table.getAllPartitions
+
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
-    table.getPartitions(predicates).map { p =>
+    val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
+      table.getPartitions(predicates)
+    } else {
+      allPartitions
+    }
+
+    rawPartitions.map { p =>
       val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
       tPartition.setDbName(databaseName)
       tPartition.setTableName(tableName)

http://git-wip-us.apache.org/repos/asf/spark/blob/ce89ff47/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 1656587..d834b4e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -72,12 +72,10 @@ private[hive] case class HiveTable(
 
   def isPartitioned: Boolean = partitionColumns.nonEmpty
 
-  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = {
-    predicates match {
-      case Nil => client.getAllPartitions(this)
-      case _ => client.getPartitionsByFilter(this, predicates)
-    }
-  }
+  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
+
+  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
+    client.getPartitionsByFilter(this, predicates)
 
   // Hive does not support backticks when passing names to the client.
   def qualifiedName: String = s"$database.$name"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org