You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 08:00:22 UTC
[kylin] 01/02: KYLIN-3382 Yarn job link wasn't displayed when job
is running
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ee7f2f435c4b82ac6b3867d4d6505de0de43c24f
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Mon Jul 9 13:27:15 2018 +0800
KYLIN-3382 Yarn job link wasn't displayed when job is running
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../org/apache/kylin/job/common/PatternedLogger.java | 11 +++++++++++
.../org/apache/kylin/engine/spark/SparkExecutable.java | 17 ++++++++++++++---
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 8be5d02..0f1bd2d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
*/
public class PatternedLogger extends BufferedLogger {
private final Map<String, String> info = Maps.newHashMap();
+ ILogListener listener;
private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
@@ -56,6 +57,11 @@ public class PatternedLogger extends BufferedLogger {
super(wrappedLogger);
}
+ public PatternedLogger(Logger wrappedLogger, ILogListener listener) {
+ super(wrappedLogger);
+ this.listener = listener;
+ }
+
@Override
public void log(String message) {
super.log(message);
@@ -128,6 +134,7 @@ public class PatternedLogger extends BufferedLogger {
if (matcher.find()) {
String trackingUrl = matcher.group(1);
info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+ // The one-argument constructor never assigns a listener, so the field
+ // may be null here; only notify when a listener was actually supplied.
+ if (listener != null) {
+ listener.onLogEvent(info);
+ }
}
}
@@ -135,4 +142,8 @@ public class PatternedLogger extends BufferedLogger {
return info;
}
+ public interface ILogListener{
+ void onLogEvent(Map<String, String> info);
+ }
+
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index dfaa2e1..6a1c2c6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceTool;
@@ -135,8 +136,13 @@ public class SparkExecutable extends AbstractExecutable {
}
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(
- "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+ stringBuilder.append(
+ "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ } else {
+ stringBuilder.append(
+ "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ }
Map<String, String> sparkConfs = config.getSparkConfigOverride();
for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -149,7 +155,12 @@ public class SparkExecutable extends AbstractExecutable {
formatArgs());
logger.info("cmd: " + cmd);
CliCommandExecutor exec = new CliCommandExecutor();
- PatternedLogger patternedLogger = new PatternedLogger(logger);
+ PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+ @Override
+ public void onLogEvent(Map<String, String> info) {
+ getManager().addJobInfo(getId(), info);
+ }
+ });
exec.execute(cmd, patternedLogger);
Map<String, String> joblogInfo = patternedLogger.getInfo();