You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/04/08 15:42:49 UTC
[spark] branch master updated: [SPARK-27176][SQL] Upgrade
hadoop-3's built-in Hive maven dependencies to 2.3.4
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 33f3c48 [SPARK-27176][SQL] Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4
33f3c48 is described below
commit 33f3c48cac087e079b9c7e342c2e58b16eaaa681
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Mon Apr 8 08:42:21 2019 -0700
[SPARK-27176][SQL] Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4
## What changes were proposed in this pull request?
This PR mainly contains:
1. Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4.
2. Resolve compatibility issues between Hive 1.2.1 and Hive 2.3.4 in the `sql/hive` module.
## How was this patch tested?
Jenkins tests for hadoop-2.7.
Manual test for hadoop-3:
```shell
build/sbt clean package -Phadoop-3.2 -Phive
export SPARK_PREPEND_CLASSES=true
# rm -rf metastore_db
cat <<EOF > test_hadoop3.scala
spark.range(10).write.saveAsTable("test_hadoop3")
spark.table("test_hadoop3").show
EOF
bin/spark-shell --conf spark.hadoop.hive.metastore.schema.verification=false --conf spark.hadoop.datanucleus.schema.autoCreateAll=true -i test_hadoop3.scala
```
Closes #23788 from wangyum/SPARK-23710-hadoop3.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
dev/deps/spark-deps-hadoop-3.2 | 3 +-
pom.xml | 164 +++++++++++++++++++++
.../execution/datasources/orc/OrcColumnVector.java | 2 +-
.../sql/execution/datasources/orc/OrcFilters.scala | 10 +-
.../execution/datasources/orc/OrcShimUtils.scala | 10 +-
.../execution/datasources/orc/OrcFilterSuite.scala | 2 +-
sql/hive/pom.xml | 25 ++++
.../scala/org/apache/spark/sql/hive/HiveShim.scala | 73 ++++++---
.../org/apache/spark/sql/hive/HiveUtils.scala | 6 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 2 +
.../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 19 ++-
.../org/apache/spark/sql/hive/orc/OrcFilters.scala | 82 ++++++++---
12 files changed, 333 insertions(+), 65 deletions(-)
diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 326d085..04526f2 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -50,7 +50,7 @@ curator-client-2.13.0.jar
curator-framework-2.13.0.jar
curator-recipes-2.13.0.jar
datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
+datanucleus-core-4.1.17.jar
datanucleus-rdbms-3.2.9.jar
derby-10.12.1.1.jar
dnsjava-2.1.7.jar
@@ -76,6 +76,7 @@ hadoop-yarn-common-3.2.0.jar
hadoop-yarn-registry-3.2.0.jar
hadoop-yarn-server-common-3.2.0.jar
hadoop-yarn-server-web-proxy-3.2.0.jar
+hive-storage-api-2.6.0.jar
hk2-api-2.4.0-b34.jar
hk2-locator-2.4.0-b34.jar
hk2-utils-2.4.0-b34.jar
diff --git a/pom.xml b/pom.xml
index 62eb16d..da8e18e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
<hive.classifier></hive.classifier>
<!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1.spark2</hive.version>
+ <hive23.version>2.3.4</hive23.version>
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
@@ -1414,6 +1415,37 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
+ <!-- Begin of Hive 2.3 exclusion -->
+ <!--
+ ORC is needed, but the version should be consistent with the `sql/core` ORC data source.
+ This looks safe; please see the major changes from ORC 1.3.3 to 1.5.4:
+ HIVE-17631 and HIVE-19465
+ -->
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ <!-- jetty-all conflicts with jetty 9.4.12.v20180830 -->
+ <exclusion>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all</artifactId>
+ </exclusion>
+ <!-- org.apache.logging.log4j:* conflicts with log4j 1.2.17 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <!-- Hive includes javax.servlet to fix the Hive on Spark test failure; see HIVE-12783 -->
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <!-- hive-storage-api is needed and must be explicitly included later -->
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ </exclusion>
+ <!-- End of Hive 2.3 exclusion -->
</exclusions>
</dependency>
@@ -1532,6 +1564,27 @@
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
+ <!-- Begin of Hive 2.3 exclusion -->
+ <!-- Do not need Tez -->
+ <exclusion>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-llap-tez</artifactId>
+ </exclusion>
+ <!-- Do not need Calcite, see SPARK-27054 -->
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </exclusion>
+ <!-- org.apache.logging.log4j:* conflicts with log4j 1.2.17 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <!-- End of Hive 2.3 exclusion -->
</exclusions>
</dependency>
<dependency>
@@ -1640,6 +1693,17 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <!-- Begin of Hive 2.3 exclusion -->
+ <!-- Hive removes the HBase Metastore; see HIVE-17234 -->
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <!-- End of Hive 2.3 exclusion -->
</exclusions>
</dependency>
@@ -1697,6 +1761,22 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
+ <!-- Begin of Hive 2.3 exclusion -->
+ <!-- parquet-hadoop-bundle:1.8.1 conflicts with 1.10.1 -->
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ <!-- Do not need Jasper, see HIVE-19799 -->
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <!-- End of Hive 2.3 exclusion -->
</exclusions>
</dependency>
@@ -1762,8 +1842,76 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
+ <!-- Begin of Hive 2.3 exclusion -->
+ <!-- Exclude log4j-slf4j-impl; otherwise a NoClassDefFoundError is thrown when starting spark-shell -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <!-- End of Hive 2.3 exclusion -->
+ </exclusions>
+ </dependency>
+
+ <!-- hive-llap-common is needed when registering UDFs in Hive 2.3.
+ We add it here, otherwise -Phive-provided won't work. -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-common</artifactId>
+ <version>${hive23.version}</version>
+ <scope>${hive.deps.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <!-- hive-llap-client is needed when running MapReduce tests in Hive 2.3. -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-client</artifactId>
+ <version>${hive23.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>apache-curator</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
@@ -2656,7 +2804,23 @@
<hadoop.version>3.2.0</hadoop.version>
<curator.version>2.13.0</curator.version>
<zookeeper.version>3.4.13</zookeeper.version>
+ <hive.group>org.apache.hive</hive.group>
+ <hive.classifier>core</hive.classifier>
+ <hive.version>${hive23.version}</hive.version>
+ <hive.version.short>2.3.4</hive.version.short>
+ <hive.parquet.group>org.apache.parquet</hive.parquet.group>
+ <hive.parquet.version>1.8.1</hive.parquet.version>
+ <orc.classifier></orc.classifier>
+ <datanucleus-core.version>4.1.17</datanucleus-core.version>
</properties>
+ <dependencies>
+ <!-- Both Hive and ORC need hive-storage-api, but it is excluded by orc-mapreduce -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+ </dependencies>
</profile>
<profile>
diff --git a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 9bfad1e..2f1925e 100644
--- a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc;
import java.math.BigDecimal;
-import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 112dcb2..85d61bc 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.execution.datasources.orc
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
-import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
-import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 68503ab..c32f024 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.datasources.orc
import java.sql.Date
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
-import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
-import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
+import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
+import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.Decimal
diff --git a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index e96c6fb..1ed42f1 100644
--- a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 55afbe7..f627227 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -208,6 +208,31 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>hadoop-3.2</id>
+ <dependencies>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-serde</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-shims</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-client</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<build>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index c9fc3d4..be4a0c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.{InputStream, OutputStream}
+import java.lang.reflect.Method
import java.rmi.server.UID
import scala.collection.JavaConverters._
@@ -28,15 +29,13 @@ import com.google.common.base.Objects
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
import org.apache.hadoop.io.Writable
-import org.apache.hive.com.esotericsoftware.kryo.Kryo
-import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.Decimal
@@ -146,34 +145,60 @@ private[hive] object HiveShim {
case _ => false
}
- @transient
- def deserializeObjectByKryo[T: ClassTag](
- kryo: Kryo,
- in: InputStream,
- clazz: Class[_]): T = {
- val inp = new Input(in)
- val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
- inp.close()
- t
- }
+ private lazy val serUtilClass =
+ Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities")
+ private lazy val utilClass = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities")
+ private val deserializeMethodName = "deserializeObjectByKryo"
+ private val serializeMethodName = "serializeObjectByKryo"
- @transient
- def serializeObjectByKryo(
- kryo: Kryo,
- plan: Object,
- out: OutputStream) {
- val output: Output = new Output(out)
- kryo.writeObject(output, plan)
- output.close()
+ private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
+ val method = klass.getDeclaredMethod(name, args: _*)
+ method.setAccessible(true)
+ method
}
def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
- deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
- .asInstanceOf[UDFType]
+ if (HiveUtils.isHive23) {
+ val borrowKryo = serUtilClass.getMethod("borrowKryo")
+ val kryo = borrowKryo.invoke(serUtilClass)
+ val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
+ kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
+ try {
+ deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+ } finally {
+ serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+ }
+ } else {
+ val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
+ val threadLocalValue = runtimeSerializationKryo.get(utilClass)
+ val getMethod = threadLocalValue.getClass.getMethod("get")
+ val kryo = getMethod.invoke(threadLocalValue)
+ val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
+ kryo.getClass, classOf[InputStream], classOf[Class[_]])
+ deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+ }
}
def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
- serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
+ if (HiveUtils.isHive23) {
+ val borrowKryo = serUtilClass.getMethod("borrowKryo")
+ val kryo = borrowKryo.invoke(serUtilClass)
+ val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
+ kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
+ try {
+ serializeObjectByKryo.invoke(null, kryo, function, out)
+ } finally {
+ serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+ }
+ } else {
+ val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
+ val threadLocalValue = runtimeSerializationKryo.get(utilClass)
+ val getMethod = threadLocalValue.getClass.getMethod("get")
+ val kryo = getMethod.invoke(threadLocalValue)
+ val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
+ kryo.getClass, classOf[Object], classOf[OutputStream])
+ serializeObjectByKryo.invoke(null, kryo, function, out)
+ }
}
def writeExternal(out: java.io.ObjectOutput) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 01a503d..773bf31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.util.VersionInfo
+import org.apache.hive.common.util.HiveVersionInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -54,8 +55,11 @@ private[spark] object HiveUtils extends Logging {
sc
}
+ private val hiveVersion = HiveVersionInfo.getVersion
+ val isHive23: Boolean = hiveVersion.startsWith("2.3")
+
/** The version of hive used internally by Spark SQL. */
- val builtinHiveVersion: String = "1.2.1"
+ val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1"
val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8132dee..640cca0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -710,6 +710,8 @@ private[hive] class HiveClientImpl(
/**
* Execute the command using Hive and return the results as a sequence. Each element
* in the sequence is one row.
+ * Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when
+ * running MapReduce jobs with `runHive`.
*/
protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
logDebug(s"Running hiveql '$cmd'")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 8ece4b5..0938576 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive
+import java.lang.{Boolean => JBoolean}
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.types._
-
+import org.apache.spark.util.Utils
private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
@@ -336,8 +337,20 @@ private[hive] case class HiveUDAFFunction(
funcWrapper.createFunction[AbstractGenericUDAFResolver]()
}
- val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
- resolver.getEvaluator(parameterInfo)
+ val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
+ if (HiveUtils.isHive23) {
+ val ctor = clazz.getDeclaredConstructor(
+ classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
+ val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
+ val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+ resolver.getEvaluator(parameterInfo)
+ } else {
+ val ctor = clazz.getDeclaredConstructor(
+ classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE)
+ val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE)
+ val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+ resolver.getEvaluator(parameterInfo)
+ }
}
private case class HiveEvaluator(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index a82576a..dfac73c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -17,12 +17,16 @@
package org.apache.spark.sql.hive.orc
+import java.lang.reflect.Method
+
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => DatasourceOrcFilters}
import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -57,22 +61,33 @@ import org.apache.spark.sql.types._
* known to be convertible.
*/
private[orc] object OrcFilters extends Logging {
+
+ private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
+ val method = klass.getMethod(name, args: _*)
+ method.setAccessible(true)
+ method
+ }
+
def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
- val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-
- // First, tries to convert each filter individually to see whether it's convertible, and then
- // collect all convertible ones to build the final `SearchArgument`.
- val convertibleFilters = for {
- filter <- filters
- _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
- } yield filter
-
- for {
- // Combines all convertible filters using `And` to produce a single conjunction
- conjunction <- buildTree(convertibleFilters)
- // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
- builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
- } yield builder.build()
+ if (HiveUtils.isHive23) {
+ DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
+ } else {
+ val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+
+ // First, tries to convert each filter individually to see whether it's convertible, and then
+ // collect all convertible ones to build the final `SearchArgument`.
+ val convertibleFilters = for {
+ filter <- filters
+ _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
+ } yield filter
+
+ for {
+ // Combines all convertible filters using `And` to produce a single conjunction
+ conjunction <- buildTree(convertibleFilters)
+ // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
+ builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+ } yield builder.build()
+ }
}
private def buildSearchArgument(
@@ -160,31 +175,50 @@ private[orc] object OrcFilters extends Logging {
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().equals(attribute, value).end())
+ val bd = builder.startAnd()
+ val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object])
+ Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().nullSafeEquals(attribute, value).end())
+ val bd = builder.startAnd()
+ val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object])
+ Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().lessThan(attribute, value).end())
+ val bd = builder.startAnd()
+ val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
+ Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().lessThanEquals(attribute, value).end())
+ val bd = builder.startAnd()
+ val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
+ Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startNot().lessThanEquals(attribute, value).end())
+ val bd = builder.startNot()
+ val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
+ Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startNot().lessThan(attribute, value).end())
+ val bd = builder.startNot()
+ val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
+ Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().isNull(attribute).end())
+ val bd = builder.startAnd()
+ val method = findMethod(bd.getClass, "isNull", classOf[String])
+ Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startNot().isNull(attribute).end())
+ val bd = builder.startNot()
+ val method = findMethod(bd.getClass, "isNull", classOf[String])
+ Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().in(attribute, values.map(_.asInstanceOf[AnyRef]): _*).end())
+ val bd = builder.startAnd()
+ val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]])
+ Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
+ .asInstanceOf[Builder].end())
case _ => None
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org