You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/02 05:20:22 UTC

spark git commit: [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter

Repository: spark
Updated Branches:
  refs/heads/master 1ecfafa08 -> 1bbf9ff63


[SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)
## What changes were proposed in this pull request?

We recently added table partition pruning for partitioned Hive tables converted to use `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive will throw an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column will throw an `org.apache.hadoop.hive.metastore.api.MetaException`.

I discovered this behavior because VideoAmp uses the CDH version of Hive with a PostgreSQL metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column will fail.

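Rather than throw an exception in query planning, this patch catches this exception when `hive.metastore.try.direct.sql` is `false`, logs a warning, and returns all table partitions instead. (When the option is `true`, the `MetaException` is instead wrapped in a `RuntimeException` whose message suggests a workaround and asks the user to report a bug.) Clients of this method are already expected to handle the possibility that the filters will not be honored.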
## How was this patch tested?

A unit test was added.

Author: Michael Allman <mi...@videoamp.com>

Closes #15673 from mallman/spark-17992-catch_hive_partition_filter_exception.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bbf9ff6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bbf9ff6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bbf9ff6

Branch: refs/heads/master
Commit: 1bbf9ff634745148e782370009aa31d3a042638c
Parents: 1ecfafa
Author: Michael Allman <mi...@videoamp.com>
Authored: Tue Nov 1 22:20:19 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 22:20:19 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/client/HiveShim.scala | 31 ++++++--
 .../sql/hive/client/HiveClientBuilder.scala     | 56 ++++++++++++++
 .../spark/sql/hive/client/HiveClientSuite.scala | 61 ++++++++++++++++
 .../spark/sql/hive/client/VersionsSuite.scala   | 77 +++++---------------
 4 files changed, 160 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 85edaf6..3d9642d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegralType, StringType}
 import org.apache.spark.util.Utils
 
@@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
       } else {
         logDebug(s"Hive metastore filter is '$filter'.")
+        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+        val tryDirectSql =
+          hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
         try {
+          // Hive may throw an exception when calling this method in some circumstances, such as
+          // when filtering on a non-string partition column when the hive config key
+          // hive.metastore.try.direct.sql is false
           getPartitionsByFilterMethod.invoke(hive, table, filter)
             .asInstanceOf[JArrayList[Partition]]
         } catch {
-          case e: InvocationTargetException =>
-            // SPARK-18167 retry to investigate the flaky test. This should be reverted before
-            // the release is cut.
-            val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
-            logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
-            logError("all partitions: " + getAllPartitions(hive, table))
-            throw e
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+              !tryDirectSql =>
+            logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+              "filter from Hive. Falling back to fetching all partition metadata, which will " +
+              "degrade performance. Modifying your Hive metastore configuration to set " +
+              s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+            // HiveShim clients are expected to handle a superset of the requested partitions
+            getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+              tryDirectSql =>
+            throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
+              "metadata by filter from Hive. You can set the Spark configuration setting " +
+              s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
+              "problem, however this will result in degraded performance. Please report a bug: " +
+              "https://issues.apache.org/jira/browse/SPARK", ex)
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
new file mode 100644
index 0000000..591a968
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.File
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[client] class HiveClientBuilder {
+  private val sparkConf = new SparkConf()
+
+  // In order to speed up test execution during development or in Jenkins, you can specify the path
+  // of an existing Ivy cache:
+  private val ivyPath: Option[String] = {
+    sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
+      Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
+  }
+
+  private def buildConf() = {
+    lazy val warehousePath = Utils.createTempDir()
+    lazy val metastorePath = Utils.createTempDir()
+    metastorePath.delete()
+    Map(
+      "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
+      "hive.metastore.warehouse.dir" -> warehousePath.toString)
+  }
+
+  def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+    IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = version,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
+      hadoopConf = hadoopConf,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
new file mode 100644
index 0000000..4790331
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.IntegerType
+
+class HiveClientSuite extends SparkFunSuite {
+  private val clientBuilder = new HiveClientBuilder
+
+  private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
+
+  test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
+    val testPartitionCount = 5
+
+    val storageFormat = CatalogStorageFormat(
+      locationUri = None,
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      compressed = false,
+      properties = Map.empty)
+
+    val hadoopConf = new Configuration()
+    hadoopConf.setBoolean(tryDirectSqlKey, false)
+    val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
+    client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (part INT)")
+
+    val partitions = (1 to testPartitionCount).map { part =>
+      CatalogTablePartition(Map("part" -> part.toString), storageFormat)
+    }
+    client.createPartitions(
+      "default", "test", partitions, ignoreIfExists = false)
+
+    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+      Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))
+
+    assert(filteredPartitions.size == testPartitionCount)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a10957..081b0ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -23,9 +23,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -48,46 +47,19 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
 @ExtendedHiveTest
 class VersionsSuite extends SparkFunSuite with Logging {
 
-  private val sparkConf = new SparkConf()
-
-  // In order to speed up test execution during development or in Jenkins, you can specify the path
-  // of an existing Ivy cache:
-  private val ivyPath: Option[String] = {
-    sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
-      Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
-  }
-
-  private def buildConf() = {
-    lazy val warehousePath = Utils.createTempDir()
-    lazy val metastorePath = Utils.createTempDir()
-    metastorePath.delete()
-    Map(
-      "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
-      "hive.metastore.warehouse.dir" -> warehousePath.toString)
-  }
+  private val clientBuilder = new HiveClientBuilder
+  import clientBuilder.buildClient
 
   test("success sanity check") {
-    val badClient = IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
-      hadoopVersion = VersionInfo.getVersion,
-      sparkConf = sparkConf,
-      hadoopConf = new Configuration(),
-      config = buildConf(),
-      ivyPath = ivyPath).createClient()
+    val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
     val db = new CatalogDatabase("default", "desc", "loc", Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
 
   test("hadoop configuration preserved") {
-    val hadoopConf = new Configuration();
+    val hadoopConf = new Configuration()
     hadoopConf.set("test", "success")
-    val client = IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
-      hadoopVersion = VersionInfo.getVersion,
-      sparkConf = sparkConf,
-      hadoopConf = hadoopConf,
-      config = buildConf(),
-      ivyPath = ivyPath).createClient()
+    val client = buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
     assert("success" === client.getConf("test", null))
   }
 
@@ -109,15 +81,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   // TODO: currently only works on mysql where we manually create the schema...
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
-      val badClient = quietly {
-        IsolatedClientLoader.forVersion(
-          hiveMetastoreVersion = "13",
-          hadoopVersion = VersionInfo.getVersion,
-          sparkConf = sparkConf,
-          hadoopConf = new Configuration(),
-          config = buildConf(),
-          ivyPath = ivyPath).createClient()
-      }
+      val badClient = quietly { buildClient("13", new Configuration()) }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
@@ -130,16 +94,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
-      val hadoopConf = new Configuration();
+      val hadoopConf = new Configuration()
       hadoopConf.set("test", "success")
-      client =
-        IsolatedClientLoader.forVersion(
-          hiveMetastoreVersion = version,
-          hadoopVersion = VersionInfo.getVersion,
-          sparkConf = sparkConf,
-          hadoopConf = hadoopConf,
-          config = buildConf(),
-          ivyPath = ivyPath).createClient()
+      client = buildClient(version, hadoopConf)
     }
 
     def table(database: String, tableName: String): CatalogTable = {
@@ -287,15 +244,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
     }
 
+    val testPartitionCount = 2
+
     test(s"$version: createPartitions") {
-      val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat)
-      val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat)
+      val partitions = (1 to testPartitionCount).map { key2 =>
+        CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
+      }
       client.createPartitions(
-        "default", "src_part", Seq(partition1, partition2), ignoreIfExists = true)
+        "default", "src_part", partitions, ignoreIfExists = true)
     }
 
     test(s"$version: getPartitions(catalogTable)") {
-      assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
+      assert(testPartitionCount ==
+        client.getPartitions(client.getTable("default", "src_part")).size)
     }
 
     test(s"$version: getPartitionsByFilter") {
@@ -306,6 +267,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
       // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
       if (version != "0.12") {
         assert(result.size == 1)
+      } else {
+        assert(result.size == testPartitionCount)
       }
     }
 
@@ -327,7 +290,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
 
     test(s"$version: getPartitions(db: String, table: String)") {
-      assert(2 == client.getPartitions("default", "src_part", None).size)
+      assert(testPartitionCount == client.getPartitions("default", "src_part", None).size)
     }
 
     test(s"$version: loadPartition") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org