Posted to commits@spark.apache.org by li...@apache.org on 2019/04/08 15:42:49 UTC

[spark] branch master updated: [SPARK-27176][SQL] Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 33f3c48  [SPARK-27176][SQL] Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4
33f3c48 is described below

commit 33f3c48cac087e079b9c7e342c2e58b16eaaa681
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Mon Apr 8 08:42:21 2019 -0700

    [SPARK-27176][SQL] Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4
    
    ## What changes were proposed in this pull request?
    
    This PR mainly contains:
    1. Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4.
    2. Resolve compatibility issues between Hive 1.2.1 and Hive 2.3.4 in the `sql/hive` module.
    
    ## How was this patch tested?
    Jenkins test with hadoop-2.7.
    Manual test with hadoop-3:
    ```shell
    build/sbt clean package -Phadoop-3.2 -Phive
    export SPARK_PREPEND_CLASSES=true
    
    # rm -rf metastore_db
    
    cat <<EOF > test_hadoop3.scala
    spark.range(10).write.saveAsTable("test_hadoop3")
    spark.table("test_hadoop3").show
    EOF
    
    bin/spark-shell --conf spark.hadoop.hive.metastore.schema.verification=false --conf spark.hadoop.datanucleus.schema.autoCreateAll=true -i test_hadoop3.scala
    ```
    
    Closes #23788 from wangyum/SPARK-23710-hadoop3.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 dev/deps/spark-deps-hadoop-3.2                     |   3 +-
 pom.xml                                            | 164 +++++++++++++++++++++
 .../execution/datasources/orc/OrcColumnVector.java |   2 +-
 .../sql/execution/datasources/orc/OrcFilters.scala |  10 +-
 .../execution/datasources/orc/OrcShimUtils.scala   |  10 +-
 .../execution/datasources/orc/OrcFilterSuite.scala |   2 +-
 sql/hive/pom.xml                                   |  25 ++++
 .../scala/org/apache/spark/sql/hive/HiveShim.scala |  73 ++++++---
 .../org/apache/spark/sql/hive/HiveUtils.scala      |   6 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |   2 +
 .../scala/org/apache/spark/sql/hive/hiveUDFs.scala |  19 ++-
 .../org/apache/spark/sql/hive/orc/OrcFilters.scala |  82 ++++++++---
 12 files changed, 333 insertions(+), 65 deletions(-)
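
The sql/hive changes in the diff below all follow one compatibility pattern: detect the built-in Hive version once (the new HiveUtils.isHive23 flag, derived from HiveVersionInfo.getVersion) and branch on it at runtime, going through reflection wherever the Hive 1.2.1 and 2.3.4 APIs differ at the source level. A minimal, self-contained sketch of that pattern, using JDK classes as stand-ins for the Hive APIs:

```scala
import java.lang.reflect.Method

object VersionDispatchSketch {
  // The real flag added by this patch comes from HiveVersionInfo.getVersion;
  // hard-coded here so the sketch stands alone.
  val isHive23: Boolean = true

  // Same helper shape as HiveShim.findMethod in the diff below.
  private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
    val method = klass.getDeclaredMethod(name, args: _*)
    method.setAccessible(true)
    method
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for "the method signature changed between versions":
    // Integer.valueOf(int) vs Integer.valueOf(String), both looked up reflectively.
    val result = if (isHive23) {
      findMethod(classOf[java.lang.Integer], "valueOf", java.lang.Integer.TYPE)
        .invoke(null, Int.box(23))
    } else {
      findMethod(classOf[java.lang.Integer], "valueOf", classOf[String])
        .invoke(null, "121")
    }
    println(result)
  }
}
```

Reflection is what lets a single source tree compile against either Hive line; only the call sites where the two APIs diverge pay the reflective lookup.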

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 326d085..04526f2 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -50,7 +50,7 @@ curator-client-2.13.0.jar
 curator-framework-2.13.0.jar
 curator-recipes-2.13.0.jar
 datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
+datanucleus-core-4.1.17.jar
 datanucleus-rdbms-3.2.9.jar
 derby-10.12.1.1.jar
 dnsjava-2.1.7.jar
@@ -76,6 +76,7 @@ hadoop-yarn-common-3.2.0.jar
 hadoop-yarn-registry-3.2.0.jar
 hadoop-yarn-server-common-3.2.0.jar
 hadoop-yarn-server-web-proxy-3.2.0.jar
+hive-storage-api-2.6.0.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar
diff --git a/pom.xml b/pom.xml
index 62eb16d..da8e18e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <hive.classifier></hive.classifier>
     <!-- Version used in Maven Hive dependency -->
     <hive.version>1.2.1.spark2</hive.version>
+    <hive23.version>2.3.4</hive23.version>
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
@@ -1414,6 +1415,37 @@
             <groupId>commons-logging</groupId>
             <artifactId>commons-logging</artifactId>
           </exclusion>
+          <!-- Begin of Hive 2.3 exclusion -->
+          <!--
+            ORC is needed, but the version should be consistent with the `sql/core` ORC data source.
+            Looks like this is safe, please see the major changes from ORC 1.3.3 to 1.5.4:
+            HIVE-17631 and HIVE-19465
+          -->
+          <exclusion>
+            <groupId>org.apache.orc</groupId>
+            <artifactId>orc-core</artifactId>
+          </exclusion>
+          <!-- jetty-all conflict with jetty 9.4.12.v20180830 -->
+          <exclusion>
+            <groupId>org.eclipse.jetty.aggregate</groupId>
+            <artifactId>jetty-all</artifactId>
+          </exclusion>
+          <!-- org.apache.logging.log4j:* conflict with log4j 1.2.17 -->
+          <exclusion>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+          <!-- Hive includes javax.servlet to fix the Hive on Spark test failure; see HIVE-12783 -->
+          <exclusion>
+            <groupId>org.eclipse.jetty.orbit</groupId>
+            <artifactId>javax.servlet</artifactId>
+          </exclusion>
+          <!-- hive-storage-api is needed and must be explicitly included later -->
+          <exclusion>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-storage-api</artifactId>
+          </exclusion>
+          <!-- End of Hive 2.3 exclusion -->
         </exclusions>
       </dependency>
 
@@ -1532,6 +1564,27 @@
             <groupId>org.json</groupId>
             <artifactId>json</artifactId>
           </exclusion>
+          <!-- Begin of Hive 2.3 exclusion -->
+          <!-- Do not need Tez -->
+          <exclusion>
+            <groupId>${hive.group}</groupId>
+            <artifactId>hive-llap-tez</artifactId>
+          </exclusion>
+          <!-- Do not need Calcite, see SPARK-27054 -->
+          <exclusion>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-druid</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.calcite.avatica</groupId>
+            <artifactId>avatica</artifactId>
+          </exclusion>
+          <!-- org.apache.logging.log4j:* conflict with log4j 1.2.17 -->
+          <exclusion>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+          <!-- End of Hive 2.3 exclusion -->
         </exclusions>
       </dependency>
       <dependency>
@@ -1640,6 +1693,17 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
           </exclusion>
+          <!-- Begin of Hive 2.3 exclusion -->
+          <!-- Hive removes the HBase Metastore; see HIVE-17234 -->
+          <exclusion>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>co.cask.tephra</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+          <!-- End of Hive 2.3 exclusion -->
         </exclusions>
       </dependency>
 
@@ -1697,6 +1761,22 @@
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-all</artifactId>
           </exclusion>
+          <!-- Begin of Hive 2.3 exclusion -->
+          <!-- parquet-hadoop-bundle:1.8.1 conflict with 1.10.1 -->
+          <exclusion>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop-bundle</artifactId>
+          </exclusion>
+          <!-- Do not need Jasper, see HIVE-19799 -->
+          <exclusion>
+            <groupId>tomcat</groupId>
+            <artifactId>jasper-compiler</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>tomcat</groupId>
+            <artifactId>jasper-runtime</artifactId>
+          </exclusion>
+          <!-- End of Hive 2.3 exclusion -->
         </exclusions>
       </dependency>
 
@@ -1762,8 +1842,76 @@
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-all</artifactId>
           </exclusion>
+          <!-- Begin of Hive 2.3 exclusion -->
+          <!-- Exclude log4j-slf4j-impl, otherwise throw NCDFE when starting spark-shell -->
+          <exclusion>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+          </exclusion>
+          <!-- End of Hive 2.3 exclusion -->
+        </exclusions>
+      </dependency>
+
+      <!-- hive-llap-common is needed when registering UDFs in Hive 2.3.
+         We add it here, otherwise -Phive-provided won't work. -->
+      <dependency>
+        <groupId>org.apache.hive</groupId>
+        <artifactId>hive-llap-common</artifactId>
+        <version>${hive23.version}</version>
+        <scope>${hive.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-serde</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
+      <!-- hive-llap-client is needed when run MapReduce test in Hive 2.3. -->
+      <dependency>
+        <groupId>org.apache.hive</groupId>
+        <artifactId>hive-llap-client</artifactId>
+        <version>${hive23.version}</version>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-serde</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-llap-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>apache-curator</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.orc</groupId>
         <artifactId>orc-core</artifactId>
@@ -2656,7 +2804,23 @@
         <hadoop.version>3.2.0</hadoop.version>
         <curator.version>2.13.0</curator.version>
         <zookeeper.version>3.4.13</zookeeper.version>
+        <hive.group>org.apache.hive</hive.group>
+        <hive.classifier>core</hive.classifier>
+        <hive.version>${hive23.version}</hive.version>
+        <hive.version.short>2.3.4</hive.version.short>
+        <hive.parquet.group>org.apache.parquet</hive.parquet.group>
+        <hive.parquet.version>1.8.1</hive.parquet.version>
+        <orc.classifier></orc.classifier>
+        <datanucleus-core.version>4.1.17</datanucleus-core.version>
       </properties>
+      <dependencies>
+        <!-- Both Hive and ORC need hive-storage-api, but it is excluded by orc-mapreduce -->
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+          <version>2.6.0</version>
+        </dependency>
+      </dependencies>
     </profile>
 
     <profile>
diff --git a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 9bfad1e..2f1925e 100644
--- a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc;
 
 import java.math.BigDecimal;
 
-import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.exec.vector.*;
 
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 112dcb2..85d61bc 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
-import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
-import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
 
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 68503ab..c32f024 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.sql.Date
 
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
-import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
-import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
+import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
+import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.types.Decimal
diff --git a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index e96c6fb..1ed42f1 100644
--- a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions._
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 55afbe7..f627227 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -208,6 +208,31 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>hadoop-3.2</id>
+      <dependencies>
+        <dependency>
+          <groupId>${hive.group}</groupId>
+          <artifactId>hive-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>${hive.group}</groupId>
+          <artifactId>hive-serde</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>${hive.group}</groupId>
+          <artifactId>hive-shims</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-llap-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-llap-client</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
   <build>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index c9fc3d4..be4a0c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.{InputStream, OutputStream}
+import java.lang.reflect.Method
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
@@ -28,15 +29,13 @@ import com.google.common.base.Objects
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.exec.UDF
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
 import org.apache.hadoop.io.Writable
-import org.apache.hive.com.esotericsoftware.kryo.Kryo
-import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.Decimal
@@ -146,34 +145,60 @@ private[hive] object HiveShim {
       case _ => false
     }
 
-    @transient
-    def deserializeObjectByKryo[T: ClassTag](
-        kryo: Kryo,
-        in: InputStream,
-        clazz: Class[_]): T = {
-      val inp = new Input(in)
-      val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
-      inp.close()
-      t
-    }
+    private lazy val serUtilClass =
+      Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities")
+    private lazy val utilClass = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities")
+    private val deserializeMethodName = "deserializeObjectByKryo"
+    private val serializeMethodName = "serializeObjectByKryo"
 
-    @transient
-    def serializeObjectByKryo(
-        kryo: Kryo,
-        plan: Object,
-        out: OutputStream) {
-      val output: Output = new Output(out)
-      kryo.writeObject(output, plan)
-      output.close()
+    private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
+      val method = klass.getDeclaredMethod(name, args: _*)
+      method.setAccessible(true)
+      method
     }
 
     def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-      deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
-        .asInstanceOf[UDFType]
+      if (HiveUtils.isHive23) {
+        val borrowKryo = serUtilClass.getMethod("borrowKryo")
+        val kryo = borrowKryo.invoke(serUtilClass)
+        val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
+          kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
+        try {
+          deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+        } finally {
+          serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+        }
+      } else {
+        val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
+        val threadLocalValue = runtimeSerializationKryo.get(utilClass)
+        val getMethod = threadLocalValue.getClass.getMethod("get")
+        val kryo = getMethod.invoke(threadLocalValue)
+        val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
+          kryo.getClass, classOf[InputStream], classOf[Class[_]])
+        deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+      }
     }
 
     def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-      serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
+      if (HiveUtils.isHive23) {
+        val borrowKryo = serUtilClass.getMethod("borrowKryo")
+        val kryo = borrowKryo.invoke(serUtilClass)
+        val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
+          kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
+        try {
+          serializeObjectByKryo.invoke(null, kryo, function, out)
+        } finally {
+          serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+        }
+      } else {
+        val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
+        val threadLocalValue = runtimeSerializationKryo.get(utilClass)
+        val getMethod = threadLocalValue.getClass.getMethod("get")
+        val kryo = getMethod.invoke(threadLocalValue)
+        val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
+          kryo.getClass, classOf[Object], classOf[OutputStream])
+        serializeObjectByKryo.invoke(null, kryo, function, out)
+      }
     }
 
     def writeExternal(out: java.io.ObjectOutput) {
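
As the reflective branches above show, the Hive 2.3 path obtains its Kryo instance from SerializationUtilities.borrowKryo() and must hand it back via releaseKryo() (hence the try/finally), while the Hive 1.2 path keeps using the thread-local Utilities.runtimeSerializationKryo. The (de)serialization helpers themselves are looked up with getDeclaredMethod/setAccessible because they are apparently not public in Hive 2.3. A stand-alone sketch of the borrow/use/release discipline the reflective code preserves; the pool and serializer types here are illustrative stand-ins, not Hive or Kryo classes:

```scala
// Stand-in sketch of the borrow/use/release shape of the Hive 2.3 branch above.
// The real calls, made via reflection, are
// SerializationUtilities.borrowKryo() / releaseKryo(kryo).
object BorrowReleaseSketch {
  final class PlanSerializer {
    def roundTrip(plan: String): String = plan  // pretend to serialize and deserialize
  }

  object Pool {
    private val free = new java.util.ArrayDeque[PlanSerializer]()
    def borrow(): PlanSerializer = Option(free.poll()).getOrElse(new PlanSerializer)
    def release(s: PlanSerializer): Unit = free.push(s)
  }

  def withSerializer[T](body: PlanSerializer => T): T = {
    val ser = Pool.borrow()
    try body(ser) finally Pool.release(ser)  // always return the borrowed instance
  }

  def main(args: Array[String]): Unit =
    println(withSerializer(_.roundTrip("serialized UDF plan")))
}
```
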
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 01a503d..773bf31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
+import org.apache.hive.common.util.HiveVersionInfo
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -54,8 +55,11 @@ private[spark] object HiveUtils extends Logging {
     sc
   }
 
+  private val hiveVersion = HiveVersionInfo.getVersion
+  val isHive23: Boolean = hiveVersion.startsWith("2.3")
+
   /** The version of hive used internally by Spark SQL. */
-  val builtinHiveVersion: String = "1.2.1"
+  val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1"
 
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8132dee..640cca0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -710,6 +710,8 @@ private[hive] class HiveClientImpl(
   /**
    * Execute the command using Hive and return the results as a sequence. Each element
    * in the sequence is one row.
+   * Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when
+   * running MapReduce jobs with `runHive`.
    */
   protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
     logDebug(s"Running hiveql '$cmd'")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 8ece4b5..0938576 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import java.lang.{Boolean => JBoolean}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.types._
-
+import org.apache.spark.util.Utils
 
 private[hive] case class HiveSimpleUDF(
     name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
@@ -336,8 +337,20 @@ private[hive] case class HiveUDAFFunction(
       funcWrapper.createFunction[AbstractGenericUDAFResolver]()
     }
 
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
-    resolver.getEvaluator(parameterInfo)
+    val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
+    if (HiveUtils.isHive23) {
+      val ctor = clazz.getDeclaredConstructor(
+        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
+      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
+      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+      resolver.getEvaluator(parameterInfo)
+    } else {
+      val ctor = clazz.getDeclaredConstructor(
+        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE)
+      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE)
+      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+      resolver.getEvaluator(parameterInfo)
+    }
   }
 
   private case class HiveEvaluator(
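
The two branches above exist because SimpleGenericUDAFParameterInfo takes an extra boolean constructor argument in Hive 2.3 (visible from the two getDeclaredConstructor signatures), so the constructor is selected reflectively instead of being called directly. A self-contained sketch of that arity dispatch, with a toy class standing in for the Hive type and illustrative flag names:

```scala
import java.lang.{Boolean => JBoolean}

// Toy stand-in for SimpleGenericUDAFParameterInfo: one constructor per "Hive version".
class ParamInfoStandIn(names: Array[String], flagA: Boolean, flagB: Boolean) {
  // Hive 2.3-style shape: one extra boolean flag.
  def this(names: Array[String], flagA: Boolean, flagB: Boolean, flagC: Boolean) =
    this(names, flagA, flagB)
  override def toString: String = s"ParamInfoStandIn(${names.mkString(",")})"
}

object CtorArityDispatch {
  def main(args: Array[String]): Unit = {
    val isHive23 = true  // plays the role of HiveUtils.isHive23
    val clazz = classOf[ParamInfoStandIn]
    val info = if (isHive23) {
      val ctor = clazz.getDeclaredConstructor(
        classOf[Array[String]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
      ctor.newInstance(Array("c1"), JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
    } else {
      val ctor = clazz.getDeclaredConstructor(
        classOf[Array[String]], JBoolean.TYPE, JBoolean.TYPE)
      ctor.newInstance(Array("c1"), JBoolean.FALSE, JBoolean.FALSE)
    }
    println(info)
  }
}
```
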
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index a82576a..dfac73c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.lang.reflect.Method
+
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => DatasourceOrcFilters}
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -57,22 +61,33 @@ import org.apache.spark.sql.types._
  * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
+
+  private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
+    val method = klass.getMethod(name, args: _*)
+    method.setAccessible(true)
+    method
+  }
+
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
-    val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-
-    // First, tries to convert each filter individually to see whether it's convertible, and then
-    // collect all convertible ones to build the final `SearchArgument`.
-    val convertibleFilters = for {
-      filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
-    } yield filter
-
-    for {
-      // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters)
-      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
-    } yield builder.build()
+    if (HiveUtils.isHive23) {
+      DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
+    } else {
+      val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+
+      // First, tries to convert each filter individually to see whether it's convertible, and then
+      // collect all convertible ones to build the final `SearchArgument`.
+      val convertibleFilters = for {
+        filter <- filters
+        _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
+      } yield filter
+
+      for {
+        // Combines all convertible filters using `And` to produce a single conjunction
+        conjunction <- buildTree(convertibleFilters)
+        // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
+        builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      } yield builder.build()
+    }
   }
 
   private def buildSearchArgument(
@@ -160,31 +175,50 @@ private[orc] object OrcFilters extends Logging {
       // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
 
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().equals(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().nullSafeEquals(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().lessThan(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().lessThanEquals(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().lessThanEquals(attribute, value).end())
+        val bd = builder.startNot()
+        val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().lessThan(attribute, value).end())
+        val bd = builder.startNot()
+        val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().isNull(attribute).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "isNull", classOf[String])
+        Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
 
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().isNull(attribute).end())
+        val bd = builder.startNot()
+        val method = findMethod(bd.getClass, "isNull", classOf[String])
+        Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
 
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().in(attribute, values.map(_.asInstanceOf[AnyRef]): _*).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]])
+        Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
+          .asInstanceOf[Builder].end())
 
       case _ => None
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org