Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/29 10:23:43 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b04d9942 [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
3b04d9942 is described below

commit 3b04d9942a253a7d99559356b3240228efbf913d
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Fri Apr 29 18:23:34 2022 +0800

    [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
    
    ### _Why are the changes needed?_
    
    fix #2346
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2418 from jiaoqingbo/kyuubi2346.
    
    Closes #2346
    
    eda5d383 [jiaoqingbo] after code review
    720fede3 [jiaoqingbo] execute source config.sh and fix ut failed and delete debug log
    1b53a9ba [jiaoqingbo] Merge branch 'master' into kyuubi2346
    0c7eb5e7 [jiaoqingbo] add hadoop classpath to UT
    ff2bb14c [jiaoqingbo] Merge branch 'master' into kyuubi2346
    c9e8019d [jiaoqingbo] change ut
    308ae5fd [jiaoqingbo] code review
    f3eb068b [jiaoqingbo] fix ut failed
    4e6d168d [jiaoqingbo] spotless apply
    9eae5576 [jiaoqingbo] fix ut failed
    bc00e690 [jiaoqingbo] delete flink-sql-engine.sh
    d6b87b9c [jiaoqingbo] delete childProcEnv
    3aa8738a [jiaoqingbo] [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
    
    Authored-by: jiaoqingbo <11...@qq.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 build/dist                                         |   5 +-
 .../bin/flink-sql-engine.sh                        |  69 --------------
 .../it/flink/operation/FlinkOperationSuite.scala   |  19 +++-
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  | 100 ++++++++++++++-------
 .../engine/flink/FlinkProcessBuilderSuite.scala    |  85 ++++++++++++++++--
 5 files changed, 166 insertions(+), 112 deletions(-)
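
In short: the dedicated flink-sql-engine.sh launcher is removed, and
FlinkProcessBuilder now assembles the engine command directly around the
java executable. The resulting invocation looks roughly like this (a
sketch with illustrative paths and versions; HADOOP_CONF_DIR,
YARN_CONF_DIR and HBASE_CONF_DIR are appended to the classpath only when
set in the environment):

    bash -c "source $FLINK_HOME/bin/config.sh && $JAVA_HOME/bin/java \
      -Dkyuubi.session.user=bob -Dflink.parallelism.default=2 \
      -cp /opt/kyuubi/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.6.0-SNAPSHOT.jar:$FLINK_HOME/opt/flink-sql-client_2.12-1.14.4.jar:$FLINK_HOME/lib/*:$FLINK_HOME/conf:$HADOOP_CLASSPATH \
      org.apache.kyuubi.engine.flink.FlinkSQLEngine"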

diff --git a/build/dist b/build/dist
index b6103a090..d7083ee2c 100755
--- a/build/dist
+++ b/build/dist
@@ -214,7 +214,6 @@ mkdir -p "$DISTDIR/pid"
 mkdir -p "$DISTDIR/logs"
 mkdir -p "$DISTDIR/work"
 mkdir -p "$DISTDIR/externals/engines/flink"
-mkdir -p "$DISTDIR/externals/engines/flink/lib"
 mkdir -p "$DISTDIR/externals/engines/spark"
 mkdir -p "$DISTDIR/externals/engines/trino"
 mkdir -p "$DISTDIR/externals/engines/hive"
@@ -245,9 +244,7 @@ done
 cd -
 
 # Copy flink engines
-cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/bin/" "$DISTDIR/externals/engines/flink/bin/"
-chmod a+x "$DISTDIR/externals/engines/flink/bin/flink-sql-engine.sh"
-cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink/lib"
+cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink"
 
 # Copy spark engines
 cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/spark"
diff --git a/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh b/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh
deleted file mode 100755
index db634226a..000000000
--- a/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-################################################################################
-# Adopted from "flink" bash script
-################################################################################
-
-if [[ -z "$FLINK_HOME" || ! -d "$FLINK_HOME" ]]; then
-  (>&2 echo "Invalid FLINK_HOME: ${FLINK_HOME:-unset}")
-  exit 1
-fi
-
-# do NOT let config.sh detect FLINK_HOME
-_FLINK_HOME_DETERMINED=1 . "$FLINK_HOME/bin/config.sh"
-
-FLINK_IDENT_STRING=${FLINK_IDENT_STRING:-"$USER"}
-FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")
-CC_CLASSPATH=`constructFlinkClassPath`
-
-FLINK_SQL_ENGINE_HOME="$(cd `dirname $0`/..; pwd)"
-if [[ "$FLINK_SQL_ENGINE_HOME" == "$KYUUBI_HOME/externals/engines/flink" ]]; then
-  FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/lib"
-  FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex ".*/kyuubi-flink-sql-engine_.*\.jar")
-  FLINK_HADOOP_CLASSPATH="$INTERNAL_HADOOP_CLASSPATHS"
-  log_file="$KYUUBI_LOG_DIR/kyuubi-flink-sql-engine-$FLINK_IDENT_STRING-$HOSTNAME.log"
-  log4j2_conf_file="file:$FLINK_CONF_DIR/log4j.properties"
-  logback_conf_file="file:$FLINK_CONF_DIR/logback.xml"
-else
-  echo -e "\nFLINK_SQL_ENGINE_HOME $FLINK_SQL_ENGINE_HOME doesn't match production directory, assuming in development environment..."
-  FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/target"
-  FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex '.*/kyuubi-flink-sql-engine_.*\.jar$' | grep -v '\-javadoc.jar$' | grep -v '\-tests.jar$')
-  _FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS=$(find $FLINK_SQL_ENGINE_LIB_DIR -regex '.*/hadoop-client-.*\.jar$' | tr '\n' ':')
-  FLINK_HADOOP_CLASSPATH="${_FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS%:}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
-  log_file="unused.log"
-  log4j2_conf_file="file:$FLINK_CONF_DIR/log4j-session.properties" # which send all logs to console
-  logback_conf_file="unused.xml"
-fi
-
-FULL_CLASSPATH="$FLINK_SQL_ENGINE_JAR:$FLINK_SQL_CLIENT_JAR:$CC_CLASSPATH:$FLINK_HADOOP_CLASSPATH"
-
-log_setting=(
-  -Dlog.file="$log_file"
-  -Dlog4j2.configurationFile="$log4j2_conf_file"
-  -Dlogback.configurationFile="$logback_conf_file"
-)
-
-if [ -n "$FLINK_SQL_ENGINE_JAR" ]; then
-  exec $JAVA_RUN ${FLINK_SQL_ENGINE_DYNAMIC_ARGS} "${log_setting[@]}" -cp ${FULL_CLASSPATH} \
-    org.apache.kyuubi.engine.flink.FlinkSQLEngine "$@"
-else
-  (>&2 echo "[ERROR] Flink SQL Engine JAR file 'kyuubi-flink-sql-engine*.jar' should be located in $FLINK_SQL_ENGINE_LIB_DIR.")
-  exit 1
-fi
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index 583c366a2..4d2c8dfd7 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -17,16 +17,33 @@
 
 package org.apache.kyuubi.it.flink.operation
 
+import java.io.File
+import java.nio.file.Paths
+
+import org.apache.kyuubi.{HADOOP_COMPILE_VERSION, SCALA_COMPILE_VERSION, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX}
 import org.apache.kyuubi.it.flink.WithKyuubiServerAndFlinkMiniCluster
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
 
 class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster with HiveJDBCTestHelper {
+  val kyuubiHome: String = Utils.getCodeSourceLocation(getClass).split("integration-tests")(0)
+  val hadoopClasspath: String = Paths.get(
+    kyuubiHome,
+    "externals",
+    "kyuubi-flink-sql-engine",
+    "target",
+    s"scala-$SCALA_COMPILE_VERSION",
+    "jars").toAbsolutePath.toString
   override val conf: KyuubiConf = KyuubiConf()
     .set(ENGINE_TYPE, "FLINK_SQL")
     .set("flink.parallelism.default", "6")
+    .set(
+      s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH",
+      s"$hadoopClasspath${File.separator}" +
+        s"hadoop-client-api-$HADOOP_COMPILE_VERSION.jar${File.pathSeparator}" +
+        s"$hadoopClasspath${File.separator}hadoop-client-runtime-$HADOOP_COMPILE_VERSION.jar")
 
   override protected def jdbcUrl: String = getJdbcUrl
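
For the integration test, HADOOP_CLASSPATH is injected through the
kyuubi.engineEnv.* mechanism and points at the hadoop-client jars that
the build copies under the engine module's target directory. The
equivalent manual setting would look roughly like this (path and
versions illustrative):

    export HADOOP_CLASSPATH="/path/to/kyuubi/externals/kyuubi-flink-sql-engine/target/scala-2.12/jars/hadoop-client-api-3.3.1.jar:/path/to/kyuubi/externals/kyuubi-flink-sql-engine/target/scala-2.12/jars/hadoop-client-runtime-3.3.1.jar"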
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 978f9bdd7..c2549433d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -19,13 +19,17 @@ package org.apache.kyuubi.engine.flink
 
 import java.io.{File, FilenameFilter}
 import java.nio.file.Paths
+import java.util.LinkedHashSet
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.annotations.VisibleForTesting
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.ProcBuilder
-import org.apache.kyuubi.engine.flink.FlinkProcessBuilder.FLINK_ENGINE_BINARY_FILE
 import org.apache.kyuubi.operation.log.OperationLog
 
 /**
@@ -37,28 +41,6 @@ class FlinkProcessBuilder(
     val extraEngineLog: Option[OperationLog] = None)
   extends ProcBuilder with Logging {
 
-  override protected def executable: String = {
-    val flinkEngineHomeOpt = env.get("FLINK_ENGINE_HOME").orElse {
-      val cwd = Utils.getCodeSourceLocation(getClass)
-        .split("kyuubi-server")
-      assert(cwd.length > 1)
-      Option(
-        Paths.get(cwd.head)
-          .resolve("externals")
-          .resolve("kyuubi-flink-sql-engine")
-          .toFile)
-        .map(_.getAbsolutePath)
-    }
-
-    flinkEngineHomeOpt.map { dir =>
-      Paths.get(dir, "bin", FLINK_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
-    } getOrElse {
-      throw KyuubiSQLException("FLINK_ENGINE_HOME is not set! " +
-        "For more detail information on installing and configuring Flink, please visit " +
-        "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
-    }
-  }
-
   override protected def module: String = "kyuubi-flink-sql-engine"
 
   override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -66,14 +48,68 @@ class FlinkProcessBuilder(
   override protected def childProcEnv: Map[String, String] = conf.getEnvs +
     ("FLINK_HOME" -> FLINK_HOME) +
     ("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf") +
-    ("FLINK_SQL_ENGINE_JAR" -> mainResource.get) +
-    ("FLINK_SQL_ENGINE_DYNAMIC_ARGS" ->
-      conf.getAll.filter { case (k, _) =>
-        k.startsWith("kyuubi.") || k.startsWith("flink.") ||
-          k.startsWith("hadoop.") || k.startsWith("yarn.")
-      }.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
-
-  override protected def commands: Array[String] = Array(executable)
+    ("_FLINK_HOME_DETERMINED" -> s"1")
+
+  override protected def commands: Array[String] = {
+    val buffer = new ArrayBuffer[String]()
+    buffer += s"bash"
+    buffer += s"-c"
+    val commandStr = new StringBuilder()
+
+    commandStr.append(s"source $FLINK_HOME${File.separator}bin" +
+      s"${File.separator}config.sh && $executable")
+
+    // TODO: how shall we deal with proxyUser?
+    // Options: pass it as user.name,
+    // pass it as kyuubi.session.user,
+    // or leave it, since it can be handled at the operation layer.
+    commandStr.append(s" -D$KYUUBI_SESSION_USER_KEY=$proxyUser ")
+
+    // TODO: add kyuubi.engineEnv.FLINK_ENGINE_MEMORY or
+    // kyuubi.engine.flink.memory to configure Java options
+    // such as -Xmx5g
+    val confStr = conf.getAll.filter { case (k, _) =>
+      k.startsWith("kyuubi.") || k.startsWith("flink.") ||
+        k.startsWith("hadoop.") || k.startsWith("yarn.")
+    }.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+    commandStr.append(confStr)
+
+    commandStr.append(" -cp ")
+    val classpathEntries = new LinkedHashSet[String]
+    // flink engine runtime jar
+    mainResource.foreach(classpathEntries.add)
+    // flink sql client jar
+    val flinkSqlClientPath = Paths.get(FLINK_HOME)
+      .resolve("opt")
+      .toFile
+      .listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.toLowerCase.startsWith("flink-sql-client")
+        }
+      }).head.getAbsolutePath
+    classpathEntries.add(flinkSqlClientPath)
+
+    // jars from flink lib
+    classpathEntries.add(s"$FLINK_HOME${File.separator}lib${File.separator}*")
+
+    // classpath contains flink configurations, default to flink.home/conf
+    classpathEntries.add(env.getOrElse("FLINK_CONF_DIR", s"$FLINK_HOME${File.separator}conf"))
+    // classpath contains hadoop configurations
+    env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
+    env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
+    env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+    val hadoopClasspath = env.get("HADOOP_CLASSPATH")
+    if (hadoopClasspath.isEmpty) {
+      throw KyuubiSQLException("HADOOP_CLASSPATH is not set! " +
+        "For more detail information on configuring HADOOP_CLASSPATH" +
+        "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
+    }
+    classpathEntries.add(hadoopClasspath.get)
+    commandStr.append(classpathEntries.asScala.mkString(File.pathSeparator))
+    commandStr.append(s" $mainClass")
+    buffer += commandStr.toString()
+    buffer.toArray
+  }
 
   @VisibleForTesting
   def FLINK_HOME: String = {
@@ -112,6 +148,4 @@ class FlinkProcessBuilder(
 object FlinkProcessBuilder {
   final val APP_KEY = "yarn.application.name"
   final val TAG_KEY = "yarn.tags"
-
-  final private val FLINK_ENGINE_BINARY_FILE = "flink-sql-engine.sh"
 }
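
Note that the builder now fails fast when HADOOP_CLASSPATH is unset,
since there is no wrapper script assembling Hadoop jars anymore. One
common way to satisfy it on a host with a local Hadoop client (an
assumption about the deployment, not the only option) is:

    export HADOOP_CLASSPATH=$(hadoop classpath)

Alternatively it can be supplied per server via
kyuubi.engineEnv.HADOOP_CLASSPATH in kyuubi-defaults.conf, which is what
the integration test above does programmatically.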
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 94c5cb450..d63ec55fb 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -17,15 +17,90 @@
 
 package org.apache.kyuubi.engine.flink
 
-import org.apache.kyuubi.KyuubiFunSuite
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.ListMap
+
+import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite, KyuubiSQLException, SCALA_COMPILE_VERSION}
 import org.apache.kyuubi.config.KyuubiConf
 
 class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def conf = KyuubiConf().set("kyuubi.on", "off")
+  private def envDefault: ListMap[String, String] = ListMap(
+    "JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
+  private def envWithoutHadoopClasspath: ListMap[String, String] = envDefault +
+    ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
+    ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
+    ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
+  private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopClasspath +
+    ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+  private def confStr: String = {
+    conf.getAll.filter { case (k, _) =>
+      k.startsWith("kyuubi.") || k.startsWith("flink.") ||
+        k.startsWith("hadoop.") || k.startsWith("yarn.")
+    }.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+  }
+  private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
+    val actualCommands = builder.toString
+    val classpathStr: String = constructClasspathStr(builder)
+    val expectedCommands = s"bash -c source ${builder.FLINK_HOME}" +
+      s"${File.separator}bin${File.separator}config.sh && $javaPath " +
+      s"-Dkyuubi.session.user=vinoyang $confStr" +
+      s" -cp $classpathStr $mainClassStr"
+    info(s"\n\n actualCommands $actualCommands")
+    info(s"\n\n expectedCommands $expectedCommands")
+    assert(actualCommands.equals(expectedCommands))
+  }
+
+  private def constructClasspathStr(builder: FlinkProcessBuilder) = {
+    val classpathEntries = new java.util.LinkedHashSet[String]
+    builder.mainResource.foreach(classpathEntries.add)
+    val flinkSqlClientJarPath = s"${builder.FLINK_HOME}$flinkSqlClientJarPathSuffix"
+    val flinkLibPath = s"${builder.FLINK_HOME}$flinkLibPathSuffix"
+    val flinkConfPath = s"${builder.FLINK_HOME}$flinkConfPathSuffix"
+    classpathEntries.add(flinkSqlClientJarPath)
+    classpathEntries.add(flinkLibPath)
+    classpathEntries.add(flinkConfPath)
+    val envMethod = classOf[FlinkProcessBuilder].getDeclaredMethod("env")
+    envMethod.setAccessible(true)
+    val envMap = envMethod.invoke(builder).asInstanceOf[Map[String, String]]
+    envMap.foreach { case (k, v) =>
+      if (!k.equals("JAVA_HOME")) {
+        classpathEntries.add(v)
+      }
+    }
+    val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
+    classpathStr
+  }
+
+  private val javaPath = s"${envDefault("JAVA_HOME")}${File.separator}bin${File.separator}java"
+  private val flinkSqlClientJarPathSuffix = s"${File.separator}opt${File.separator}" +
+    s"flink-sql-client_$SCALA_COMPILE_VERSION-$FLINK_COMPILE_VERSION.jar"
+  private val flinkLibPathSuffix = s"${File.separator}lib${File.separator}*"
+  private val flinkConfPathSuffix = s"${File.separator}conf"
+  private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+
+  test("all hadoop related environment variables are configured") {
+    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+      override protected def env: Map[String, String] = envWithAllHadoop
+
+    }
+    compareActualAndExpected(builder)
+  }
+
+  test("all hadoop related environment variables are configured except HADOOP_CLASSPATH") {
+    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+      override def env: Map[String, String] = envWithoutHadoopClasspath
+    }
+    assertThrows[KyuubiSQLException](builder.toString)
+  }
 
-  test("flink engine process builder") {
-    val builder = new FlinkProcessBuilder("vinoyang", conf)
-    val commands = builder.toString.split(' ')
-    assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
+  test("only HADOOP_CLASSPATH environment variables are configured") {
+    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+      override def env: Map[String, String] = envDefault +
+        ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+    }
+    compareActualAndExpected(builder)
   }
 }
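
The reworked FlinkProcessBuilderSuite can be exercised locally; an
illustrative invocation with the bundled Maven wrapper, following the
testing guide linked in the description:

    build/mvn test -pl kyuubi-server -Dtest=none \
      -DwildcardSuites=org.apache.kyuubi.engine.flink.FlinkProcessBuilderSuite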