Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/19 04:03:31 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2289] Use unique tag to kill applications

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ea7faca [KYUUBI #2289] Use unique tag to kill applications
c9ea7faca is described below

commit c9ea7facabb3b0242b9da536188ae609761120ee
Author: Nick Song <ch...@163.com>
AuthorDate: Tue Apr 19 12:03:19 2022 +0800

    [KYUUBI #2289] Use unique tag to kill applications
    
    ### _Why are the changes needed?_
    
    Use a unique tag to kill applications instead of parsing the log.
    #2289
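
    A minimal sketch of the idea (hedged: it assumes `TAG_KEY` maps to
    `spark.yarn.tags`, and `engineRefId`/`builder` stand in for values
    `EngineRef` already holds; see the diff below for the real code):

        // Attach a unique tag at submit time so the YARN application can be
        // located by tag later, instead of scraping "application_<id>" out
        // of the spark-submit log.
        val engineRefId = java.util.UUID.randomUUID().toString
        conf.set(
          SparkProcessBuilder.TAG_KEY,
          conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
            "KYUUBI," + engineRefId)
        // On engine-start timeout, kill by tag rather than by parsed log line:
        builder.killApplication(Left(engineRefId))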
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2314 from Nick-0723/kill_app.
    
    Closes #2289
    
    28d5e7c9 [宋财礼] use eventually
    8b295197 [宋财礼] Update SparkProcessBuilder.scala
    e849cae9 [宋财礼] Update SparkProcessBuilderOnYarnSuite.scala
    11780d91 [Nick Song] Unused import
    e9b19703 [Nick Song] no need changes
    a2bb13e7 [Nick Song] resolve conflicts
    1d937f74 [Nick Song] add  application killed successfully test case
    e6481412 [Nick Song] use yarn tags kill application
    
    Lead-authored-by: Nick Song <ch...@163.com>
    Co-authored-by: 宋财礼 <31...@users.noreply.github.com>
    Co-authored-by: 宋财礼 <ca...@nio.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  8 ++-
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |  6 +-
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  |  7 ++-
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 50 +++++++++------
 .../engine/flink/FlinkProcessBuilderSuite.scala    | 10 +--
 .../spark/SparkProcessBuilderOnYarnSuite.scala     | 71 ++++++++++++++++++++++
 .../engine/spark/SparkProcessBuilderSuite.scala    | 27 --------
 7 files changed, 123 insertions(+), 56 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 894acfdbb..ff5ea0063 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -170,7 +170,8 @@ private[kyuubi] class EngineRef(
         // tag is a seq type with comma-separated
         conf.set(
           SparkProcessBuilder.TAG_KEY,
-          conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
+          conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
+            "KYUUBI," + engineRefId)
         new SparkProcessBuilder(appUser, conf, extraEngineLog)
       case FLINK_SQL =>
         conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
@@ -203,7 +204,10 @@ private[kyuubi] class EngineRef(
           }
         }
         if (started + timeout <= System.currentTimeMillis()) {
-          val killMessage = builder.killApplication()
+          val killMessage = engineType match {
+            case SPARK_SQL => builder.killApplication(Left(engineRefId))
+            case _ => builder.killApplication()
+          }
           process.destroyForcibly()
           MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
           throw KyuubiSQLException(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 39c58b04d..24b6f0365 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -237,7 +237,11 @@ trait ProcBuilder {
     process
   }
 
-  def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = ""
+  /**
+   * Left carries the unique engineRefId (kill by YARN tag); Right carries the raw log text.
+   */
+  def killApplication(clue: Either[String, String] = Right(lastRowsOfLog.toArray.mkString("\n")))
+      : String = ""
 
   def close(): Unit = synchronized {
     if (logCaptureThread != null) {
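
The new `killApplication` contract, for reference (a hedged usage sketch;
`builder`, `engineRefId`, and `logText` are placeholder names):

    // Left  -> a unique engine ref id, resolved to a YARN application by its
    //          tag (see SparkProcessBuilder below).
    // Right -> raw submit-log text, for builders that still parse the log
    //          (FlinkProcessBuilder extracts "Job ID: ..." from it).
    builder.killApplication(Left(engineRefId))   // kill by YARN tag
    builder.killApplication(Right(logText))      // kill by parsing the log
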
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 8e53d9e6b..d63d073e9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -78,7 +78,12 @@ class FlinkProcessBuilder(
 
   override protected def commands: Array[String] = Array(executable)
 
-  override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
+  override def killApplication(clue: Either[String, String]): String = clue match {
+    case Left(_) => ""
+    case Right(line) => killApplicationByLog(line)
+  }
+
+  def killApplicationByLog(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
     "Job ID: .*".r.findFirstIn(line) match {
       case Some(jobIdLine) =>
         val jobId = jobIdLine.split("Job ID: ")(1).trim
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 12170c75b..e249846a8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -20,8 +20,8 @@ package org.apache.kyuubi.engine.spark
 import java.io.{File, IOException}
 import java.nio.file.Paths
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.matching.Regex
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_TYPE}
 import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
@@ -44,12 +45,11 @@ class SparkProcessBuilder(
 
   import SparkProcessBuilder._
 
-  val yarnClient = getYarnClient
-
   def getYarnClient: YarnClient = YarnClient.createYarnClient
 
+  private val sparkHome = getEngineHome(shortName)
+
   override protected val executable: String = {
-    val sparkHome = getEngineHome("spark")
     Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
   }
 
@@ -101,8 +101,6 @@ class SparkProcessBuilder(
 
   override protected def module: String = "kyuubi-spark-sql-engine"
 
-  val YARN_APP_NAME_REGEX: Regex = "application_\\d+_\\d+".r
-
   private def useKeytab(): Boolean = {
     val principal = conf.getOption(PRINCIPAL)
     val keytab = conf.getOption(KEYTAB)
@@ -141,33 +139,46 @@ class SparkProcessBuilder(
     }
   }
 
-  override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String =
-    YARN_APP_NAME_REGEX.findFirstIn(line) match {
-      case Some(appId) =>
+  override def killApplication(clue: Either[String, String]): String = clue match {
+    case Left(engineRefId) => killApplicationByTag(engineRefId)
+    case Right(_) => ""
+  }
+
+  private def killApplicationByTag(engineRefId: String): String = {
+    conf.getOption(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) match {
+      case Some("yarn") =>
+        var applicationId: ApplicationId = null
+        val yarnClient = getYarnClient
         try {
           val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
           yarnClient.init(yarnConf)
           yarnClient.start()
-          val applicationId = ApplicationId.fromString(appId)
-          yarnClient.killApplication(applicationId)
-          s"Killed Application $appId successfully."
+          val apps = yarnClient.getApplications(null, null, Set(engineRefId).asJava)
+          if (apps.isEmpty) return s"There is no application tagged with $engineRefId," +
+            s" please kill it manually."
+          applicationId = apps.asScala.head.getApplicationId
+          yarnClient.killApplication(
+            applicationId,
+            s"Kyuubi killed this caused by: Timeout(${conf.get(ENGINE_INIT_TIMEOUT)} ms) to" +
+              s" launched ${conf.get(ENGINE_TYPE)} engine with $this.")
+          s"Killed Application $applicationId tagged with $engineRefId successfully."
         } catch {
           case e: Throwable =>
-            s"Failed to kill Application $appId, please kill it manually." +
-              s" Caused by ${e.getMessage}."
+            s"Failed to kill Application $applicationId tagged with $engineRefId," +
+              s" please kill it manually. Caused by ${e.getMessage}."
         } finally {
-          if (yarnClient != null) {
-            yarnClient.stop()
-          }
+          yarnClient.stop()
         }
-      case None => ""
+      case _ => "Kill Application only works with YARN, please kill it manually." +
+          s" Application tagged with $engineRefId"
     }
+  }
 
   override protected def shortName: String = "spark"
 
   protected def getSparkDefaultsConf(): Map[String, String] = {
     val sparkDefaultsConfFile = env.get(SPARK_CONF_DIR)
-      .orElse(env.get(SPARK_HOME).map(_ + File.separator + "conf"))
+      .orElse(Option(s"$sparkHome${File.separator}conf"))
       .map(_ + File.separator + SPARK_CONF_FILE_NAME)
       .map(new File(_)).filter(_.exists())
     Utils.getPropertiesFromFile(sparkDefaultsConfFile)
@@ -187,7 +198,6 @@ object SparkProcessBuilder {
   final private[spark] val KEYTAB = "spark.kerberos.keytab"
   // Get the appropriate spark-submit file
   final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
-  final private val SPARK_HOME = "SPARK_HOME"
   final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
   final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
 }
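
For context, the tag-based kill path above as a self-contained sketch (hedged:
`killByTag` is a hypothetical standalone helper; it uses only the YarnClient
calls that appear in this diff):

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    def killByTag(engineRefId: String): String = {
      val yarnClient = YarnClient.createYarnClient
      try {
        yarnClient.init(new YarnConfiguration())
        yarnClient.start()
        // null type/state filters: match any application carrying the tag
        val apps = yarnClient.getApplications(null, null, Set(engineRefId).asJava)
        if (apps.isEmpty) s"There is no application tagged with $engineRefId."
        else {
          val appId = apps.asScala.head.getApplicationId
          yarnClient.killApplication(appId, "Killed by Kyuubi: engine launch timed out.")
          s"Killed application $appId tagged with $engineRefId."
        }
      } finally yarnClient.stop()
    }
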
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index ee1125421..28ce47ba8 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -34,14 +34,14 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
       override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
     }
     val exit1 = processBuilder.killApplication(
-      """
-        |[INFO] SQL update statement has been successfully submitted to the cluster:
-        |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
-        |""".stripMargin)
+      Right("""
+              |[INFO] SQL update statement has been successfully submitted to the cluster:
+              |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
+              |""".stripMargin))
     assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
       && !exit1.contains("FLINK_HOME is not set!"))
 
-    val exit2 = processBuilder.killApplication("unknow")
+    val exit2 = processBuilder.killApplication(Right("unknow"))
     assert(exit2.equals(""))
   }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
new file mode 100644
index 000000000..245a10719
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.UUID
+
+import scala.concurrent.duration.DurationInt
+
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.scalatestplus.mockito.MockitoSugar.mock
+
+import org.apache.kyuubi.{Utils, WithKyuubiServerOnYarn}
+import org.apache.kyuubi.config.KyuubiConf
+
+class SparkProcessBuilderOnYarnSuite extends WithKyuubiServerOnYarn {
+
+  override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
+
+  override protected val connectionConf: Map[String, String] = Map(
+    "spark.master" -> "yarn",
+    "spark.executor.instances" -> "1")
+
+  test("test kill application") {
+    val engineRefId = UUID.randomUUID().toString
+
+    conf.set(
+      SparkProcessBuilder.TAG_KEY,
+      conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
+        "KYUUBI," + engineRefId)
+    val builder = new SparkProcessBuilder(Utils.currentUser, conf)
+    val proc = builder.start
+    eventually(timeout(3.minutes), interval(1.seconds)) {
+      val killMsg = builder.killApplication(Left(engineRefId))
+      assert(killMsg.contains(s"tagged with $engineRefId successfully."))
+    }
+    proc.destroyForcibly()
+
+    val pb1 = new FakeSparkProcessBuilder(conf.clone) {
+      override protected def env: Map[String, String] = Map()
+      override def getYarnClient: YarnClient = mock[YarnClient]
+    }
+    val exit1 = pb1.killApplication(Left(engineRefId))
+    assert(exit1.equals(s"There are no Application tagged with $engineRefId," +
+      s" please kill it manually."))
+
+    val pb2 = new FakeSparkProcessBuilder(conf.clone) {
+      override protected def env: Map[String, String] = Map()
+      override def getYarnClient: YarnClient = mock[YarnClient]
+    }
+    pb2.conf.set("spark.master", "local")
+    val exit2 = pb2.killApplication(Left(engineRefId))
+    assert(exit2.equals("Kill Application only works with YARN, please kill it manually." +
+      s" Application tagged with $engineRefId"))
+  }
+
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 46525436c..2614f9409 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -22,7 +22,6 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
 import java.time.Duration
 import java.util.concurrent.{Executors, TimeUnit}
 
-import org.apache.hadoop.yarn.client.api.YarnClient
 import org.scalatest.time.SpanSugar._
 import org.scalatestplus.mockito.MockitoSugar
 
@@ -239,32 +238,6 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
     assert(b2.mainResource.getOrElse("") != jarPath.toString)
   }
 
-  test("kill application") {
-    val pb1 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = mock[YarnClient]
-    }
-    val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
-      "Application report for application_1593587619692_20149 (state: ACCEPTED)")
-    assert(exit1.contains("Killed Application application_1593587619692_20149 successfully."))
-
-    val pb2 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = null
-    }
-    val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
-      "Application report for application_1593587619692_20149 (state: ACCEPTED)")
-    assert(exit2.contains("Failed to kill Application application_1593587619692_20149")
-      && exit2.contains("Caused by"))
-
-    val pb3 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = mock[YarnClient]
-    }
-    val exit3 = pb3.killApplication("unknow")
-    assert(exit3.equals(""))
-  }
-
   test("add spark prefix for conf") {
     val conf = KyuubiConf(false)
     conf.set("kyuubi.kent", "yao")