You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/21 12:52:03 UTC

[08/52] [abbrv] kylin git commit: Feature changed: automatically append hive dependent jars to 'tmpjars'

Feature changed: automatically append Hive dependency jars to 'tmpjars'

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bfccac90
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bfccac90
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bfccac90

Branch: refs/heads/1.x-HBase1.1.3
Commit: bfccac90f54a39f87738eb7a6955a0f0e404bc60
Parents: 73cfbef
Author: Zhong Yanghong <ya...@ebay.com>
Authored: Sun Jan 24 09:35:22 2016 +0000
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 25 09:47:11 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/AbstractHadoopJob.java     | 123 ++++++++++++++++---
 1 file changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/bfccac90/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index a851756..f5c85eb 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -127,6 +129,26 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return retVal;
     }
 
+    private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar"; // NOTE(review): '.' is unescaped (matches any char) and only the hcatalog alternative allows a version suffix, so e.g. "hive-exec-1.2.1.jar" is not matched
+    // Returns the comma-joined matches of KYLIN_HIVE_DEPENDENCY_JARS found in the comma-separated kylinHiveDependency, or "" if it is blank.
+    String filterKylinHiveDependency(String kylinHiveDependency) {
+        if (StringUtils.isBlank(kylinHiveDependency))
+            return "";
+        // Accumulates the matched entries, comma-separated.
+        StringBuilder jarList = new StringBuilder();
+        // NOTE(review): the pattern is recompiled on every call; it could be hoisted to a static final Pattern.
+        Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS);
+        Matcher matcher = hivePattern.matcher(kylinHiveDependency);
+        // "[^,]*" cannot cross a comma, so each match stays within one entry (and may end before the entry does, e.g. "hive-exec.jar.bak").
+        while (matcher.find()) {
+            if (jarList.length() > 0)
+                jarList.append(",");
+            jarList.append(matcher.group());
+        }
+
+        return jarList.toString();
+    }
+
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
     protected void setJobClasspath(Job job) {
@@ -141,14 +163,13 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
         String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
         String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
-        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
+        logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
 
         Configuration jobConf = job.getConfiguration();
         String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
         if (classpath == null || classpath.length() == 0) {
             logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
             classpath = getDefaultMapRedClasspath();
-            classpath = classpath.replace(":", ","); // yarn classpath is comma separated
             logger.info("The default mapred classpath is: " + classpath);
         }
 
@@ -158,40 +179,102 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             classpath = classpath + "," + kylinHBaseDependency;
         }
 
+        jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
+        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+
+        /*
+         *  set extra dependencies as tmpjars & tmpfiles if configured
+         */
+        StringBuilder kylinDependency = new StringBuilder();
+
+        // for hive dependencies
         if (kylinHiveDependency != null) {
             // yarn classpath is comma separated
             kylinHiveDependency = kylinHiveDependency.replace(":", ",");
-            classpath = classpath + "," + kylinHiveDependency;
+
+            logger.info("Hive Dependencies Before Filtered: " + kylinHiveDependency);
+            String filteredHive = filterKylinHiveDependency(kylinHiveDependency);
+            logger.info("Hive Dependencies After Filtered: " + filteredHive);
+
+            if (kylinDependency.length() > 0)
+                kylinDependency.append(",");
+            kylinDependency.append(filteredHive);
         }
 
-        jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
-        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+        // for KylinJobMRLibDir
+        String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
+        if (!StringUtils.isBlank(mrLibDir)) {
+            File dirFileMRLIB = new File(mrLibDir);
+            if (dirFileMRLIB.exists()) {
+                if (kylinDependency.length() > 0)
+                    kylinDependency.append(",");
+                kylinDependency.append(mrLibDir);
+            } else {
+                logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!");
+            }
+        }
 
-        // set extra dependencies as tmpjars & tmpfiles if configured
-        setJobTmpJarsAndFiles(job);
+        setJobTmpJarsAndFiles(job, kylinDependency.toString());
     }
 
