You are viewing a plain-text version of this content. The link to the canonical version is not reproduced in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:28:32 UTC

[18/50] incubator-kylin git commit: fix classpath for all mapred jobs, CDH 5.1 now works!

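Fix the classpath for all MapReduce jobs; CDH 5.1 now works. Job-jar selection and the appending of `kylin.hive.dependency` to `mapreduce.application.classpath` are moved into a single `AbstractHadoopJob.setJobClasspath(Job)` method, which every job now calls instead of duplicating the logic.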


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f2dd6651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f2dd6651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f2dd6651

Branch: refs/heads/master
Commit: f2dd665171777db6060f455232182ed1751f9d5e
Parents: 752187b
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Feb 13 14:59:31 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Feb 13 14:59:31 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/AbstractHadoopJob.java     | 42 +++++++++++++-------
 .../cardinality/HiveColumnCardinalityJob.java   | 22 ++--------
 .../kylin/job/hadoop/cube/CubeHFileJob.java     |  7 +---
 .../apache/kylin/job/hadoop/cube/CuboidJob.java |  7 +---
 .../job/hadoop/cube/FactDistinctColumnsJob.java | 15 ++-----
 .../job/hadoop/cube/KeyDistributionJob.java     |  2 +-
 .../kylin/job/hadoop/cube/MergeCuboidJob.java   |  8 +---
 .../hadoop/cube/RangeKeyDistributionJob.java    |  7 +---
 .../cube/RowKeyDistributionCheckerJob.java      |  2 +-
 .../hadoop/invertedindex/IICreateHFileJob.java  | 14 ++-----
 .../invertedindex/IIDistinctColumnsJob.java     | 18 +++------
 .../hadoop/invertedindex/InvertedIndexJob.java  | 16 +++-----
 .../invertedindex/RandomKeyDistributionJob.java |  3 +-
 13 files changed, 56 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 4d6d8d4..1997327 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -113,24 +113,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return optionsHelper.hasOption(option);
     }
 
-    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
-
     protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
         int retVal = 0;
         long start = System.nanoTime();
-        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
-        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " to " + MAP_REDUCE_CLASSPATH);
-        if (kylinHiveDependency != null) {
-            // yarn classpath is comma separated
-            kylinHiveDependency = kylinHiveDependency.replace(":", ",");
-            Configuration jobConf = job.getConfiguration();
-            final String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
-            if (classpath == null) {
-                jobConf.set(MAP_REDUCE_CLASSPATH, kylinHiveDependency);
-            } else {
-                jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
-            }
-        }
         if (isAsync) {
             job.submit();
         } else {
@@ -150,6 +135,33 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             System.exit(5);
         }
     }
+    
+    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
+
+    protected void setJobClasspath(Job job) {
+        String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
+        File jarFile = new File(jarPath);
+        if (jarFile.exists()) {
+            job.setJar(jarPath);
+            logger.info("append job jar: " + jarPath);
+        } else {
+            job.setJarByClass(this.getClass());
+        }
+
+        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
+        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " to " + MAP_REDUCE_CLASSPATH);
+        if (kylinHiveDependency != null) {
+            // yarn classpath is comma separated
+            kylinHiveDependency = kylinHiveDependency.replace(":", ",");
+            Configuration jobConf = job.getConfiguration();
+            final String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
+            if (classpath == null) {
+                jobConf.set(MAP_REDUCE_CLASSPATH, kylinHiveDependency);
+            } else {
+                jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
+            }
+        }
+    }
 
     public void addInputDirs(String input, Job job) throws IOException {
         for (String inp : StringSplitter.split(input, ",")) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index 773e62f..d5e5292 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -49,20 +49,9 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
 
     public static final String OUTPUT_PATH = "/tmp/cardinality";
 
-    /**
-     * This is the jar path
-     */
-    private String jarPath;
-
-    private String table;
-
     public HiveColumnCardinalityJob() {
     }
 
-    public HiveColumnCardinalityJob(String path, String tokenPath) {
-        this.jarPath = path;
-    }
-
     @Override
     public int run(String[] args) throws Exception {
 
@@ -80,19 +69,14 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
             Configuration conf = getConf();
             job = Job.getInstance(conf, jobName);
 
-            // set job configuration - basic
-            if (jarPath == null || !new File(jarPath).exists()) {
-                job.setJarByClass(getClass());
-            } else {
-                job.setJar(jarPath);
-            }
-
+            setJobClasspath(job);
+            
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             FileOutputFormat.setOutputPath(job, output);
             job.getConfiguration().set("dfs.block.size", "67108864");
 
             // Mapper
-            this.table = getOptionValue(OPTION_TABLE);
+            String table = getOptionValue(OPTION_TABLE);
             String[] dbTableNames = HadoopUtil.parseHiveTableName(table);
             HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index f0910df..a0d54cb 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -67,12 +67,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
 
-            File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            if (JarFile.exists()) {
-                job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
+            setJobClasspath(job);
 
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
             FileOutputFormat.setOutputPath(job, output);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index 0b3d272..be54ca7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -83,12 +83,7 @@ public class CuboidJob extends AbstractHadoopJob {
             logger.info("Starting: " + job.getJobName());
             FileInputFormat.setInputPaths(job, input);
 
-            File jarFile = new File(config.getKylinJobJarPath());
-            if (jarFile.exists()) {
-                job.setJar(config.getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
+            setJobClasspath(job);
 
             // Mapper
             if (this.mapperClass == null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 47c8c50..ccc2e0d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
@@ -31,15 +30,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author yangli9
@@ -71,6 +69,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             System.out.println("Starting: " + job.getJobName());
 
+            setJobClasspath(job);
+            
             setupMapper(intermediateTable);
             setupReducer(output);
 
@@ -90,13 +90,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
     private void setupMapper(String intermediateTable) throws IOException {
 //        FileInputFormat.setInputPaths(job, input);
 
-        File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        if (JarFile.exists()) {
-            job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        } else {
-            job.setJarByClass(this.getClass());
-        }
-        
         String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
         HCatInputFormat.setInput(job, dbTableNames[0],
                 dbTableNames[1]);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
index 7de432a..39efd8f 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
@@ -89,7 +89,7 @@ package org.apache.kylin.job.hadoop.cube;
 //            Job job = Job.getInstanceFromEnv(getConf(), jobName);
 //
 //            // set job configuration - basic 
-//            job.setJarByClass(getClass());
+//            setJobClasspath(job);
 //            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
 //
 //            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
index b8b03be..0831d31 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
@@ -61,13 +61,7 @@ public class MergeCuboidJob extends CuboidJob {
             System.out.println("Starting: " + jobName);
             job = Job.getInstance(getConf(), jobName);
 
-            // set job configuration - basic
-            File JarFile = new File(config.getKylinJobJarPath());
-            if (JarFile.exists()) {
-                job.setJar(config.getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
+            setJobClasspath(job);
 
             // set inputs
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
index 8b77497..061cb90 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
@@ -68,12 +68,7 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
             String jobName = getOptionValue(OPTION_JOB_NAME);
             job = Job.getInstance(getConf(), jobName);
 
-            File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            if (JarFile.exists()) {
-                job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
+            setJobClasspath(job);
 
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
index a441006..faf6675 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
@@ -59,7 +59,7 @@ public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
             String jobName = getOptionValue(OPTION_JOB_NAME);
             job = Job.getInstance(getConf(), jobName);
 
-            job.setJarByClass(this.getClass());
+            setJobClasspath(job);
 
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index 4f2a568..c479b68 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.job.hadoop.invertedindex;
 
-import java.io.File;
-
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -31,12 +29,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author yangli9
@@ -61,12 +58,7 @@ public class IICreateHFileJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
 
-            File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            if (JarFile.exists()) {
-                job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
+            setJobClasspath(job);
 
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
             FileOutputFormat.setOutputPath(job, output);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
index 5b67057..9c7051a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.job.hadoop.invertedindex;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
@@ -31,16 +30,16 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.hadoop.hive.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.hadoop.hive.IntermediateColumnDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -74,6 +73,8 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
             job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
 
+            setJobClasspath(job);
+            
             setupMapper();
             setupReducer(output);
 
@@ -99,13 +100,6 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob {
 
     private void setupMapper() throws IOException {
 
-        File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        if (JarFile.exists()) {
-            job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        } else {
-            job.setJarByClass(this.getClass());
-        }
-
         String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
         String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
index 547a499..6a0532e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.job.hadoop.invertedindex;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
@@ -31,15 +30,15 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
 
 /**
  * @author yangli9
@@ -69,6 +68,8 @@ public class InvertedIndexJob extends AbstractHadoopJob {
 
             IIInstance ii = getII(iiname);
             short sharding = ii.getDescriptor().getSharding();
+            
+            setJobClasspath(job);
 
             setupMapper(intermediateTable);
             setupReducer(output, sharding);
@@ -103,13 +104,6 @@ public class InvertedIndexJob extends AbstractHadoopJob {
 
     private void setupMapper(String intermediateTable) throws IOException {
 
-        File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        if (JarFile.exists()) {
-            job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        } else {
-            job.setJarByClass(this.getClass());
-        }
-
         String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
         HCatInputFormat.setInput(job, dbTableNames[0],
                 dbTableNames[1]);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
index c55e5d4..0f94d32 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
@@ -63,7 +63,8 @@ public class RandomKeyDistributionJob extends AbstractHadoopJob {
             String jobName = getOptionValue(OPTION_JOB_NAME);
             job = Job.getInstance(getConf(), jobName);
 
-            job.setJarByClass(this.getClass());
+            setJobClasspath(job);
+            
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
 
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));