You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/02 03:07:24 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4449 The spark
process is not actually killed after the job be paused
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new be0aedd KYLIN-4449 The spark process is not actually killed after the job be paused
be0aedd is described below
commit be0aedd6a2a1cf23e4ff50597a264857d2dff22b
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Tue Sep 1 14:55:44 2020 +0800
KYLIN-4449 The spark process is not actually killed after the job be paused
---
build/bin/kill-process-tree.sh | 56 ++++++++++++++++++++++
.../org/apache/kylin/common/JobProcessContext.java | 10 ++++
.../kylin/job/execution/AbstractExecutable.java | 3 +-
.../kylin/job/execution/ExecutableManager.java | 38 ++++++++++++---
4 files changed, 100 insertions(+), 7 deletions(-)
diff --git a/build/bin/kill-process-tree.sh b/build/bin/kill-process-tree.sh
new file mode 100644
index 0000000..a79ead8
--- /dev/null
+++ b/build/bin/kill-process-tree.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function help_func {
+ echo "Usage: kill-process-tree.sh <PID>(process id)"
+ echo " kill-process-tree.sh 12345"
+ exit 1
+}
+
+function isRunning() {
+    # Exit status 0 when `ps` still reports a process with PID $1, non-zero otherwise.
+    local listed
+    listed=$(ps -p "$1" -o pid=)
+    [[ -n ${listed} ]]
+}
+
+function killTree() {
+    # Terminate the process tree rooted at PID $1: every descendant first (depth-first),
+    # then the process itself. Each process gets SIGTERM, up to ~5 seconds to exit, then SIGKILL.
+    local parent=$1 child waited
+    # Pass the PID to awk as a variable instead of splicing it into the program text.
+    for child in $(ps ax -o ppid= -o pid= | awk -v p="${parent}" '$1 == p {print $2}'); do
+        killTree "${child}"
+    done
+    # The process may already be gone (e.g. it exited once its children died); ignore that error.
+    kill "${parent}" 2>/dev/null
+    # Poll once per second rather than always sleeping the full 5 seconds: the Java caller waits at
+    # most CMD_EXEC_TIMEOUT_SEC (60 s) for this whole script, and a fixed 5 s pause per process
+    # would exceed that for a tree of about a dozen processes.
+    for ((waited = 0; waited < 5; waited++)); do
+        isRunning "${parent}" || return 0
+        sleep 1
+    done
+    if isRunning "${parent}"; then
+        kill -9 "${parent}" 2>/dev/null
+    fi
+}
+
+# Check parameters count.
+if [[ $# -ne 1 ]]; then
+    help_func
+fi
+
+# The PID must be a positive decimal integer without leading zeros. A plain "contains only
+# digits" test would also accept an empty string. "0" and "1" are refused too: `kill 0` signals
+# every process in the caller's process group, and PID 1 is init, so using either as the root of
+# the tree would kill far more than the job's processes.
+if [[ ! $1 =~ ^[1-9][0-9]*$ ]] || [[ $1 == 1 ]]; then
+    help_func
+fi
+
+killTree "$1"
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java b/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java
index 4166ce0..cb5ac47 100644
--- a/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java
@@ -18,8 +18,10 @@
package org.apache.kylin.common;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
import java.util.Map;
public class JobProcessContext {
@@ -37,4 +39,12 @@ public class JobProcessContext {
+    /** Removes the entry for {@code jobId} from {@code runningProcess}. */
public static void removeProcess(String jobId){
runningProcess.remove(jobId);
}
+
+    /**
+     * Returns the operating-system process id of {@code process}.
+     *
+     * <p>On Java 9+ this calls the public {@code Process.pid()} API. It is looked up reflectively
+     * so the code still compiles against Java 8, and it works for any JDK implementation class
+     * (e.g. {@code java.lang.ProcessImpl}). On Java 8 it falls back to reading the private
+     * {@code pid} field, which is only supported for {@code java.lang.UNIXProcess}.
+     *
+     * @param process a process started by this JVM; must not be {@code null}
+     * @return the process id
+     * @throws IllegalStateException if the pid cannot be determined for this process implementation
+     * @throws IllegalAccessException if the pid accessor or field cannot be accessed
+     * @throws NoSuchFieldException if the Java 8 fallback finds no {@code pid} field
+     */
+    public static int getPid(Process process) throws IllegalAccessException, NoSuchFieldException {
+        Preconditions.checkNotNull(process, "process");
+        try {
+            java.lang.reflect.Method pidMethod = Process.class.getMethod("pid");
+            return Math.toIntExact((Long) pidMethod.invoke(process));
+        } catch (NoSuchMethodException e) {
+            // Java 8: Process#pid() does not exist yet; use the UNIXProcess field below.
+        } catch (java.lang.reflect.InvocationTargetException e) {
+            // pid() itself failed, e.g. UnsupportedOperationException from a custom Process type.
+            throw new IllegalStateException("Cannot get pid of " + process, e.getCause());
+        }
+        String className = process.getClass().getName();
+        Preconditions.checkState(className.equals("java.lang.UNIXProcess"),
+                "Cannot get pid of process implementation %s", className);
+        Field f = process.getClass().getDeclaredField("pid");
+        f.setAccessible(true);
+        return f.getInt(process);
+    }
}
\ No newline at end of file
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 7628975..29d9b5a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -196,8 +196,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
try {
result = doWork(executableContext);
} catch (JobStoppedException e) {
- //The job be paused, ignore it
+ logger.debug("The job be paused, ignore it: {}", this.toString());
} catch (Throwable e) {
+ logger.error("error running Executable: {}", this.toString());
catchedException = e;
} finally {
cleanup(result);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 6b1242b..8772d32 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -24,11 +24,14 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.IllegalFormatException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.JobProcessContext;
@@ -53,6 +56,9 @@ public class ExecutableManager {
private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
+ private static final int CMD_EXEC_TIMEOUT_SEC = 60;
+ private static final String KILL_PROCESS_TREE = "kill-process-tree.sh";
+
public static ExecutableManager getInstance(KylinConfig config) {
return config.getManager(ExecutableManager.class);
}
@@ -499,12 +505,32 @@ public class ExecutableManager {
}
public void destroyProcess(String jobId) {
- // in ut env, there is no process for job, just do nothing
- if (!config.isUTEnv()) {
- Process process = JobProcessContext.getProcess(jobId);
- if (process != null && process.isAlive()) {
- logger.info("Will destroy process " + process.toString());
- process.destroyForcibly();
+ Process originProc = JobProcessContext.getProcess(jobId);
+ if (Objects.nonNull(originProc) && originProc.isAlive()) {
+ try {
+ final int ppid = JobProcessContext.getPid(originProc);
+ logger.info("start to destroy process {} of job {}", ppid, jobId);
+ //build cmd template
+ StringBuilder cmdBuilder = new StringBuilder("bash ");
+ cmdBuilder.append(Paths.get(KylinConfig.getKylinHome(), "bin", KILL_PROCESS_TREE));
+ cmdBuilder.append(" ");
+ cmdBuilder.append(ppid);
+ final String killCmd = cmdBuilder.toString();
+ Process killProc = Runtime.getRuntime().exec(killCmd);
+ if (killProc.waitFor(CMD_EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+ logger.info("try to destroy process {} of job {}, exec cmd '{}', exitValue : {}", ppid, jobId,
+ killCmd, killProc.exitValue());
+ if (!originProc.isAlive()) {
+ logger.info("destroy process {} of job {} SUCCEED.", ppid, jobId);
+ return;
+ }
+ logger.info("destroy process {} of job {} FAILED.", ppid, jobId);
+ }
+
+ //generally, code executing wouldn't reach here
+ logger.warn("destroy process {} of job {} TIMEOUT exceed {}s.", ppid, jobId, CMD_EXEC_TIMEOUT_SEC);
+ } catch (Exception e) {
+ logger.error("destroy process of job {} FAILED.", jobId, e);
}
}
}