-    private void setJobTmpJarsAndFiles(Job job) {
-        String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
-        if (StringUtils.isBlank(mrLibDir))
+    private void setJobTmpJarsAndFiles(Job job, String kylinDependency) { // Registers each comma-separated entry of kylinDependency as a tmpjar/tmpfile; directory entries are expanded via appendTmpDir.
+        if (StringUtils.isBlank(kylinDependency))
             return;
 
+        String[] fNameList = kylinDependency.split(",");
+        // Entries are resolved against the local file system (FileSystem.getLocal below), not the job's default FS.
         try {
             Configuration jobConf = job.getConfiguration();
-            FileSystem fs = FileSystem.get(new Configuration(jobConf));
-            FileStatus[] fList = fs.listStatus(new Path(mrLibDir));
-            
+            FileSystem fs = FileSystem.getLocal(jobConf);
+            // NOTE(review): getFileStatus presumably throws for a nonexistent entry, aborting job setup via the RuntimeException below; confirm this is intended for optional Hive jars.
             StringBuilder jarList = new StringBuilder();
             StringBuilder fileList = new StringBuilder();
-            
+            // Entries whose name ends in ".jar" are collected for "tmpjars"; all others for "tmpfiles".
+            for (String fileName : fNameList) {
+                Path p = new Path(fileName);
+                if (fs.getFileStatus(p).isDirectory()) {
+                    appendTmpDir(job, fileName);
+                    continue;
+                }
+
+                StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
+                if (list.length() > 0)
+                    list.append(",");
+                list.append(fs.getFileStatus(p).getPath().toString()); // NOTE(review): repeats the getFileStatus call from the directory check; presumably yields a local-FS-qualified path
+            }
+            // Each append call is a no-op when its list is blank.
+            appendTmpFiles(fileList.toString(), jobConf);
+            appendTmpJars(jarList.toString(), jobConf);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void appendTmpDir(Job job, String tmpDir) {
+        if (StringUtils.isBlank(tmpDir))
+            return;
+
+        try {
+            Configuration jobConf = job.getConfiguration();
+            FileSystem fs = FileSystem.getLocal(jobConf);
+            FileStatus[] fList = fs.listStatus(new Path(tmpDir));
+
+            StringBuilder jarList = new StringBuilder();
+            StringBuilder fileList = new StringBuilder();
+
             for (FileStatus file : fList) {
                 Path p = file.getPath();
+                if (fs.getFileStatus(p).isDirectory()) {
+                    appendTmpDir(job, p.toString());
+                    continue;
+                }
+
                 StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
                 if (list.length() > 0)
                     list.append(",");
-                list.append(mrLibDir + "/" + file.getPath().getName());
+                list.append(fs.getFileStatus(p).getPath().toString());
             }
-            
+
             appendTmpFiles(fileList.toString(), jobConf);
             appendTmpJars(jarList.toString(), jobConf);
 
@@ -203,7 +286,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     private void appendTmpJars(String jarList, Configuration conf) {
         if (StringUtils.isBlank(jarList))
             return;
-        
+
         String tmpJars = conf.get("tmpjars", null);
         if (tmpJars == null) {
             tmpJars = jarList;
@@ -217,7 +300,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     private void appendTmpFiles(String fileList, Configuration conf) {
         if (StringUtils.isBlank(fileList))
             return;
-        
+
         String tmpFiles = conf.get("tmpfiles", null);
         if (tmpFiles == null) {
             tmpFiles = fileList;
@@ -338,10 +421,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         else
             hdfsMetaDir = "file:///" + hdfsMetaDir;
         logger.info("HDFS meta dir is: " + hdfsMetaDir);
-        
+
         appendTmpFiles(hdfsMetaDir, conf);
     }
-    
+
     private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
         ResourceStore from = ResourceStore.getStore(kylinConfig);
         KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());