You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/12 10:10:27 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2024][FOLLOWUP] Hive Backend Engine - ProcBuilder for HiveEngine
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a56e4b4f0 [KYUUBI #2024][FOLLOWUP] Hive Backend Engine - ProcBuilder for HiveEngine
a56e4b4f0 is described below
commit a56e4b4f0189ffcebaafc7be328e88255d449a33
Author: Kent Yao <ya...@apache.org>
AuthorDate: Tue Apr 12 18:10:18 2022 +0800
[KYUUBI #2024][FOLLOWUP] Hive Backend Engine - ProcBuilder for HiveEngine
### _Why are the changes needed?_
Make the HiveProcBuilder actually work.
- Hive 2.3.9 has Scala 2.11 dependencies and is therefore not compatible with kyuubi-common, so upgrade to Hive 3.1.x
- Hive 3.1 does not work with Java 11 due to the bug tracked at https://issues.apache.org/jira/browse/HIVE-21237
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2290 from yaooqinn/2024.
Closes #2024
83e07fa5 [Kent Yao] fix
1e7167f8 [Kent Yao] import
ac7853ac [Kent Yao] jdk11
ba9f6c33 [Kent Yao] temp
44049736 [Kent Yao] temp
fd624307 [Kent Yao] temp
a4a5e42f [Kent Yao] temp
b01bb226 [Kent Yao] temp
298fc478 [Kent Yao] temp
5630857a [Kent Yao] temp
98457b6e [Kent Yao] temp
18d8f5cb [Kent Yao] [KYUUBI #2024][FOLLOWUP] Hive Backend Engine - ProcBuilder for HiveEngine
246681dd [Kent Yao] [KYUUBI #2024][FOLLOWUP] Hive Backend Engine - ProcBuilder for HiveEngine
890579b9 [Kent Yao] [KYUUBI #2084][FOLLOWUP] Support arbitrary parameters for KyuubiConf
105b70b6 [Kent Yao] tmp
bcc4a0e2 [Kent Yao] tmp
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
---
build/dist | 4 +-
externals/kyuubi-download/pom.xml | 7 +
.../kyuubi-hive-sql-engine/bin/hive-sql-engine.sh | 46 ----
externals/kyuubi-hive-sql-engine/pom.xml | 67 +++--
.../src/main/resources/META-INF/LICENSE | 8 +-
.../src/main/resources/META-INF/NOTICE | 8 +-
.../apache/kyuubi/engine/hive/HiveSQLEngine.scala | 9 +
.../engine/hive/operation/HiveOperation.scala | 5 +-
.../hive/operation/HiveOperationManager.scala | 7 +-
.../engine/hive/session/HiveSessionImpl.scala | 3 +-
.../engine/hive/session/HiveSessionManager.scala | 5 +-
.../hive/event/HiveEventLoggingServiceSuite.scala | 8 +-
.../engine/hive/operation/HiveOperationSuite.scala | 297 +--------------------
.../apache/kyuubi/operation/OperationManager.scala | 4 +-
.../operation/log/Log4j2DivertAppender.scala | 3 +-
.../kyuubi/operation/log/LogDivertAppender.scala | 18 +-
.../scala/org/apache/kyuubi/HiveEngineTests.scala | 40 +--
kyuubi-server/pom.xml | 7 +
.../org/apache/kyuubi/engine/ProcBuilder.scala | 50 +++-
.../kyuubi/engine/hive/HiveProcessBuilder.scala | 86 +++---
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 30 +--
.../scala/org/apache/kyuubi/WithKyuubiServer.scala | 5 +-
.../engine/hive/HiveProcessBuilderSuite.scala | 8 +-
.../hive/KyuubiOperationWithHiveEngineSuite.scala} | 24 +-
pom.xml | 5 +-
25 files changed, 266 insertions(+), 488 deletions(-)
diff --git a/build/dist b/build/dist
index 6db68d6bd..ef949d177 100755
--- a/build/dist
+++ b/build/dist
@@ -262,9 +262,7 @@ chmod a+x "$DISTDIR/externals/engines/trino/bin/trino-engine.sh"
cp "$KYUUBI_HOME/externals/kyuubi-trino-engine/target/kyuubi-trino-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/trino/jars"
# Copy hive engines
-cp -r "$KYUUBI_HOME/externals/kyuubi-hive-sql-engine/bin/" "$DISTDIR/externals/engines/hive/bin/"
-chmod a+x "$DISTDIR/externals/engines/hive/bin/hive-sql-engine.sh"
-cp "$KYUUBI_HOME/externals/kyuubi-hive-sql-engine/target/kyuubi-hive-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/hive/jars"
+cp "$KYUUBI_HOME/externals/kyuubi-hive-sql-engine/target/kyuubi-hive-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/hive"
# Copy kyuubi tools
if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" ]]; then
diff --git a/externals/kyuubi-download/pom.xml b/externals/kyuubi-download/pom.xml
index 32579a07b..299f2f1cb 100644
--- a/externals/kyuubi-download/pom.xml
+++ b/externals/kyuubi-download/pom.xml
@@ -79,6 +79,13 @@
<unpack>true</unpack>
</configuration>
</plugin>
+
+ <plugin>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/externals/kyuubi-hive-sql-engine/bin/hive-sql-engine.sh b/externals/kyuubi-hive-sql-engine/bin/hive-sql-engine.sh
deleted file mode 100755
index b7d80fc7f..000000000
--- a/externals/kyuubi-hive-sql-engine/bin/hive-sql-engine.sh
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-if [[ -z ${JAVA_HOME} ]]; then
- echo "[ERROR] JAVA_HOME IS NOT SET! CANNOT PROCEED."
- exit 1
-fi
-
-RUNNER="${JAVA_HOME}/bin/java"
-
-if [[ "$HIVE_ENGINE_HOME" == "$KYUUBI_HOME/externals/engines/hive" ]]; then
- HIVE_CLIENT_JAR="$HIVE_ENGINE_JAR"
- HIVE_CLIENT_JARS_DIR="$HIVE_ENGINE_HOME/jars"
-else
- echo "\nHIVE_ENGINE_HOME $HIVE_ENGINE_HOME doesn't match production directory, assuming in development environment..."
- HIVE_CLIENT_JAR=$(find $HIVE_ENGINE_HOME/target -regex '.*/kyuubi-hive-sql-engine_.*.jar$' | grep -v '\-sources.jar$' | grep -v '\-javadoc.jar$' | grep -v '\-tests.jar$')
- HIVE_CLIENT_JARS_DIR=$(find $HIVE_ENGINE_HOME/target -regex '.*/jars')
-fi
-
-HIVE_CLIENT_CLASSPATH="$HIVE_CLIENT_JARS_DIR/*"
-if [[ -z ${YARN_CONF_DIR} ]]; then
- FULL_CLASSPATH="$HIVE_CLIENT_CLASSPATH:$HIVE_CLIENT_JAR:$HADOOP_CONF_DIR:$HIVE_CONF_DIR"
-else
- FULL_CLASSPATH="$HIVE_CLIENT_CLASSPATH:$HIVE_CLIENT_JAR:$HADOOP_CONF_DIR:$HIVE_CONF_DIR:$YARN_CONF_DIR"
-fi
-
-if [ -n "$HIVE_CLIENT_JAR" ]; then
- exec $RUNNER ${HIVE_ENGINE_DYNAMIC_ARGS} -cp ${FULL_CLASSPATH} org.apache.kyuubi.engine.hive.HiveSQLEngine "$@"
-else
- (>&2 echo "[ERROR] HIVE Engine JAR file 'kyuubi-hive-sql-engine*.jar' should be located in $HIVE_ENGINE_HOME/jars.")
- exit 1
-fi
diff --git a/externals/kyuubi-hive-sql-engine/pom.xml b/externals/kyuubi-hive-sql-engine/pom.xml
index aa7c5ef03..5d328041b 100644
--- a/externals/kyuubi-hive-sql-engine/pom.xml
+++ b/externals/kyuubi-hive-sql-engine/pom.xml
@@ -46,10 +46,45 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service-rpc</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>failureaccess</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
- <version>${hive.version}</version>
+ <version>${hive.engine.hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -74,7 +109,7 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
- <version>${hive.version}</version>
+ <version>${hive.engine.hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -125,13 +160,16 @@
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
+ <include>org.scala-lang:scala-library</include>
+ <include>com.fasterxml.jackson.core:jackson-annotations</include>
+ <include>com.fasterxml.jackson.core:jackson-core</include>
+ <include>com.fasterxml.jackson.core:jackson-databind</include>
+ <include>com.fasterxml.jackson.module:jackson-module-scala_2.12</include>
<include>org.apache.kyuubi:kyuubi-common_${scala.binary.version}</include>
<include>org.apache.kyuubi:kyuubi-ha_${scala.binary.version}</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>org.apache.curator:curator-recipes</include>
- <include>org.apache.hive:hive-service-rpc</include>
- <include>org.apache.thrift:*</include>
</includes>
</artifactSet>
<filters>
@@ -160,27 +198,6 @@
<include>org.apache.curator.**</include>
</includes>
</relocation>
- <relocation>
- <pattern>org.apache.hive.service.rpc.thrift</pattern>
- <shadedPattern>${kyuubi.shade.packageName}.org.apache.hive.service.rpc.thrift</shadedPattern>
- <includes>
- <include>org.apache.hive.service.rpc.thrift.**</include>
- </includes>
- </relocation>
- <relocation>
- <pattern>com.facebook.fb303</pattern>
- <shadedPattern>${kyuubi.shade.packageName}.com.facebook.fb303</shadedPattern>
- <includes>
- <include>com.facebook.fb303.**</include>
- </includes>
- </relocation>
- <relocation>
- <pattern>org.apache.thrift</pattern>
- <shadedPattern>${kyuubi.shade.packageName}.org.apache.thrift</shadedPattern>
- <includes>
- <include>org.apache.thrift.**</include>
- </includes>
- </relocation>
</relocations>
</configuration>
<executions>
diff --git a/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/LICENSE b/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/LICENSE
index 27088b9c2..0b194d724 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/LICENSE
+++ b/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/LICENSE
@@ -210,6 +210,8 @@ Apache License Version 2.0
org.apache.curator:curator-client
org.apache.curator:curator-framework
org.apache.curator:curator-recipes
-org.apache.hive:hive-service-rpc
-org.apache.thrift:fb303
-org.apache.thrift:libthrift
+org.scala-lang:scala-library
+com.fasterxml.jackson.core:jackson-annotations
+com.fasterxml.jackson.core:jackson-core
+com.fasterxml.jackson.core:jackson-databind
+com.fasterxml.jackson.module:jackson-module-scala_2.12
diff --git a/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/NOTICE b/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/NOTICE
index c067e8282..3578d760b 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/NOTICE
+++ b/externals/kyuubi-hive-sql-engine/src/main/resources/META-INF/NOTICE
@@ -17,8 +17,6 @@ Copyright 2011-2017 The Apache Software Foundation
Curator Recipes
Copyright 2011-2017 The Apache Software Foundation
-Hive Service RPC
-Copyright 2019 The Apache Software Foundation
-
-Apache Thrift
-Copyright 2006-2010 The Apache Software Foundation.
+org.scala-lang:scala-library
+Copyright (c) 2002-2022 EPFL
+Copyright (c) 2011-2022 Lightbend, Inc.
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index 62f5dc652..6ffa83327 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -58,6 +58,7 @@ object HiveSQLEngine extends Logging {
def startEngine(): HiveSQLEngine = {
try {
+ // TODO: hive 2.3.x has scala 2.11 deps.
initLoggerEventHandler(kyuubiConf)
} catch {
case NonFatal(e) =>
@@ -82,6 +83,14 @@ object HiveSQLEngine extends Logging {
hiveConf.set(
"hive.metastore.warehouse.dir",
Utils.createTempDir(namePrefix = "kyuubi_hive_warehouse").toString)
+ hiveConf.set("hive.metastore.fastpath", "true")
+
+ // TODO: Move this to test phase after #2021 resolved
+ val metastore = Utils.createTempDir(namePrefix = "hms_temp")
+ metastore.toFile.delete()
+ hiveConf.set(
+ "javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastore;create=true")
}
val engine = new HiveSQLEngine()
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index cdd8f8f66..93ec8ca72 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -58,6 +58,8 @@ abstract class HiveOperation(opType: OperationType, session: Session)
}
override def runInternal(): Unit = {
internalHiveOperation.run()
+ val hasResultSet = internalHiveOperation.getStatus.getHasResultSet
+ setHasResultSet(hasResultSet)
}
override def getBackgroundHandle: Future[_] = {
@@ -73,7 +75,6 @@ abstract class HiveOperation(opType: OperationType, session: Session)
}
override def getStatus: OperationStatus = {
- super.getStatus
val status = internalHiveOperation.getStatus
val state = OperationState.withName(status.getState.name().stripSuffix("_STATE"))
@@ -83,7 +84,7 @@ abstract class HiveOperation(opType: OperationType, session: Session)
status.getOperationStarted,
lastAccessTime,
status.getOperationCompleted,
- status.getHasResultSet,
+ hasResultSet,
Option(status.getOperationException).map(KyuubiSQLException(_)))
}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 04aca79ad..3d67290b6 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -32,6 +32,9 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
class HiveOperationManager() extends OperationManager("HiveOperationManager") {
+ // we use hive's operation log
+ override protected def skipOperationLog: Boolean = true
+
override def newExecuteStatementOperation(
session: Session,
statement: String,
@@ -140,7 +143,9 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
val rowSet = RowSetFactory.create(getLogSchema, operation.getProtocolVersion, false)
val operationLog = internalHiveOperation.getOperationLog
if (operationLog == null) {
- throw KyuubiSQLException("Couldn't find log associated with operation handle: " + opHandle)
+ // TODO: #2029 Operation Log support: set and read hive one directly
+ // throw KyuubiSQLException("Couldn't find log associated with operation handle: " + opHandle)
+ return rowSet.toTRowSet
}
try {
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
index 798b7b767..b6ab2d596 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
@@ -21,7 +21,6 @@ import java.util.HashMap
import scala.collection.JavaConverters._
-import org.apache.hive.service.cli.HiveSQLException
import org.apache.hive.service.cli.session.HiveSession
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -65,7 +64,7 @@ class HiveSessionImpl(
try {
hive.close()
} catch {
- case e: HiveSQLException =>
+ case e: Exception =>
error(s"Failed to close hive runtime session: ${e.getMessage}")
}
}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
index 048158e70..2c74a735b 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
@@ -44,7 +44,7 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
* Avoid unnecessary hive initialization
*/
override def init(hiveConf: HiveConf): Unit = {
- this.hiveConf = hiveConf
+ // this.hiveConf = hiveConf
}
/**
@@ -80,7 +80,8 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
user,
password,
HiveSQLEngine.hiveConf,
- ipAddress)
+ ipAddress,
+ null)
hive.setSessionManager(internalSessionManager)
hive.setOperationManager(internalSessionManager.getOperationManager)
operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/event/HiveEventLoggingServiceSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/event/HiveEventLoggingServiceSuite.scala
index a1d97f07b..a9d483a0a 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/event/HiveEventLoggingServiceSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/event/HiveEventLoggingServiceSuite.scala
@@ -20,7 +20,7 @@ import java.io.{BufferedReader, InputStreamReader}
import java.net.InetAddress
import java.nio.file.Paths
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.{JavaVersion, StringUtils, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
@@ -59,6 +59,8 @@ class HiveEventLoggingServiceSuite extends HiveJDBCTestHelper {
}
test("test engine event logging") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
+
val engineEventPath = Paths.get(
logRoot,
"hive_engine",
@@ -73,6 +75,8 @@ class HiveEventLoggingServiceSuite extends HiveJDBCTestHelper {
}
test("test session event logging") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
+
withJdbcStatement() { statement =>
val catalogs = statement.getConnection.getMetaData.getCatalogs
assert(!catalogs.next())
@@ -91,6 +95,8 @@ class HiveEventLoggingServiceSuite extends HiveJDBCTestHelper {
}
test("test operation event logging") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
+
withJdbcStatement("hive_engine_test") { statement =>
val createTableStatement = "CREATE TABLE hive_engine_test(id int, value string) stored as orc"
statement.execute(createTableStatement)
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index 7e2a63db0..63911b207 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -17,15 +17,10 @@
package org.apache.kyuubi.engine.hive.operation
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.hive.ql.exec.FunctionInfo
-
+import org.apache.kyuubi.HiveEngineTests
import org.apache.kyuubi.engine.hive.HiveSQLEngine
-import org.apache.kyuubi.operation.HiveJDBCTestHelper
-import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
-class HiveOperationSuite extends HiveJDBCTestHelper {
+class HiveOperationSuite extends HiveEngineTests {
override def beforeAll(): Unit = {
HiveSQLEngine.startEngine()
@@ -35,292 +30,4 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
override protected def jdbcUrl: String = {
"jdbc:hive2://" + HiveSQLEngine.currentEngine.get.frontendServices.head.connectionUrl + "/;"
}
-
- test("get catalogs") {
- withJdbcStatement() { statement =>
- val catalogs = statement.getConnection.getMetaData.getCatalogs
- assert(!catalogs.next())
- }
- }
-
- test("get schemas") {
- withDatabases("test_schema") { statement =>
- statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
- val metaData = statement.getConnection.getMetaData
- var resultSet = metaData.getSchemas(null, null)
- val resultSetBuffer = ArrayBuffer[(String, String)]()
- while (resultSet.next()) {
- resultSetBuffer += Tuple2(
- resultSet.getString(TABLE_CATALOG),
- resultSet.getString(TABLE_SCHEM))
- }
- assert(resultSetBuffer.contains(("", "default")))
- assert(resultSetBuffer.contains(("", "test_schema")))
-
- resultSet = metaData.getSchemas("", "test")
- while (resultSet.next()) {
- assert(resultSet.getString(TABLE_CATALOG) == "")
- assert(resultSet.getString(TABLE_SCHEM) == "test_schema")
- }
- }
- }
-
- test("get tables") {
- withDatabases("test_schema") { statement =>
- statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
- statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a string)")
- statement.execute(
- "CREATE OR REPLACE VIEW test_schema.test_view AS SELECT * FROM test_schema.test_table")
-
- try {
- val meta = statement.getConnection.getMetaData
- var resultSet = meta.getTables(null, null, null, null)
- val resultSetBuffer = ArrayBuffer[(String, String, String, String)]()
- while (resultSet.next()) {
- resultSetBuffer += Tuple4(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(TABLE_TYPE))
- }
- assert(resultSetBuffer.contains(("", "test_schema", "test_table", "TABLE")))
- assert(resultSetBuffer.contains(("", "test_schema", "test_view", "VIEW")))
-
- resultSet = meta.getTables("", null, null, null)
- resultSetBuffer.clear()
- while (resultSet.next()) {
- resultSetBuffer += Tuple4(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(TABLE_TYPE))
- }
- assert(resultSetBuffer.contains(("", "test_schema", "test_table", "TABLE")))
- assert(resultSetBuffer.contains(("", "test_schema", "test_view", "VIEW")))
-
- resultSet = meta.getTables(null, "test_schema", null, null)
- resultSetBuffer.clear()
- while (resultSet.next()) {
- resultSetBuffer += Tuple4(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(TABLE_TYPE))
- }
- assert(resultSetBuffer.contains(("", "test_schema", "test_table", "TABLE")))
- assert(resultSetBuffer.contains(("", "test_schema", "test_view", "VIEW")))
-
- resultSet = meta.getTables(null, null, "test_table", null)
- while (resultSet.next()) {
- assert(resultSet.getString(TABLE_CAT) == "")
- assert(resultSet.getString(TABLE_SCHEM) == "test_schema")
- assert(resultSet.getString(TABLE_NAME) == "test_table")
- assert(resultSet.getString(TABLE_TYPE) == "TABLE")
- }
-
- resultSet = meta.getTables(null, null, null, Array("VIEW"))
- while (resultSet.next()) {
- assert(resultSet.getString(TABLE_CAT) == "")
- assert(resultSet.getString(TABLE_SCHEM) == "test_schema")
- assert(resultSet.getString(TABLE_NAME) == "test_view")
- assert(resultSet.getString(TABLE_TYPE) == "VIEW")
- }
- } finally {
- statement.execute("DROP VIEW test_schema.test_view")
- statement.execute("DROP TABLE test_schema.test_table")
- }
- }
- }
-
- test("get columns") {
- withDatabases("test_schema") { statement =>
- statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
- statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a int, b string)")
-
- try {
- val meta = statement.getConnection.getMetaData
- var resultSet = meta.getColumns(null, null, null, null)
- var resultSetBuffer = ArrayBuffer[(String, String, String, String, String)]()
- while (resultSet.next()) {
- resultSetBuffer += Tuple5(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(COLUMN_NAME),
- resultSet.getString(TYPE_NAME))
- }
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "a", "INT")))
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "b", "STRING")))
-
- resultSet = meta.getColumns("", null, null, null)
- resultSetBuffer.clear()
- while (resultSet.next()) {
- resultSetBuffer += Tuple5(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(COLUMN_NAME),
- resultSet.getString(TYPE_NAME))
- }
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "a", "INT")))
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "b", "STRING")))
-
- resultSet = meta.getColumns(null, "test_schema", null, null)
- resultSetBuffer.clear()
- while (resultSet.next()) {
- resultSetBuffer += Tuple5(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(COLUMN_NAME),
- resultSet.getString(TYPE_NAME))
- }
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "a", "INT")))
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "b", "STRING")))
-
- resultSet = meta.getColumns(null, null, "test_table", null)
- resultSetBuffer.clear()
- while (resultSet.next()) {
- resultSetBuffer += Tuple5(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(COLUMN_NAME),
- resultSet.getString(TYPE_NAME))
- }
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "a", "INT")))
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "b", "STRING")))
-
- resultSet = meta.getColumns(null, null, null, "a")
- while (resultSet.next()) {
- assert(resultSet.getString(TABLE_CAT) == null)
- assert(resultSet.getString(TABLE_SCHEM) == "test_schema")
- assert(resultSet.getString(TABLE_NAME) == "test_table")
- assert(resultSet.getString(COLUMN_NAME) == "a")
- assert(resultSet.getString(TYPE_NAME) == "INT")
- }
- } finally {
- statement.execute("DROP VIEW test_schema.test_view")
- statement.execute("DROP TABLE test_schema.test_table")
- }
- }
- }
-
- test("get functions") {
- withJdbcStatement() { statement =>
- val metaData = statement.getConnection.getMetaData
- Seq("from_unixtime", "to_date", "date_format", "date_format", "round", "sin").foreach {
- func =>
- val resultSet = metaData.getFunctions(null, null, func)
- while (resultSet.next()) {
- assert(resultSet.getString(FUNCTION_CAT) === null)
- assert(resultSet.getString(FUNCTION_SCHEM) === null)
- assert(resultSet.getString(FUNCTION_NAME) === func)
- assert(resultSet.getString(REMARKS).isEmpty)
- assert(resultSet.getString(SPECIFIC_NAME) === classOf[FunctionInfo].getName)
- }
- }
- }
- }
-
- test("get table types") {
- withJdbcStatement() { statement =>
- val resultSet = statement.getConnection.getMetaData.getTableTypes
- val expected = Set("TABLE", "VIEW", "INDEX_TABLE", "MATERIALIZED_VIEW")
- var tableTypes = Set[String]()
- while (resultSet.next()) {
- assert(expected.contains(resultSet.getString(TABLE_TYPE)))
- tableTypes += resultSet.getString(TABLE_TYPE)
- }
- assert(!resultSet.next())
- assert(expected.size === tableTypes.size)
- }
- }
-
- test("get primary keys") {
- withDatabases("test_schema") { statement =>
- statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
- statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a string, " +
- "PRIMARY KEY(a) DISABLE NOVALIDATE)")
-
- try {
- val meta = statement.getConnection.getMetaData
- val resultSet = meta.getPrimaryKeys(null, "test_schema", "test_table")
- val resultSetBuffer = ArrayBuffer[(String, String, String, String)]()
- while (resultSet.next()) {
- resultSetBuffer += Tuple4(
- resultSet.getString(TABLE_CAT),
- resultSet.getString(TABLE_SCHEM),
- resultSet.getString(TABLE_NAME),
- resultSet.getString(COLUMN_NAME))
- }
- assert(resultSetBuffer.contains((null, "test_schema", "test_table", "a")))
- } finally {
- statement.execute("DROP TABLE test_schema.test_table")
- }
- }
- }
-
- test("get cross reference") {
- withDatabases("test_schema") { statement =>
- statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
- statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table1(a string, " +
- "PRIMARY KEY(a) DISABLE NOVALIDATE)")
- statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table2(a string, b string, " +
- "FOREIGN KEY(b) REFERENCES test_schema.test_table1(a) DISABLE NOVALIDATE)")
-
- try {
- val meta = statement.getConnection.getMetaData
- val resultSet = meta.getCrossReference(
- null,
- "test_schema",
- "test_table1",
- null,
- "test_schema",
- "test_table2")
- val resultSetBuffer =
- ArrayBuffer[(String, String, String, String, String, String, String, String)]()
- while (resultSet.next()) {
- resultSetBuffer += Tuple8(
- resultSet.getString("PKTABLE_CAT"),
- resultSet.getString("PKTABLE_SCHEM"),
- resultSet.getString("PKTABLE_NAME"),
- resultSet.getString("PKCOLUMN_NAME"),
- resultSet.getString("FKTABLE_CAT"),
- resultSet.getString("FKTABLE_SCHEM"),
- resultSet.getString("FKTABLE_NAME"),
- resultSet.getString("FKCOLUMN_NAME"))
- }
- assert(resultSetBuffer.contains((
- null,
- "test_schema",
- "test_table1",
- "a",
- null,
- "test_schema",
- "test_table2",
- "b")))
- } finally {
- statement.execute("DROP TABLE test_schema.test_table2")
- statement.execute("DROP TABLE test_schema.test_table1")
- }
- }
- }
-
- test("basic execute statements, create, insert query") {
- withJdbcStatement("hive_engine_test") { statement =>
- statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")
- statement.execute("INSERT INTO hive_engine_test SELECT 1, '2'")
-
- val resultSet = statement.executeQuery("SELECT ID, VALUE FROM hive_engine_test")
- assert(resultSet.next())
- assert(resultSet.getInt("ID") === 1)
- assert(resultSet.getString("VALUE") === "2")
-
- val metaData = resultSet.getMetaData
- assert(metaData.getColumnType(1) === java.sql.Types.INTEGER)
- assert(metaData.getPrecision(1) === 10)
- assert(metaData.getScale(1) === 0)
- }
- }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 3015e3427..503487267 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -36,10 +36,12 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
final private val handleToOperation = new java.util.HashMap[OperationHandle, Operation]()
+ protected def skipOperationLog: Boolean = false
+
def getOperationCount: Int = handleToOperation.size()
override def initialize(conf: KyuubiConf): Unit = {
- LogDivertAppender.initialize()
+ LogDivertAppender.initialize(skipOperationLog)
super.initialize(conf)
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
index 82ce6feb9..68753cf98 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.{Filter, LogEvent, StringLayout}
import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAppender, WriterManager}
-import org.apache.logging.log4j.core.config.Property
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.apache.logging.log4j.core.layout.PatternLayout
@@ -41,7 +40,7 @@ class Log4j2DivertAppender(
filter,
ignoreExceptions,
immediateFlush,
- Property.EMPTY_ARRAY,
+ null,
new WriterManager(writer, name, layout, true)) {
def this() = this(
"KyuubiEngineLogDivertAppender",
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala
index 6766962fe..7d2989303 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala
@@ -22,13 +22,17 @@ import org.slf4j.impl.StaticLoggerBinder
import org.apache.kyuubi.Logging
object LogDivertAppender extends Logging {
- def initialize(): Unit = {
- if (Logging.isLog4j2) {
- Log4j2DivertAppender.initialize()
- } else if (Logging.isLog4j12) {
- Log4j12DivertAppender.initialize()
- } else {
- warn(s"Unsupported SLF4J binding ${StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr}")
+ def initialize(skip: Boolean = false): Unit = {
+ if (!skip) {
+ if (Logging.isLog4j2) {
+ Log4j2DivertAppender.initialize()
+ } else if (Logging.isLog4j12) {
+ Log4j12DivertAppender.initialize()
+ } else {
+ warn(s"Unsupported SLF4J binding" +
+ s" ${StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr}")
+ }
}
+
}
}
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/HiveEngineTests.scala
similarity index 90%
copy from externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
copy to kyuubi-common/src/test/scala/org/apache/kyuubi/HiveEngineTests.scala
index 7e2a63db0..d3c6e865a 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/HiveEngineTests.scala
@@ -15,28 +15,25 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.hive.operation
+package org.apache.kyuubi
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.hive.ql.exec.FunctionInfo
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
-import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.{COLUMN_NAME, FUNCTION_CAT, FUNCTION_NAME, FUNCTION_SCHEM, REMARKS, SPECIFIC_NAME, TABLE_CAT, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEM, TABLE_TYPE, TYPE_NAME}
-class HiveOperationSuite extends HiveJDBCTestHelper {
-
- override def beforeAll(): Unit = {
- HiveSQLEngine.startEngine()
- super.beforeAll()
- }
-
- override protected def jdbcUrl: String = {
- "jdbc:hive2://" + HiveSQLEngine.currentEngine.get.frontendServices.head.connectionUrl + "/;"
- }
+/**
+ * hive tests disabled for JAVA 11
+ * https://issues.apache.org/jira/browse/HIVE-22415
+ * https://issues.apache.org/jira/browse/HIVE-21584
+ * Hive 3.x does not work with Java 11
+ */
+trait HiveEngineTests extends HiveJDBCTestHelper {
test("get catalogs") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withJdbcStatement() { statement =>
val catalogs = statement.getConnection.getMetaData.getCatalogs
assert(!catalogs.next())
@@ -44,6 +41,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("get schemas") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
val metaData = statement.getConnection.getMetaData
@@ -66,6 +64,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("get tables") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a string)")
@@ -133,6 +132,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("get columns") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a int, b string)")
@@ -207,6 +207,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("get functions") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
Seq("from_unixtime", "to_date", "date_format", "date_format", "round", "sin").foreach {
@@ -217,16 +218,18 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
assert(resultSet.getString(FUNCTION_SCHEM) === null)
assert(resultSet.getString(FUNCTION_NAME) === func)
assert(resultSet.getString(REMARKS).isEmpty)
- assert(resultSet.getString(SPECIFIC_NAME) === classOf[FunctionInfo].getName)
+ assert(resultSet.getString(SPECIFIC_NAME) ===
+ "org.apache.hadoop.hive.ql.exec.FunctionInfo")
}
}
}
}
test("get table types") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withJdbcStatement() { statement =>
val resultSet = statement.getConnection.getMetaData.getTableTypes
- val expected = Set("TABLE", "VIEW", "INDEX_TABLE", "MATERIALIZED_VIEW")
+ val expected = Set("TABLE", "VIEW", "MATERIALIZED_VIEW")
var tableTypes = Set[String]()
while (resultSet.next()) {
assert(expected.contains(resultSet.getString(TABLE_TYPE)))
@@ -238,6 +241,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("get primary keys") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a string, " +
@@ -262,6 +266,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("get cross reference") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table1(a string, " +
@@ -308,8 +313,9 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
test("basic execute statements, create, insert query") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
withJdbcStatement("hive_engine_test") { statement =>
- statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")
+ statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as textfile")
statement.execute("INSERT INTO hive_engine_test SELECT 1, '2'")
val resultSet = statement.executeQuery("SELECT ID, VALUE FROM hive_engine_test")
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 127e578c4..15ea574e7 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -214,6 +214,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-sql-engine_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-sql-engine_${scala.binary.version}</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index c61f34ce0..2fd795178 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine
-import java.io.{File, IOException}
+import java.io.{File, FilenameFilter, IOException}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
@@ -228,6 +228,54 @@ trait ProcBuilder {
}.mkString(" ")
}
}
+
+ /**
+ * Get the home directory that contains binary distributions of engines.
+ *
+ * Take Spark as an example: we first look up SPARK_HOME in the user-specified environments.
+ * If not found, we assume that it is a dev environment and look up the kyuubi-download output
+ * directory. If not found again, a `KyuubiSQLException` will be raised.
+ * In summary, we walk through
+ * `kyuubi.engineEnv.SPARK_HOME` ->
+ * System.env("SPARK_HOME") ->
+ * kyuubi-download/target/spark-* ->
+ * error.
+ *
+ * @param shortName the short name of engine, e.g. spark
+ * @return SPARK_HOME, HIVE_HOME, etc.
+ */
+ protected def getEngineHome(shortName: String): String = {
+ val homeKey = s"${shortName.toUpperCase}_HOME"
+ val homeVal = env.get(homeKey).orElse {
+ val cwd = Utils.getCodeSourceLocation(getClass).split("kyuubi-server")
+ assert(cwd.length > 1)
+ Option(
+ Paths.get(cwd.head)
+ .resolve("externals")
+ .resolve("kyuubi-download")
+ .resolve("target")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ dir.isDirectory && name.contains(s"$shortName-")
+ }
+ }))
+ .flatMap(_.headOption)
+ .map(_.getAbsolutePath)
+ }
+ if (homeVal.isEmpty) {
+ throw validateEnv(homeKey)
+ } else {
+ homeVal.get
+ }
+ }
+
+ protected def validateEnv(requiredEnv: String): Throwable = {
+ KyuubiSQLException(s"$requiredEnv is not set! For more information on installing and " +
+ s"configuring $requiredEnv, please visit https://kyuubi.apache.org/docs/latest/" +
+ s"deployment/settings.html#environments")
+ }
+
}
object ProcBuilder extends Logging {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index 496930051..4ee0e5f43 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -17,14 +17,19 @@
package org.apache.kyuubi.engine.hive
+import java.io.File
import java.net.URI
import java.nio.file.{Files, Paths}
+import java.util.LinkedHashSet
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, SCALA_COMPILE_VERSION, Utils}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_MAIN_RESOURCE
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
-import org.apache.kyuubi.engine.hive.HiveProcessBuilder.HIVE_ENGINE_BINARY_FILE
import org.apache.kyuubi.operation.log.OperationLog
class HiveProcessBuilder(
@@ -33,25 +38,14 @@ class HiveProcessBuilder(
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
- override protected def executable: String = {
- val hiveHomeOpt = env.get("HIVE_ENGINE_HOME").orElse {
- val cwd = Utils.getCodeSourceLocation(getClass)
- .split("kyuubi-server")
- assert(cwd.length > 1)
- Option(
- Paths.get(cwd.head)
- .resolve("externals")
- .resolve(module)
- .toFile)
- .map(_.getAbsolutePath)
- }
+ private val hiveHome: String = getEngineHome("hive")
- hiveHomeOpt.map { dir =>
- Paths.get(dir, "bin", HIVE_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
- }.getOrElse {
- throw KyuubiSQLException("HIVE_ENGINE_HOME is not set! " +
- "For more detail information on installing and configuring Hive, please visit " +
- "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
+ override protected def executable: String = {
+ val javaHome = env.get("JAVA_HOME")
+ if (javaHome.isEmpty) {
+ throw validateEnv("JAVA_HOME")
+ } else {
+ Paths.get(javaHome.get, "bin", "java").toString
}
}
@@ -69,7 +63,7 @@ class HiveProcessBuilder(
}.orElse {
// 2. get the main resource jar from system build default
env.get(KyuubiConf.KYUUBI_HOME)
- .map { Paths.get(_, "externals", "engines", "hive", "jars", jarName) }
+ .map { Paths.get(_, "externals", "engines", "hive", jarName) }
.filter(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
}.orElse {
// 3. get the main resource from dev environment
@@ -80,21 +74,53 @@ class HiveProcessBuilder(
}
}
- override protected def childProcEnv: Map[String, String] = conf.getEnvs +
- ("HIVE_ENGINE_JAR" -> mainResource.get) +
- ("HIVE_ENGINE_DYNAMIC_ARGS" ->
- conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
-
override protected def module: String = "kyuubi-hive-sql-engine"
override protected def mainClass: String = "org.apache.kyuubi.engine.hive.HiveSQLEngine"
- override protected def commands: Array[String] = Array(executable)
+ override protected def commands: Array[String] = {
+ val buffer = new ArrayBuffer[String]()
+ buffer += executable
-}
+ // TODO: How shall we deal with proxyUser,
+ // user.name
+ // kyuubi.session.user
+ // or just leave it, because we can handle it at operation layer
+ buffer += s"-D$KYUUBI_SESSION_USER_KEY=$proxyUser"
-object HiveProcessBuilder {
+ // TODO: add Kyuubi.engineEnv.HIVE_ENGINE_MEMORY or kyuubi.engine.hive.memory to configure
+ // -Xmx5g
+ // java options
+ for ((k, v) <- conf.getAll) {
+ buffer += s"-D$k=$v"
+ }
+
+ buffer += "-cp"
+ val classpathEntries = new LinkedHashSet[String]
+ // hive engine runtime jar
+ mainResource.foreach(classpathEntries.add)
+ // classpath contains hive configurations, default to hive.home/conf
+ classpathEntries.add(env.getOrElse("HIVE_CONF_DIR", s"$hiveHome${File.separator}conf"))
+ // classpath contains hadoop configurations
+ env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
+ env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
+ // jars from hive distribution
+ classpathEntries.add(s"$hiveHome${File.separator}lib${File.separator}*")
+ val hadoopCp = env.get("HIVE_HADOOP_CLASSPATH").orElse(env.get("HADOOP_CLASSPATH"))
+ hadoopCp.foreach(path => classpathEntries.add(s"$path${File.separator}*"))
+ if (hadoopCp.isEmpty) {
+ mainResource.foreach { path =>
+ val devHadoopJars = Paths.get(path).getParent
+ .resolve(s"scala-$SCALA_COMPILE_VERSION")
+ .resolve("jars")
+ classpathEntries.add(s"$devHadoopJars${File.separator}*")
+ }
- val HIVE_ENGINE_BINARY_FILE = "hive-sql-engine.sh"
+ }
+ buffer += classpathEntries.asScala.mkString(File.pathSeparator)
+ buffer += mainClass
+ buffer.toArray
+ }
+ override def toString: String = commands.mkString("\n")
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index fbb10d64e..656c47a7a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark
-import java.io.{File, FilenameFilter, IOException}
+import java.io.IOException
import java.net.URI
import java.nio.file.{Files, Paths}
@@ -50,32 +50,8 @@ class SparkProcessBuilder(
def getYarnClient: YarnClient = YarnClient.createYarnClient
override protected val executable: String = {
- val sparkHomeOpt = env.get("SPARK_HOME").orElse {
- val cwd = Utils.getCodeSourceLocation(getClass)
- .split("kyuubi-server")
- assert(cwd.length > 1)
- Option(
- Paths.get(cwd.head)
- .resolve("externals")
- .resolve("kyuubi-download")
- .resolve("target")
- .toFile
- .listFiles(new FilenameFilter {
- override def accept(dir: File, name: String): Boolean = {
- dir.isDirectory && name.startsWith("spark-")
- }
- }))
- .flatMap(_.headOption)
- .map(_.getAbsolutePath)
- }
-
- sparkHomeOpt.map { dir =>
- Paths.get(dir, "bin", SPARK_SUBMIT_FILE).toAbsolutePath.toFile.getCanonicalPath
- }.getOrElse {
- throw KyuubiSQLException("SPARK_HOME is not set! " +
- "For more detail information on installing and configuring Spark, please visit " +
- "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
- }
+ val sparkHome = getEngineHome("spark")
+ Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
}
override def mainClass: String = "org.apache.kyuubi.engine.spark.SparkSQLEngine"
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
index 38171cefa..d5d9dbae1 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
@@ -53,12 +53,9 @@ trait WithKyuubiServer extends KyuubiFunSuite {
conf.set("spark.ui.enabled", "false")
conf.setIfMissing("spark.sql.catalogImplementation", "in-memory")
conf.setIfMissing(ENGINE_CHECK_INTERVAL, 1000L)
- conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 3000L)
- // TODO KYUUBI #745
- conf.setIfMissing(ENGINE_INIT_TIMEOUT, 300000L)
+ conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 5000L)
server = KyuubiServer.startServer(conf)
super.beforeAll()
- Thread.sleep(1500)
}
override def afterAll(): Unit = {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala
index 42aaeda2d..9411410e1 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala
@@ -26,7 +26,11 @@ class HiveProcessBuilderSuite extends KyuubiFunSuite {
test("hive process builder") {
val builder = new HiveProcessBuilder("kyuubi", conf)
- val commands = builder.toString.split(' ')
- assert(commands.exists(_.endsWith("hive-sql-engine.sh")))
+ val commands = builder.toString.split('\n')
+ assert(commands.head.endsWith("bin/java"), "wrong exec")
+ assert(commands.contains("-Dkyuubi.session.user=kyuubi"))
+ assert(commands.contains("-Dkyuubi.on=off"))
+ assert(commands.exists(ss => ss.contains("kyuubi-hive-sql-engine")), "wrong classpath")
}
+
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/hive/KyuubiOperationWithHiveEngineSuite.scala
similarity index 53%
copy from kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala
copy to kyuubi-server/src/test/scala/org/apache/kyuubi/operation/hive/KyuubiOperationWithHiveEngineSuite.scala
index 42aaeda2d..f4243f6fe 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/hive/KyuubiOperationWithHiveEngineSuite.scala
@@ -15,18 +15,22 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.hive
+package org.apache.kyuubi.operation.hive
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
-class HiveProcessBuilderSuite extends KyuubiFunSuite {
-
- private def conf = KyuubiConf().set("kyuubi.on", "off")
-
- test("hive process builder") {
- val builder = new HiveProcessBuilder("kyuubi", conf)
- val commands = builder.toString.split(' ')
- assert(commands.exists(_.endsWith("hive-sql-engine.sh")))
+class KyuubiOperationWithHiveEngineSuite extends WithKyuubiServer with HiveEngineTests {
+ override protected val conf: KyuubiConf = {
+ val metastore = Utils.createTempDir(namePrefix = getClass.getSimpleName)
+ metastore.toFile.delete()
+ KyuubiConf()
+ .set(ENGINE_TYPE, "HIVE_SQL")
+ // increase this to 30s as the Hive session state and metastore client are slow to initialize
+ .setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
+ .set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
}
+
+ override protected def jdbcUrl: String = getJdbcUrl
}
diff --git a/pom.xml b/pom.xml
index 0e4f40ff9..3191bc83d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,10 +115,11 @@
<guava.version>30.1-jre</guava.version>
<guava.failureaccess.version>1.0.1</guava.failureaccess.version>
<hadoop.version>3.3.2</hadoop.version>
+ <hive.engine.hive.version>3.1.3</hive.engine.hive.version>
<hive.version>2.3.9</hive.version>
<hive.service.rpc.version>3.1.3</hive.service.rpc.version>
- <hive.archive.name>apache-hive-${hive.version}-bin.tar.gz</hive.archive.name>
- <hive.archive.mirror>${apache.archive.dist}/hive/hive-${hive.version}</hive.archive.mirror>
+ <hive.archive.name>apache-hive-${hive.engine.hive.version}-bin.tar.gz</hive.archive.name>
+ <hive.archive.mirror>${apache.archive.dist}/hive/hive-${hive.engine.hive.version}</hive.archive.mirror>
<hive.archive.download.skip>false</hive.archive.download.skip>
<hudi.version>0.10.0</hudi.version>
<iceberg.name>iceberg-spark-runtime-3.2_${scala.binary.version}</iceberg.name>