You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/10/16 12:06:59 UTC

incubator-kylin git commit: KYLIN-1021 Upload dependent jars of kylin to HDFS and set tmpjars

Repository: incubator-kylin
Updated Branches:
  refs/heads/1.x-staging dca815799 -> 88c660fd2


KYLIN-1021 Upload dependent jars of kylin to HDFS and set tmpjars


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/88c660fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/88c660fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/88c660fd

Branch: refs/heads/1.x-staging
Commit: 88c660fd2b2a7aa6654b8929b831c6f861bcf51b
Parents: dca8157
Author: Feng Yu <hz...@corp.netease.com>
Authored: Fri Oct 16 15:37:55 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Oct 16 15:37:55 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  4 ++
 .../kylin/job/hadoop/AbstractHadoopJob.java     | 71 ++++++++++++++++++--
 2 files changed, 71 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/88c660fd/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 14c0fb4..d6fe142 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -332,6 +332,10 @@ public class KylinConfig {
         }
         return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
     }
+    
+    /**
+     * Returns the value of the {@code kylin.job.mr.lib.dir} property, i.e. the directory whose files
+     * are added as extra dependencies of Kylin MapReduce jobs.
+     *
+     * @return the configured directory, or the empty string passed to getOptional as the default
+     *         when the property is not set
+     */
+    public String getKylinJobMRLibDir() {
+        final String noLibDir = "";
+        return getOptional("kylin.job.mr.lib.dir", noLibDir);
+    }
 
     public void overrideKylinJobJarPath(String path) {
         logger.info("override " + KYLIN_JOB_JAR + " to " + path);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/88c660fd/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 9b3c58f..7b3af95 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -23,7 +23,7 @@ package org.apache.kylin.job.hadoop;
  *
  */
 
-import static org.apache.hadoop.util.StringUtils.formatTime;
+import static org.apache.hadoop.util.StringUtils.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +37,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -147,6 +148,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         if (classpath == null || classpath.length() == 0) {
             logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
             classpath = getDefaultMapRedClasspath();
+            classpath = classpath.replace(":", ","); // yarn classpath is comma separated
             logger.info("The default mapred classpath is: " + classpath);
         }
 
@@ -164,6 +166,66 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
         jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
         logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+
+        // set extra dependencies as tmpjars & tmpfiles if configured
+        setJobTmpJarsAndFiles(job);
+    }
+
+    /**
+     * Adds every plain file found directly under {@code kylin.job.mr.lib.dir} to the job
+     * configuration: names ending in ".jar" are appended to "tmpjars", all other files to
+     * "tmpfiles". Sub-directories are skipped. Does nothing when the setting is blank.
+     *
+     * @param job the job whose configuration is updated in place
+     * @throws RuntimeException wrapping any IOException raised while resolving or listing the directory
+     */
+    private void setJobTmpJarsAndFiles(Job job) {
+        String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
+        if (StringUtils.isBlank(mrLibDir))
+            return;
+
+        try {
+            Configuration jobConf = job.getConfiguration();
+            Path libDir = new Path(mrLibDir.trim());
+            // Resolve the file system from the path itself rather than the cluster default, so a
+            // fully qualified dir on another file system (file:///, another HDFS) is listed correctly
+            // instead of being rejected as a "Wrong FS".
+            FileSystem fs = libDir.getFileSystem(jobConf);
+            FileStatus[] fList = fs.listStatus(libDir);
+
+            StringBuilder jarList = new StringBuilder();
+            StringBuilder fileList = new StringBuilder();
+
+            for (FileStatus file : fList) {
+                Path p = file.getPath();
+                // only plain files are shipped; sub-directories are not expanded
+                if (file.isDirectory()) {
+                    logger.info("Skipping sub-directory in " + mrLibDir + ": " + p);
+                    continue;
+                }
+                StringBuilder list = p.getName().endsWith(".jar") ? jarList : fileList;
+                if (list.length() > 0)
+                    list.append(",");
+                // listStatus returns qualified paths; using them verbatim keeps the scheme/authority
+                // and avoids a "//" when mrLibDir ends with a slash
+                list.append(p.toString());
+            }
+
+            appendTmpFiles(fileList.toString(), jobConf);
+            appendTmpJars(jarList.toString(), jobConf);
+
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to set tmpjars/tmpfiles from kylin.job.mr.lib.dir: " + mrLibDir, e);
+        }
+    }
+
+    /**
+     * Appends a comma-separated list of jar paths to the job's "tmpjars" setting.
+     *
+     * @param jarList comma-separated jar paths; a blank value leaves {@code conf} untouched
+     * @param conf    job configuration, modified in place
+     */
+    private void appendTmpJars(String jarList, Configuration conf) {
+        appendToCommaSeparatedProperty(conf, "tmpjars", jarList);
+    }
+
+    /**
+     * Appends a comma-separated list of file paths to the job's "tmpfiles" setting.
+     *
+     * @param fileList comma-separated file paths; a blank value leaves {@code conf} untouched
+     * @param conf     job configuration, modified in place
+     */
+    private void appendTmpFiles(String fileList, Configuration conf) {
+        appendToCommaSeparatedProperty(conf, "tmpfiles", fileList);
+    }
+
+    /**
+     * Shared implementation of appendTmpJars / appendTmpFiles: appends {@code items} to the
+     * comma-separated value stored under {@code key} in {@code conf} and logs the new value.
+     *
+     * @param conf  configuration to update in place
+     * @param key   property name, e.g. "tmpjars" or "tmpfiles"
+     * @param items comma-separated entries to append; blank means "nothing to do"
+     */
+    private void appendToCommaSeparatedProperty(Configuration conf, String key, String items) {
+        if (StringUtils.isBlank(items))
+            return;
+
+        String existing = conf.get(key);
+        // Treat a blank existing value like an absent one: appending to "" would produce a leading
+        // "," and therefore an empty path entry in the list.
+        String updated = StringUtils.isBlank(existing) ? items : existing + "," + items;
+        conf.set(key, updated);
+        logger.info("Job '" + key + "' updated -- " + updated);
     }
 
     private String getDefaultMapRedClasspath() {
@@ -174,7 +236,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             ShellCmdOutput output = new ShellCmdOutput();
             executor.execute("mapred classpath", output);
 
-            classpath = output.getOutput();
+            classpath = output.getOutput().trim();
         } catch (IOException e) {
             logger.error("Failed to run: 'mapred classpath'.", e);
         }
@@ -276,9 +338,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         else
             hdfsMetaDir = "file:///" + hdfsMetaDir;
         logger.info("HDFS meta dir is: " + hdfsMetaDir);
-        conf.set("tmpfiles", hdfsMetaDir);
+        
+        appendTmpFiles(hdfsMetaDir, conf);
     }
-
+    
     private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
         ResourceStore from = ResourceStore.getStore(kylinConfig);
         KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());