Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/02 05:20:22 UTC
spark git commit: [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter
Repository: spark
Updated Branches:
refs/heads/master 1ecfafa08 -> 1bbf9ff63
[SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)
## What changes were proposed in this pull request?
We recently added table partition pruning for partitioned Hive tables that have been converted to use `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive throws an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column throws an `org.apache.hadoop.hive.metastore.api.MetaException`.
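For illustration, a query of the following shape is enough to hit the error (the table and column names are hypothetical; `spark` is the usual shell session):

```scala
// Hypothetical repro: "logs" is a partitioned Hive table whose partition
// column "ds" is an INT. With hive.metastore.try.direct.sql=false on the
// metastore, planning this query pushes the integer predicate down via
// getPartitionsByFilter, and Hive fails with a MetaException.
spark.sql("SELECT count(*) FROM logs WHERE ds = 20161101").show()
```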
I discovered this behavior because VideoAmp uses the CDH version of Hive with a PostgreSQL metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column fail.
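If you administer the metastore service itself, the warning this patch logs suggests re-enabling direct SQL; in the metastore's `hive-site.xml` that would look like:

```xml
<property>
  <name>hive.metastore.try.direct.sql</name>
  <value>true</value>
</property>
```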
Rather than throwing an exception during query planning, this patch catches the exception, logs a warning, and returns all of the table's partitions instead. Clients of this method are already expected to handle the possibility that the filters will not be honored.
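In outline, the shim's new behavior is the following (a simplified sketch of the `HiveShim.scala` change in the diff below; the case where direct SQL is enabled, which rethrows with a workaround hint, is omitted here):

```scala
try {
  // Ask the metastore for only the partitions matching the predicate.
  getPartitionsByFilterMethod.invoke(hive, table, filter)
} catch {
  case ex: InvocationTargetException
      if ex.getCause.isInstanceOf[MetaException] && !tryDirectSql =>
    // Unsupported filter and direct SQL is off: warn, then fall back to
    // fetching metadata for every partition of the table.
    logWarning("Falling back to fetching all partition metadata.", ex)
    getAllPartitionsMethod.invoke(hive, table)
}
```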
## How was this patch tested?
A unit test was added: the new `HiveClientSuite` builds a metastore client with `hive.metastore.try.direct.sql` set to `false` and verifies that `getPartitionsByFilter` returns all partitions of a table when filtering on an integer partition column.
Author: Michael Allman <mi...@videoamp.com>
Closes #15673 from mallman/spark-17992-catch_hive_partition_filter_exception.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bbf9ff6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bbf9ff6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bbf9ff6
Branch: refs/heads/master
Commit: 1bbf9ff634745148e782370009aa31d3a042638c
Parents: 1ecfafa
Author: Michael Allman <mi...@videoamp.com>
Authored: Tue Nov 1 22:20:19 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 22:20:19 2016 -0700
----------------------------------------------------------------------
.../apache/spark/sql/hive/client/HiveShim.scala | 31 ++++++--
.../sql/hive/client/HiveClientBuilder.scala | 56 ++++++++++++++
.../spark/sql/hive/client/HiveClientSuite.scala | 61 ++++++++++++++++
.../spark/sql/hive/client/VersionsSuite.scala | 77 +++++---------------
4 files changed, 160 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 85edaf6..3d9642d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegralType, StringType}
import org.apache.spark.util.Utils
@@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
+ val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+ val tryDirectSql =
+ hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
try {
+ // Hive may throw an exception when calling this method in some circumstances, such as
+ // when filtering on a non-string partition column when the hive config key
+ // hive.metastore.try.direct.sql is false
getPartitionsByFilterMethod.invoke(hive, table, filter)
.asInstanceOf[JArrayList[Partition]]
} catch {
- case e: InvocationTargetException =>
- // SPARK-18167 retry to investigate the flaky test. This should be reverted before
- // the release is cut.
- val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
- logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
- logError("all partitions: " + getAllPartitions(hive, table))
- throw e
+ case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+ !tryDirectSql =>
+ logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+ "filter from Hive. Falling back to fetching all partition metadata, which will " +
+ "degrade performance. Modifying your Hive metastore configuration to set " +
+ s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+ // HiveShim clients are expected to handle a superset of the requested partitions
+ getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+ case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+ tryDirectSql =>
+ throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
+ "metadata by filter from Hive. You can set the Spark configuration setting " +
+ s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
+ "problem, however this will result in degraded performance. Please report a bug: " +
+ "https://issues.apache.org/jira/browse/SPARK", ex)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
new file mode 100644
index 0000000..591a968
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.File
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[client] class HiveClientBuilder {
+ private val sparkConf = new SparkConf()
+
+ // In order to speed up test execution during development or in Jenkins, you can specify the path
+ // of an existing Ivy cache:
+ private val ivyPath: Option[String] = {
+ sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
+ Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
+ }
+
+ private def buildConf() = {
+ lazy val warehousePath = Utils.createTempDir()
+ lazy val metastorePath = Utils.createTempDir()
+ metastorePath.delete()
+ Map(
+ "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
+ "hive.metastore.warehouse.dir" -> warehousePath.toString)
+ }
+
+ def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = version,
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = sparkConf,
+ hadoopConf = hadoopConf,
+ config = buildConf(),
+ ivyPath = ivyPath).createClient()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
new file mode 100644
index 0000000..4790331
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.IntegerType
+
+class HiveClientSuite extends SparkFunSuite {
+ private val clientBuilder = new HiveClientBuilder
+
+ private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
+
+ test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
+ val testPartitionCount = 5
+
+ val storageFormat = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ compressed = false,
+ properties = Map.empty)
+
+ val hadoopConf = new Configuration()
+ hadoopConf.setBoolean(tryDirectSqlKey, false)
+ val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
+ client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (part INT)")
+
+ val partitions = (1 to testPartitionCount).map { part =>
+ CatalogTablePartition(Map("part" -> part.toString), storageFormat)
+ }
+ client.createPartitions(
+ "default", "test", partitions, ignoreIfExists = false)
+
+ val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+ Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))
+
+ assert(filteredPartitions.size == testPartitionCount)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a10957..081b0ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -23,9 +23,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -48,46 +47,19 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {
- private val sparkConf = new SparkConf()
-
- // In order to speed up test execution during development or in Jenkins, you can specify the path
- // of an existing Ivy cache:
- private val ivyPath: Option[String] = {
- sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
- Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
- }
-
- private def buildConf() = {
- lazy val warehousePath = Utils.createTempDir()
- lazy val metastorePath = Utils.createTempDir()
- metastorePath.delete()
- Map(
- "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
- "hive.metastore.warehouse.dir" -> warehousePath.toString)
- }
+ private val clientBuilder = new HiveClientBuilder
+ import clientBuilder.buildClient
test("success sanity check") {
- val badClient = IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = sparkConf,
- hadoopConf = new Configuration(),
- config = buildConf(),
- ivyPath = ivyPath).createClient()
+ val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
val db = new CatalogDatabase("default", "desc", "loc", Map())
badClient.createDatabase(db, ignoreIfExists = true)
}
test("hadoop configuration preserved") {
- val hadoopConf = new Configuration();
+ val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
- val client = IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = sparkConf,
- hadoopConf = hadoopConf,
- config = buildConf(),
- ivyPath = ivyPath).createClient()
+ val client = buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
assert("success" === client.getConf("test", null))
}
@@ -109,15 +81,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
// TODO: currently only works on mysql where we manually create the schema...
ignore("failure sanity check") {
val e = intercept[Throwable] {
- val badClient = quietly {
- IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = "13",
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = sparkConf,
- hadoopConf = new Configuration(),
- config = buildConf(),
- ivyPath = ivyPath).createClient()
- }
+ val badClient = quietly { buildClient("13", new Configuration()) }
}
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
@@ -130,16 +94,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: create client") {
client = null
System.gc() // Hack to avoid SEGV on some JVM versions.
- val hadoopConf = new Configuration();
+ val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
- client =
- IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = version,
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = sparkConf,
- hadoopConf = hadoopConf,
- config = buildConf(),
- ivyPath = ivyPath).createClient()
+ client = buildClient(version, hadoopConf)
}
def table(database: String, tableName: String): CatalogTable = {
@@ -287,15 +244,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
}
+ val testPartitionCount = 2
+
test(s"$version: createPartitions") {
- val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat)
- val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat)
+ val partitions = (1 to testPartitionCount).map { key2 =>
+ CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
+ }
client.createPartitions(
- "default", "src_part", Seq(partition1, partition2), ignoreIfExists = true)
+ "default", "src_part", partitions, ignoreIfExists = true)
}
test(s"$version: getPartitions(catalogTable)") {
- assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
+ assert(testPartitionCount ==
+ client.getPartitions(client.getTable("default", "src_part")).size)
}
test(s"$version: getPartitionsByFilter") {
@@ -306,6 +267,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
// Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
if (version != "0.12") {
assert(result.size == 1)
+ } else {
+ assert(result.size == testPartitionCount)
}
}
@@ -327,7 +290,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: getPartitions(db: String, table: String)") {
- assert(2 == client.getPartitions("default", "src_part", None).size)
+ assert(testPartitionCount == client.getPartitions("default", "src_part", None).size)
}
test(s"$version: loadPartition") {