Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/29 10:23:43 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3b04d9942 [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
3b04d9942 is described below
commit 3b04d9942a253a7d99559356b3240228efbf913d
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Fri Apr 29 18:23:34 2022 +0800
[KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
### _Why are the changes needed?_
Fix #2346: remove the `flink-sql-engine.sh` launch script and let `FlinkProcessBuilder` assemble the full `java` command (JVM system properties, classpath, and main class) itself, as sketched below.
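A minimal sketch of the command shape the builder now produces, assuming a standard Flink distribution; the java path, jar names, and `-D` options below are illustrative placeholders, not the builder's exact output:

```bash
# Hypothetical shape of the generated launch command; the real jar
# names, versions, and -D flags come from the session configuration.
bash -c "source $FLINK_HOME/bin/config.sh && $JAVA_HOME/bin/java \
  -Dkyuubi.session.user=<proxy-user> \
  -Dflink.parallelism.default=6 \
  -cp <kyuubi-flink-sql-engine jar>:$FLINK_HOME/opt/<flink-sql-client jar>:$FLINK_HOME/lib/*:$FLINK_HOME/conf:$HADOOP_CLASSPATH \
  org.apache.kyuubi.engine.flink.FlinkSQLEngine"
```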
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2418 from jiaoqingbo/kyuubi2346.
Closes #2346
eda5d383 [jiaoqingbo] after code review
720fede3 [jiaoqingbo] execute source config.sh and fix ut failed and delete debug log
1b53a9ba [jiaoqingbo] Merge branch 'master' into kyuubi2346
0c7eb5e7 [jiaoqingbo] add hadoop classpath to UT
ff2bb14c [jiaoqingbo] Merge branch 'master' into kyuubi2346
c9e8019d [jiaoqingbo] change ut
308ae5fd [jiaoqingbo] code review
f3eb068b [jiaoqingbo] fix ut failed
4e6d168d [jiaoqingbo] spotless apply
9eae5576 [jiaoqingbo] fix ut failed
bc00e690 [jiaoqingbo] delete flink-sql-engine.sh
d6b87b9c [jiaoqingbo] delete childProcEnv
3aa8738a [jiaoqingbo] [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
Authored-by: jiaoqingbo <11...@qq.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
build/dist | 5 +-
.../bin/flink-sql-engine.sh | 69 --------------
.../it/flink/operation/FlinkOperationSuite.scala | 19 +++-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 100 ++++++++++++++-------
.../engine/flink/FlinkProcessBuilderSuite.scala | 85 ++++++++++++++++--
5 files changed, 166 insertions(+), 112 deletions(-)
diff --git a/build/dist b/build/dist
index b6103a090..d7083ee2c 100755
--- a/build/dist
+++ b/build/dist
@@ -214,7 +214,6 @@ mkdir -p "$DISTDIR/pid"
mkdir -p "$DISTDIR/logs"
mkdir -p "$DISTDIR/work"
mkdir -p "$DISTDIR/externals/engines/flink"
-mkdir -p "$DISTDIR/externals/engines/flink/lib"
mkdir -p "$DISTDIR/externals/engines/spark"
mkdir -p "$DISTDIR/externals/engines/trino"
mkdir -p "$DISTDIR/externals/engines/hive"
@@ -245,9 +244,7 @@ done
cd -
# Copy flink engines
-cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/bin/" "$DISTDIR/externals/engines/flink/bin/"
-chmod a+x "$DISTDIR/externals/engines/flink/bin/flink-sql-engine.sh"
-cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink/lib"
+cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink"
# Copy spark engines
cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/spark"
diff --git a/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh b/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh
deleted file mode 100755
index db634226a..000000000
--- a/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-################################################################################
-# Adopted from "flink" bash script
-################################################################################
-
-if [[ -z "$FLINK_HOME" || ! -d "$FLINK_HOME" ]]; then
- (>&2 echo "Invalid FLINK_HOME: ${FLINK_HOME:-unset}")
- exit 1
-fi
-
-# do NOT let config.sh detect FLINK_HOME
-_FLINK_HOME_DETERMINED=1 . "$FLINK_HOME/bin/config.sh"
-
-FLINK_IDENT_STRING=${FLINK_IDENT_STRING:-"$USER"}
-FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")
-CC_CLASSPATH=`constructFlinkClassPath`
-
-FLINK_SQL_ENGINE_HOME="$(cd `dirname $0`/..; pwd)"
-if [[ "$FLINK_SQL_ENGINE_HOME" == "$KYUUBI_HOME/externals/engines/flink" ]]; then
- FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/lib"
- FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex ".*/kyuubi-flink-sql-engine_.*\.jar")
- FLINK_HADOOP_CLASSPATH="$INTERNAL_HADOOP_CLASSPATHS"
- log_file="$KYUUBI_LOG_DIR/kyuubi-flink-sql-engine-$FLINK_IDENT_STRING-$HOSTNAME.log"
- log4j2_conf_file="file:$FLINK_CONF_DIR/log4j.properties"
- logback_conf_file="file:$FLINK_CONF_DIR/logback.xml"
-else
- echo -e "\nFLINK_SQL_ENGINE_HOME $FLINK_SQL_ENGINE_HOME doesn't match production directory, assuming in development environment..."
- FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/target"
- FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex '.*/kyuubi-flink-sql-engine_.*\.jar$' | grep -v '\-javadoc.jar$' | grep -v '\-tests.jar$')
- _FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS=$(find $FLINK_SQL_ENGINE_LIB_DIR -regex '.*/hadoop-client-.*\.jar$' | tr '\n' ':')
- FLINK_HADOOP_CLASSPATH="${_FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS%:}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
- log_file="unused.log"
- log4j2_conf_file="file:$FLINK_CONF_DIR/log4j-session.properties" # which send all logs to console
- logback_conf_file="unused.xml"
-fi
-
-FULL_CLASSPATH="$FLINK_SQL_ENGINE_JAR:$FLINK_SQL_CLIENT_JAR:$CC_CLASSPATH:$FLINK_HADOOP_CLASSPATH"
-
-log_setting=(
- -Dlog.file="$log_file"
- -Dlog4j2.configurationFile="$log4j2_conf_file"
- -Dlogback.configurationFile="$logback_conf_file"
-)
-
-if [ -n "$FLINK_SQL_ENGINE_JAR" ]; then
- exec $JAVA_RUN ${FLINK_SQL_ENGINE_DYNAMIC_ARGS} "${log_setting[@]}" -cp ${FULL_CLASSPATH} \
- org.apache.kyuubi.engine.flink.FlinkSQLEngine "$@"
-else
- (>&2 echo "[ERROR] Flink SQL Engine JAR file 'kyuubi-flink-sql-engine*.jar' should be located in $FLINK_SQL_ENGINE_LIB_DIR.")
- exit 1
-fi
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index 583c366a2..4d2c8dfd7 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -17,16 +17,33 @@
package org.apache.kyuubi.it.flink.operation
+import java.io.File
+import java.nio.file.Paths
+
+import org.apache.kyuubi.{HADOOP_COMPILE_VERSION, SCALA_COMPILE_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX}
import org.apache.kyuubi.it.flink.WithKyuubiServerAndFlinkMiniCluster
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster with HiveJDBCTestHelper {
+ val kyuubiHome: String = Utils.getCodeSourceLocation(getClass).split("integration-tests")(0)
+ val hadoopClasspath: String = Paths.get(
+ kyuubiHome,
+ "externals",
+ "kyuubi-flink-sql-engine",
+ "target",
+ s"scala-$SCALA_COMPILE_VERSION",
+ "jars").toAbsolutePath.toString
override val conf: KyuubiConf = KyuubiConf()
.set(ENGINE_TYPE, "FLINK_SQL")
.set("flink.parallelism.default", "6")
+ .set(
+ s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH",
+ s"$hadoopClasspath${File.separator}" +
+ s"hadoop-client-api-$HADOOP_COMPILE_VERSION.jar${File.pathSeparator}" +
+ s"$hadoopClasspath${File.separator}hadoop-client-runtime-$HADOOP_COMPILE_VERSION.jar")
override protected def jdbcUrl: String = getJdbcUrl
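Since the deleted launcher was what assembled a Hadoop classpath, the suite above now injects one explicitly via `kyuubi.engineEnv.HADOOP_CLASSPATH`. Outside of tests, `HADOOP_CLASSPATH` must reach the engine's environment some other way; one common approach on a host with a Hadoop installation (an assumption about the deployment, not part of this patch) is:

```bash
# Expose the full Hadoop classpath to Kyuubi so FlinkProcessBuilder
# can append it to the engine's -cp; requires the hadoop CLI on PATH.
export HADOOP_CLASSPATH=$(hadoop classpath)
```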
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 978f9bdd7..c2549433d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -19,13 +19,17 @@ package org.apache.kyuubi.engine.flink
import java.io.{File, FilenameFilter}
import java.nio.file.Paths
+import java.util.LinkedHashSet
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
-import org.apache.kyuubi.engine.flink.FlinkProcessBuilder.FLINK_ENGINE_BINARY_FILE
import org.apache.kyuubi.operation.log.OperationLog
/**
@@ -37,28 +41,6 @@ class FlinkProcessBuilder(
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
- override protected def executable: String = {
- val flinkEngineHomeOpt = env.get("FLINK_ENGINE_HOME").orElse {
- val cwd = Utils.getCodeSourceLocation(getClass)
- .split("kyuubi-server")
- assert(cwd.length > 1)
- Option(
- Paths.get(cwd.head)
- .resolve("externals")
- .resolve("kyuubi-flink-sql-engine")
- .toFile)
- .map(_.getAbsolutePath)
- }
-
- flinkEngineHomeOpt.map { dir =>
- Paths.get(dir, "bin", FLINK_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
- } getOrElse {
- throw KyuubiSQLException("FLINK_ENGINE_HOME is not set! " +
- "For more detail information on installing and configuring Flink, please visit " +
- "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
- }
- }
-
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -66,14 +48,68 @@ class FlinkProcessBuilder(
override protected def childProcEnv: Map[String, String] = conf.getEnvs +
("FLINK_HOME" -> FLINK_HOME) +
("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf") +
- ("FLINK_SQL_ENGINE_JAR" -> mainResource.get) +
- ("FLINK_SQL_ENGINE_DYNAMIC_ARGS" ->
- conf.getAll.filter { case (k, _) =>
- k.startsWith("kyuubi.") || k.startsWith("flink.") ||
- k.startsWith("hadoop.") || k.startsWith("yarn.")
- }.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
-
- override protected def commands: Array[String] = Array(executable)
+ ("_FLINK_HOME_DETERMINED" -> s"1")
+
+ override protected def commands: Array[String] = {
+ val buffer = new ArrayBuffer[String]()
+ buffer += s"bash"
+ buffer += s"-c"
+ val commandStr = new StringBuilder()
+
+ commandStr.append(s"source $FLINK_HOME${File.separator}bin" +
+ s"${File.separator}config.sh && $executable")
+
+ // TODO: How shall we deal with proxyUser,
+ // user.name
+ // kyuubi.session.user
+ // or just leave it, because we can handle it at operation layer
+ commandStr.append(s" -D$KYUUBI_SESSION_USER_KEY=$proxyUser ")
+
+ // TODO: add Kyuubi.engineEnv.FLINK_ENGINE_MEMORY or kyuubi.engine.flink.memory to configure
+ // -Xmx5g
+ // java options
+ val confStr = conf.getAll.filter { case (k, _) =>
+ k.startsWith("kyuubi.") || k.startsWith("flink.") ||
+ k.startsWith("hadoop.") || k.startsWith("yarn.")
+ }.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+ commandStr.append(confStr)
+
+ commandStr.append(" -cp ")
+ val classpathEntries = new LinkedHashSet[String]
+ // flink engine runtime jar
+ mainResource.foreach(classpathEntries.add)
+ // flink sql client jar
+ val flinkSqlClientPath = Paths.get(FLINK_HOME)
+ .resolve("opt")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.toLowerCase.startsWith("flink-sql-client")
+ }
+ }).head.getAbsolutePath
+ classpathEntries.add(flinkSqlClientPath)
+
+ // jars from flink lib
+ classpathEntries.add(s"$FLINK_HOME${File.separator}lib${File.separator}*")
+
+ // classpath contains flink configurations, default to flink.home/conf
+ classpathEntries.add(env.getOrElse("FLINK_CONF_DIR", s"$FLINK_HOME${File.separator}conf"))
+ // classpath contains hadoop configurations
+ env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
+ env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
+ env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+ val hadoopClasspath = env.get("HADOOP_CLASSPATH")
+ if (hadoopClasspath.isEmpty) {
+ throw KyuubiSQLException("HADOOP_CLASSPATH is not set! " +
+ "For more detail information on configuring HADOOP_CLASSPATH" +
+ "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
+ }
+ classpathEntries.add(hadoopClasspath.get)
+ commandStr.append(classpathEntries.asScala.mkString(File.pathSeparator))
+ commandStr.append(s" $mainClass")
+ buffer += commandStr.toString()
+ buffer.toArray
+ }
@VisibleForTesting
def FLINK_HOME: String = {
@@ -112,6 +148,4 @@ class FlinkProcessBuilder(
object FlinkProcessBuilder {
final val APP_KEY = "yarn.application.name"
final val TAG_KEY = "yarn.tags"
-
- final private val FLINK_ENGINE_BINARY_FILE = "flink-sql-engine.sh"
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 94c5cb450..d63ec55fb 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -17,15 +17,90 @@
package org.apache.kyuubi.engine.flink
-import org.apache.kyuubi.KyuubiFunSuite
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.ListMap
+
+import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite, KyuubiSQLException, SCALA_COMPILE_VERSION}
import org.apache.kyuubi.config.KyuubiConf
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def conf = KyuubiConf().set("kyuubi.on", "off")
+ private def envDefault: ListMap[String, String] = ListMap(
+ "JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
+ private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
+ ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
+ ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
+ ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
+ private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
+ ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+ private def confStr: String = {
+ conf.getAll.filter { case (k, _) =>
+ k.startsWith("kyuubi.") || k.startsWith("flink.") ||
+ k.startsWith("hadoop.") || k.startsWith("yarn.")
+ }.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+ }
+ private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
+ val actualCommands = builder.toString
+ val classpathStr: String = constructClasspathStr(builder)
+ val expectedCommands = s"bash -c source ${builder.FLINK_HOME}" +
+ s"${File.separator}bin${File.separator}config.sh && $javaPath " +
+ s"-Dkyuubi.session.user=vinoyang $confStr" +
+ s" -cp $classpathStr $mainClassStr"
+ info(s"\n\n actualCommands $actualCommands")
+ info(s"\n\n expectedCommands $expectedCommands")
+ assert(actualCommands.equals(expectedCommands))
+ }
+
+ private def constructClasspathStr(builder: FlinkProcessBuilder) = {
+ val classpathEntries = new java.util.LinkedHashSet[String]
+ builder.mainResource.foreach(classpathEntries.add)
+ val flinkSqlClientJarPath = s"${builder.FLINK_HOME}$flinkSqlClientJarPathSuffix"
+ val flinkLibPath = s"${builder.FLINK_HOME}$flinkLibPathSuffix"
+ val flinkConfPath = s"${builder.FLINK_HOME}$flinkConfPathSuffix"
+ classpathEntries.add(flinkSqlClientJarPath)
+ classpathEntries.add(flinkLibPath)
+ classpathEntries.add(flinkConfPath)
+ val envMethod = classOf[FlinkProcessBuilder].getDeclaredMethod("env")
+ envMethod.setAccessible(true)
+ val envMap = envMethod.invoke(builder).asInstanceOf[Map[String, String]]
+ envMap.foreach { case (k, v) =>
+ if (!k.equals("JAVA_HOME")) {
+ classpathEntries.add(v)
+ }
+ }
+ val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
+ classpathStr
+ }
+
+ private val javaPath = s"${envDefault("JAVA_HOME")}${File.separator}bin${File.separator}java"
+ private val flinkSqlClientJarPathSuffix = s"${File.separator}opt${File.separator}" +
+ s"flink-sql-client_$SCALA_COMPILE_VERSION-$FLINK_COMPILE_VERSION.jar"
+ private val flinkLibPathSuffix = s"${File.separator}lib${File.separator}*"
+ private val flinkConfPathSuffix = s"${File.separator}conf"
+ private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+
+ test("all hadoop related environment variables are configured") {
+ val builder = new FlinkProcessBuilder("vinoyang", conf) {
+ override protected def env: Map[String, String] = envWithAllHadoop
+
+ }
+ compareActualAndExpected(builder)
+ }
+
+ test("all hadoop related environment variables are configured except HADOOP_CLASSPATH") {
+ val builder = new FlinkProcessBuilder("vinoyang", conf) {
+ override def env: Map[String, String] = envWithoutHadoopCLASSPATH
+ }
+ assertThrows[KyuubiSQLException](builder.toString)
+ }
- test("flink engine process builder") {
- val builder = new FlinkProcessBuilder("vinoyang", conf)
- val commands = builder.toString.split(' ')
- assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
+ test("only HADOOP_CLASSPATH environment variables are configured") {
+ val builder = new FlinkProcessBuilder("vinoyang", conf) {
+ override def env: Map[String, String] = envDefault +
+ ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+ }
+ compareActualAndExpected(builder)
}
}