You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/10/16 12:06:59 UTC
incubator-kylin git commit: KYLIN-1021 Upload dependent jars of kylin to HDFS and set tmpjars
Repository: incubator-kylin
Updated Branches:
refs/heads/1.x-staging dca815799 -> 88c660fd2
KYLIN-1021 Upload dependent jars of kylin to HDFS and set tmpjars
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/88c660fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/88c660fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/88c660fd
Branch: refs/heads/1.x-staging
Commit: 88c660fd2b2a7aa6654b8929b831c6f861bcf51b
Parents: dca8157
Author: Feng Yu <hz...@corp.netease.com>
Authored: Fri Oct 16 15:37:55 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Oct 16 15:37:55 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 4 ++
.../kylin/job/hadoop/AbstractHadoopJob.java | 71 ++++++++++++++++++--
2 files changed, 71 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/88c660fd/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 14c0fb4..d6fe142 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -332,6 +332,10 @@ public class KylinConfig {
}
return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
}
+
+    /**
+     * Returns the value of {@code kylin.job.mr.lib.dir}: a directory whose entries
+     * AbstractHadoopJob adds to a job's "tmpjars" (names ending in ".jar") or "tmpfiles".
+     * Falls back to the empty string passed to getOptional when the key is not configured.
+     */
+    public String getKylinJobMRLibDir() {
+        final String noLibDir = "";
+        return getOptional("kylin.job.mr.lib.dir", noLibDir);
+    }
public void overrideKylinJobJarPath(String path) {
logger.info("override " + KYLIN_JOB_JAR + " to " + path);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/88c660fd/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 9b3c58f..7b3af95 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -23,7 +23,7 @@ package org.apache.kylin.job.hadoop;
*
*/
-import static org.apache.hadoop.util.StringUtils.formatTime;
+import static org.apache.hadoop.util.StringUtils.*;
import java.io.File;
import java.io.IOException;
@@ -37,6 +37,7 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -147,6 +148,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
if (classpath == null || classpath.length() == 0) {
logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
classpath = getDefaultMapRedClasspath();
+ classpath = classpath.replace(":", ","); // yarn classpath is comma separated
logger.info("The default mapred classpath is: " + classpath);
}
@@ -164,6 +166,66 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+
+ // set extra dependencies as tmpjars & tmpfiles if configured
+ setJobTmpJarsAndFiles(job);
+ }
+
+    /**
+     * Ships every entry of the optional {@code kylin.job.mr.lib.dir} directory with the job.
+     * Entries whose name ends in ".jar" are appended to "tmpjars"; all others are appended
+     * to "tmpfiles". Does nothing when the setting is blank.
+     *
+     * @param job the job whose configuration is updated
+     * @throws RuntimeException wrapping the IOException if the directory cannot be listed
+     */
+    private void setJobTmpJarsAndFiles(Job job) {
+        String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
+        if (StringUtils.isBlank(mrLibDir))
+            return;
+
+        Configuration jobConf = job.getConfiguration();
+        try {
+            // Resolve the FileSystem from the path itself rather than FileSystem.get(conf).
+            // The latter is always the default FS, so a fully qualified lib dir on another
+            // scheme/cluster would be rejected ("Wrong FS").
+            Path libDir = new Path(mrLibDir);
+            FileSystem fs = libDir.getFileSystem(jobConf);
+            FileStatus[] fList = fs.listStatus(libDir);
+
+            StringBuilder jarList = new StringBuilder();
+            StringBuilder fileList = new StringBuilder();
+
+            for (FileStatus file : fList) {
+                Path p = file.getPath();
+                StringBuilder list = p.getName().endsWith(".jar") ? jarList : fileList;
+                if (list.length() > 0)
+                    list.append(",");
+                // Use the path reported by the FileSystem instead of re-joining strings,
+                // which produced "//" whenever mrLibDir ended with a slash.
+                list.append(p.toString());
+            }
+
+            appendTmpFiles(fileList.toString(), jobConf);
+            appendTmpJars(jarList.toString(), jobConf);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to list kylin.job.mr.lib.dir: " + mrLibDir, e);
+        }
+    }
+
+    /**
+     * Appends jar paths to the job's "tmpjars" setting, keeping any entries already there.
+     *
+     * @param jarList comma-separated jar paths; ignored when blank
+     * @param conf    configuration to update
+     */
+    private void appendTmpJars(String jarList, Configuration conf) {
+        appendToCommaSeparatedProperty(conf, "tmpjars", jarList);
+    }
+
+    /**
+     * Appends file paths to the job's "tmpfiles" setting, keeping any entries already there.
+     *
+     * @param fileList comma-separated file paths; ignored when blank
+     * @param conf     configuration to update
+     */
+    private void appendTmpFiles(String fileList, Configuration conf) {
+        appendToCommaSeparatedProperty(conf, "tmpfiles", fileList);
+    }
+
+    /**
+     * Appends {@code values} to the comma-separated property {@code key} and logs the result.
+     * Blank {@code values} are ignored. A blank existing value is treated as unset, so the
+     * stored list never starts with a stray comma.
+     */
+    private void appendToCommaSeparatedProperty(Configuration conf, String key, String values) {
+        if (StringUtils.isBlank(values))
+            return;
+
+        String existing = conf.get(key);
+        String updated = StringUtils.isBlank(existing) ? values : existing + "," + values;
+        conf.set(key, updated);
+        logger.info("Job '" + key + "' updated -- " + updated);
}
private String getDefaultMapRedClasspath() {
@@ -174,7 +236,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
ShellCmdOutput output = new ShellCmdOutput();
executor.execute("mapred classpath", output);
- classpath = output.getOutput();
+ classpath = output.getOutput().trim();
} catch (IOException e) {
logger.error("Failed to run: 'mapred classpath'.", e);
}
@@ -276,9 +338,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
else
hdfsMetaDir = "file:///" + hdfsMetaDir;
logger.info("HDFS meta dir is: " + hdfsMetaDir);
- conf.set("tmpfiles", hdfsMetaDir);
+
+ appendTmpFiles(hdfsMetaDir, conf);
}
-
+
private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
ResourceStore from = ResourceStore.getStore(kylinConfig);
KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());