You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/19 04:03:31 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2289] Use unique tag to kill applications
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c9ea7faca [KYUUBI #2289] Use unique tag to kill applications
c9ea7faca is described below
commit c9ea7facabb3b0242b9da536188ae609761120ee
Author: Nick Song <ch...@163.com>
AuthorDate: Tue Apr 19 12:03:19 2022 +0800
[KYUUBI #2289] Use unique tag to kill applications
### _Why are the changes needed?_
Use a unique YARN tag (the engine ref ID) to kill applications instead of parsing the application ID out of the engine log.
#2289
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2314 from Nick-0723/kill_app.
Closes #2289
28d5e7c9 [宋财礼] use eventually
8b295197 [宋财礼] Update SparkProcessBuilder.scala
e849cae9 [宋财礼] Update SparkProcessBuilderOnYarnSuite.scala
11780d91 [Nick Song] Unused import
e9b19703 [Nick Song] no need changes
a2bb13e7 [Nick Song] resolve conflicts
1d937f74 [Nick Song] add application killed successfully test case
e6481412 [Nick Song] use yarn tags kill application
Lead-authored-by: Nick Song <ch...@163.com>
Co-authored-by: 宋财礼 <31...@users.noreply.github.com>
Co-authored-by: 宋财礼 <ca...@nio.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 8 ++-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 6 +-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 7 ++-
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 50 +++++++++------
.../engine/flink/FlinkProcessBuilderSuite.scala | 10 +--
.../spark/SparkProcessBuilderOnYarnSuite.scala | 71 ++++++++++++++++++++++
.../engine/spark/SparkProcessBuilderSuite.scala | 27 --------
7 files changed, 123 insertions(+), 56 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 894acfdbb..ff5ea0063 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -170,7 +170,8 @@ private[kyuubi] class EngineRef(
// tag is a seq type with comma-separated
conf.set(
SparkProcessBuilder.TAG_KEY,
- conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
+ conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
+ "KYUUBI," + engineRefId)
new SparkProcessBuilder(appUser, conf, extraEngineLog)
case FLINK_SQL =>
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
@@ -203,7 +204,10 @@ private[kyuubi] class EngineRef(
}
}
if (started + timeout <= System.currentTimeMillis()) {
- val killMessage = builder.killApplication()
+ val killMessage = engineType match {
+ case SPARK_SQL => builder.killApplication(Left(engineRefId))
+ case _ => builder.killApplication()
+ }
process.destroyForcibly()
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 39c58b04d..24b6f0365 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -237,7 +237,11 @@ trait ProcBuilder {
process
}
- def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = ""
+ /**
+ * Use Left to represent engineRefId and Right to represent line.
+ */
+ def killApplication(clue: Either[String, String] = Right(lastRowsOfLog.toArray.mkString("\n")))
+ : String = ""
def close(): Unit = synchronized {
if (logCaptureThread != null) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 8e53d9e6b..d63d073e9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -78,7 +78,12 @@ class FlinkProcessBuilder(
override protected def commands: Array[String] = Array(executable)
- override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
+ override def killApplication(clue: Either[String, String]): String = clue match {
+ case Left(_) => ""
+ case Right(line) => killApplicationByLog(line)
+ }
+
+ def killApplicationByLog(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
"Job ID: .*".r.findFirstIn(line) match {
case Some(jobIdLine) =>
val jobId = jobIdLine.split("Job ID: ")(1).trim
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 12170c75b..e249846a8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -20,8 +20,8 @@ package org.apache.kyuubi.engine.spark
import java.io.{File, IOException}
import java.nio.file.Paths
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.util.matching.Regex
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_TYPE}
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
@@ -44,12 +45,11 @@ class SparkProcessBuilder(
import SparkProcessBuilder._
- val yarnClient = getYarnClient
-
def getYarnClient: YarnClient = YarnClient.createYarnClient
+ private val sparkHome = getEngineHome(shortName)
+
override protected val executable: String = {
- val sparkHome = getEngineHome("spark")
Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
}
@@ -101,8 +101,6 @@ class SparkProcessBuilder(
override protected def module: String = "kyuubi-spark-sql-engine"
- val YARN_APP_NAME_REGEX: Regex = "application_\\d+_\\d+".r
-
private def useKeytab(): Boolean = {
val principal = conf.getOption(PRINCIPAL)
val keytab = conf.getOption(KEYTAB)
@@ -141,33 +139,46 @@ class SparkProcessBuilder(
}
}
- override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String =
- YARN_APP_NAME_REGEX.findFirstIn(line) match {
- case Some(appId) =>
+ override def killApplication(clue: Either[String, String]): String = clue match {
+ case Left(engineRefId) => killApplicationByTag(engineRefId)
+ case Right(_) => ""
+ }
+
+ private def killApplicationByTag(engineRefId: String): String = {
+ conf.getOption(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) match {
+ case Some("yarn") =>
+ var applicationId: ApplicationId = null
+ val yarnClient = getYarnClient
try {
val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
yarnClient.init(yarnConf)
yarnClient.start()
- val applicationId = ApplicationId.fromString(appId)
- yarnClient.killApplication(applicationId)
- s"Killed Application $appId successfully."
+ val apps = yarnClient.getApplications(null, null, Set(engineRefId).asJava)
+ if (apps.isEmpty) return s"There are no Application tagged with $engineRefId," +
+ s" please kill it manually."
+ applicationId = apps.asScala.head.getApplicationId
+ yarnClient.killApplication(
+ applicationId,
+ s"Kyuubi killed this caused by: Timeout(${conf.get(ENGINE_INIT_TIMEOUT)} ms) to" +
+ s" launched ${conf.get(ENGINE_TYPE)} engine with $this.")
+ s"Killed Application $applicationId tagged with $engineRefId successfully."
} catch {
case e: Throwable =>
- s"Failed to kill Application $appId, please kill it manually." +
- s" Caused by ${e.getMessage}."
+ s"Failed to kill Application $applicationId tagged with $engineRefId," +
+ s" please kill it manually. Caused by ${e.getMessage}."
} finally {
- if (yarnClient != null) {
- yarnClient.stop()
- }
+ yarnClient.stop()
}
- case None => ""
+ case _ => "Kill Application only works with YARN, please kill it manually." +
+ s" Application tagged with $engineRefId"
}
+ }
override protected def shortName: String = "spark"
protected def getSparkDefaultsConf(): Map[String, String] = {
val sparkDefaultsConfFile = env.get(SPARK_CONF_DIR)
- .orElse(env.get(SPARK_HOME).map(_ + File.separator + "conf"))
+ .orElse(Option(s"$sparkHome${File.separator}conf"))
.map(_ + File.separator + SPARK_CONF_FILE_NAME)
.map(new File(_)).filter(_.exists())
Utils.getPropertiesFromFile(sparkDefaultsConfFile)
@@ -187,7 +198,6 @@ object SparkProcessBuilder {
final private[spark] val KEYTAB = "spark.kerberos.keytab"
// Get the appropriate spark-submit file
final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
- final private val SPARK_HOME = "SPARK_HOME"
final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index ee1125421..28ce47ba8 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -34,14 +34,14 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
}
val exit1 = processBuilder.killApplication(
- """
- |[INFO] SQL update statement has been successfully submitted to the cluster:
- |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
- |""".stripMargin)
+ Right("""
+ |[INFO] SQL update statement has been successfully submitted to the cluster:
+ |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
+ |""".stripMargin))
assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
&& !exit1.contains("FLINK_HOME is not set!"))
- val exit2 = processBuilder.killApplication("unknow")
+ val exit2 = processBuilder.killApplication(Right("unknow"))
assert(exit2.equals(""))
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
new file mode 100644
index 000000000..245a10719
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.UUID
+
+import scala.concurrent.duration.DurationInt
+
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.scalatestplus.mockito.MockitoSugar.mock
+
+import org.apache.kyuubi.{Utils, WithKyuubiServerOnYarn}
+import org.apache.kyuubi.config.KyuubiConf
+
+class SparkProcessBuilderOnYarnSuite extends WithKyuubiServerOnYarn {
+
+ override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
+
+ override protected val connectionConf: Map[String, String] = Map(
+ "spark.master" -> "yarn",
+ "spark.executor.instances" -> "1")
+
+ test("test kill application") {
+ val engineRefId = UUID.randomUUID().toString
+
+ conf.set(
+ SparkProcessBuilder.TAG_KEY,
+ conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
+ "KYUUBI," + engineRefId)
+ val builder = new SparkProcessBuilder(Utils.currentUser, conf)
+ val proc = builder.start
+ eventually(timeout(3.minutes), interval(1.seconds)) {
+ val killMsg = builder.killApplication(Left(engineRefId))
+ assert(killMsg.contains(s"tagged with $engineRefId successfully."))
+ }
+ proc.destroyForcibly()
+
+ val pb1 = new FakeSparkProcessBuilder(conf.clone) {
+ override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = mock[YarnClient]
+ }
+ val exit1 = pb1.killApplication(Left(engineRefId))
+ assert(exit1.equals(s"There are no Application tagged with $engineRefId," +
+ s" please kill it manually."))
+
+ val pb2 = new FakeSparkProcessBuilder(conf.clone) {
+ override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = mock[YarnClient]
+ }
+ pb2.conf.set("spark.master", "local")
+ val exit2 = pb2.killApplication(Left(engineRefId))
+ assert(exit2.equals("Kill Application only works with YARN, please kill it manually." +
+ s" Application tagged with $engineRefId"))
+ }
+
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 46525436c..2614f9409 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -22,7 +22,6 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import java.time.Duration
import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.hadoop.yarn.client.api.YarnClient
import org.scalatest.time.SpanSugar._
import org.scalatestplus.mockito.MockitoSugar
@@ -239,32 +238,6 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
assert(b2.mainResource.getOrElse("") != jarPath.toString)
}
- test("kill application") {
- val pb1 = new FakeSparkProcessBuilder(conf) {
- override protected def env: Map[String, String] = Map()
- override def getYarnClient: YarnClient = mock[YarnClient]
- }
- val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
- "Application report for application_1593587619692_20149 (state: ACCEPTED)")
- assert(exit1.contains("Killed Application application_1593587619692_20149 successfully."))
-
- val pb2 = new FakeSparkProcessBuilder(conf) {
- override protected def env: Map[String, String] = Map()
- override def getYarnClient: YarnClient = null
- }
- val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
- "Application report for application_1593587619692_20149 (state: ACCEPTED)")
- assert(exit2.contains("Failed to kill Application application_1593587619692_20149")
- && exit2.contains("Caused by"))
-
- val pb3 = new FakeSparkProcessBuilder(conf) {
- override protected def env: Map[String, String] = Map()
- override def getYarnClient: YarnClient = mock[YarnClient]
- }
- val exit3 = pb3.killApplication("unknow")
- assert(exit3.equals(""))
- }
-
test("add spark prefix for conf") {
val conf = KyuubiConf(false)
conf.set("kyuubi.kent", "yao")