You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/02 03:07:24 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4449 The spark process is not actually killed after the job be paused

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new be0aedd  KYLIN-4449 The spark process is not actually killed after the job be paused
be0aedd is described below

commit be0aedd6a2a1cf23e4ff50597a264857d2dff22b
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Tue Sep 1 14:55:44 2020 +0800

    KYLIN-4449 The spark process is not actually killed after the job be paused
---
 build/bin/kill-process-tree.sh                     | 56 ++++++++++++++++++++++
 .../org/apache/kylin/common/JobProcessContext.java | 10 ++++
 .../kylin/job/execution/AbstractExecutable.java    |  3 +-
 .../kylin/job/execution/ExecutableManager.java     | 38 ++++++++++++---
 4 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/build/bin/kill-process-tree.sh b/build/bin/kill-process-tree.sh
new file mode 100644
index 0000000..a79ead8
--- /dev/null
+++ b/build/bin/kill-process-tree.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function help_func {
+    echo "Usage: kill-process-tree.sh <PID>(process id)"
+    echo "       kill-process-tree.sh 12345"
+    exit 1
+}
+
+function isRunning() {
+    [[ -n "$(ps -p $1 -o pid=)" ]]
+}
+
+function killTree() {
+    local parent=$1 child
+    for child in $(ps ax -o ppid= -o pid= | awk "\$1==$parent {print \$2}"); do
+        killTree ${child}
+    done
+    kill ${parent}
+    if isRunning ${parent}; then
+        sleep 5
+        if isRunning ${parent}; then
+            kill -9 ${parent}
+        fi
+    fi
+}
+
+# Check parameters count.
+if [[ $# -ne 1 ]]; then
+    help_func
+fi
+
+# Check whether it contains non-digit characters.
+# Remove all digit characters and check for length.
+# If anything is left over, the argument is not a purely numeric PID.
+if [[ -n ${1//[0-9]/} ]]; then
+    help_func
+fi
+
+killTree $@
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java b/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java
index 4166ce0..cb5ac47 100644
--- a/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java
@@ -18,8 +18,10 @@
 
 package org.apache.kylin.common;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
+import java.lang.reflect.Field;
 import java.util.Map;
 
 public class JobProcessContext {
@@ -37,4 +39,12 @@ public class JobProcessContext {
     public static void removeProcess(String jobId){
         runningProcess.remove(jobId);
     }
+
+    public static int getPid(Process process) throws IllegalAccessException, NoSuchFieldException {
+        String className = process.getClass().getName();
+        Preconditions.checkState(className.equals("java.lang.UNIXProcess"));
+        Field f = process.getClass().getDeclaredField("pid");
+        f.setAccessible(true);
+        return f.getInt(process);
+    }
 }
\ No newline at end of file
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 7628975..29d9b5a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -196,8 +196,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 try {
                     result = doWork(executableContext);
                 } catch (JobStoppedException e) {
-                    //The job be paused, ignore it
+                    logger.debug("The job be paused, ignore it: {}", this.toString());
                 } catch (Throwable e) {
+                    logger.error("error running Executable: {}", this.toString());
                     catchedException = e;
                 } finally {
                     cleanup(result);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 6b1242b..8772d32 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -24,11 +24,14 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
 import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.IllegalFormatException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.JobProcessContext;
@@ -53,6 +56,9 @@ public class ExecutableManager {
 
     private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
 
+    private static final int CMD_EXEC_TIMEOUT_SEC = 60;
+    private static final String KILL_PROCESS_TREE = "kill-process-tree.sh";
+
     public static ExecutableManager getInstance(KylinConfig config) {
         return config.getManager(ExecutableManager.class);
     }
@@ -499,12 +505,32 @@ public class ExecutableManager {
     }
 
     public void destroyProcess(String jobId) {
-        // in ut env, there is no process for job, just do nothing
-        if (!config.isUTEnv()) {
-            Process process = JobProcessContext.getProcess(jobId);
-            if (process != null && process.isAlive()) {
-                logger.info("Will destroy process " + process.toString());
-                process.destroyForcibly();
+        Process originProc = JobProcessContext.getProcess(jobId);
+        if (Objects.nonNull(originProc) && originProc.isAlive()) {
+            try {
+                final int ppid = JobProcessContext.getPid(originProc);
+                logger.info("start to destroy process {} of job {}", ppid, jobId);
+                //build cmd template
+                StringBuilder cmdBuilder = new StringBuilder("bash ");
+                cmdBuilder.append(Paths.get(KylinConfig.getKylinHome(), "bin", KILL_PROCESS_TREE));
+                cmdBuilder.append(" ");
+                cmdBuilder.append(ppid);
+                final String killCmd = cmdBuilder.toString();
+                Process killProc = Runtime.getRuntime().exec(killCmd);
+                if (killProc.waitFor(CMD_EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+                    logger.info("try to destroy process {} of job {}, exec cmd '{}', exitValue : {}", ppid, jobId,
+                            killCmd, killProc.exitValue());
+                    if (!originProc.isAlive()) {
+                        logger.info("destroy process {} of job {} SUCCEED.", ppid, jobId);
+                        return;
+                    }
+                    logger.info("destroy process {} of job {} FAILED.", ppid, jobId);
+                }
+
+                //generally, code executing wouldn't reach here
+                logger.warn("destroy process {} of job {} TIMEOUT exceed {}s.", ppid, jobId, CMD_EXEC_TIMEOUT_SEC);
+            } catch (Exception e) {
+                logger.error("destroy process of job {} FAILED.", jobId, e);
             }
         }
     }