You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/27 07:25:38 UTC
[25/41] incubator-kylin git commit: fix classpath for all mapred jobs,
CDH 5.1 now works!
fix classpath for all mapred jobs, CDH 5.1 now works!
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f2dd6651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f2dd6651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f2dd6651
Branch: refs/heads/inverted-index
Commit: f2dd665171777db6060f455232182ed1751f9d5e
Parents: 752187b
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Feb 13 14:59:31 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Feb 13 14:59:31 2015 +0800
----------------------------------------------------------------------
.../kylin/job/hadoop/AbstractHadoopJob.java | 42 +++++++++++++-------
.../cardinality/HiveColumnCardinalityJob.java | 22 ++--------
.../kylin/job/hadoop/cube/CubeHFileJob.java | 7 +---
.../apache/kylin/job/hadoop/cube/CuboidJob.java | 7 +---
.../job/hadoop/cube/FactDistinctColumnsJob.java | 15 ++-----
.../job/hadoop/cube/KeyDistributionJob.java | 2 +-
.../kylin/job/hadoop/cube/MergeCuboidJob.java | 8 +---
.../hadoop/cube/RangeKeyDistributionJob.java | 7 +---
.../cube/RowKeyDistributionCheckerJob.java | 2 +-
.../hadoop/invertedindex/IICreateHFileJob.java | 14 ++-----
.../invertedindex/IIDistinctColumnsJob.java | 18 +++------
.../hadoop/invertedindex/InvertedIndexJob.java | 16 +++-----
.../invertedindex/RandomKeyDistributionJob.java | 3 +-
13 files changed, 56 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 4d6d8d4..1997327 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -113,24 +113,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return optionsHelper.hasOption(option);
}
- private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
-
protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
int retVal = 0;
long start = System.nanoTime();
- String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
- logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " to " + MAP_REDUCE_CLASSPATH);
- if (kylinHiveDependency != null) {
- // yarn classpath is comma separated
- kylinHiveDependency = kylinHiveDependency.replace(":", ",");
- Configuration jobConf = job.getConfiguration();
- final String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
- if (classpath == null) {
- jobConf.set(MAP_REDUCE_CLASSPATH, kylinHiveDependency);
- } else {
- jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
- }
- }
if (isAsync) {
job.submit();
} else {
@@ -150,6 +135,33 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
System.exit(5);
}
}
+
+ private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
+
+ protected void setJobClasspath(Job job) {
+ String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
+ File jarFile = new File(jarPath);
+ if (jarFile.exists()) {
+ job.setJar(jarPath);
+ logger.info("append job jar: " + jarPath);
+ } else {
+ job.setJarByClass(this.getClass());
+ }
+
+ String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
+ logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " to " + MAP_REDUCE_CLASSPATH);
+ if (kylinHiveDependency != null) {
+ // yarn classpath is comma separated
+ kylinHiveDependency = kylinHiveDependency.replace(":", ",");
+ Configuration jobConf = job.getConfiguration();
+ final String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
+ if (classpath == null) {
+ jobConf.set(MAP_REDUCE_CLASSPATH, kylinHiveDependency);
+ } else {
+ jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
+ }
+ }
+ }
public void addInputDirs(String input, Job job) throws IOException {
for (String inp : StringSplitter.split(input, ",")) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index 773e62f..d5e5292 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -49,20 +49,9 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
public static final String OUTPUT_PATH = "/tmp/cardinality";
- /**
- * This is the jar path
- */
- private String jarPath;
-
- private String table;
-
public HiveColumnCardinalityJob() {
}
- public HiveColumnCardinalityJob(String path, String tokenPath) {
- this.jarPath = path;
- }
-
@Override
public int run(String[] args) throws Exception {
@@ -80,19 +69,14 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
Configuration conf = getConf();
job = Job.getInstance(conf, jobName);
- // set job configuration - basic
- if (jarPath == null || !new File(jarPath).exists()) {
- job.setJarByClass(getClass());
- } else {
- job.setJar(jarPath);
- }
-
+ setJobClasspath(job);
+
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
FileOutputFormat.setOutputPath(job, output);
job.getConfiguration().set("dfs.block.size", "67108864");
// Mapper
- this.table = getOptionValue(OPTION_TABLE);
+ String table = getOptionValue(OPTION_TABLE);
String[] dbTableNames = HadoopUtil.parseHiveTableName(table);
HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index f0910df..a0d54cb 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -67,12 +67,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
+ setJobClasspath(job);
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
FileOutputFormat.setOutputPath(job, output);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index 0b3d272..be54ca7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -83,12 +83,7 @@ public class CuboidJob extends AbstractHadoopJob {
logger.info("Starting: " + job.getJobName());
FileInputFormat.setInputPaths(job, input);
- File jarFile = new File(config.getKylinJobJarPath());
- if (jarFile.exists()) {
- job.setJar(config.getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
+ setJobClasspath(job);
// Mapper
if (this.mapperClass == null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 47c8c50..ccc2e0d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -18,7 +18,6 @@
package org.apache.kylin.job.hadoop.cube;
-import java.io.File;
import java.io.IOException;
import org.apache.commons.cli.Options;
@@ -31,15 +30,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author yangli9
@@ -71,6 +69,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
System.out.println("Starting: " + job.getJobName());
+ setJobClasspath(job);
+
setupMapper(intermediateTable);
setupReducer(output);
@@ -90,13 +90,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
private void setupMapper(String intermediateTable) throws IOException {
// FileInputFormat.setInputPaths(job, input);
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
HCatInputFormat.setInput(job, dbTableNames[0],
dbTableNames[1]);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
index 7de432a..39efd8f 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
@@ -89,7 +89,7 @@ package org.apache.kylin.job.hadoop.cube;
// Job job = Job.getInstanceFromEnv(getConf(), jobName);
//
// // set job configuration - basic
-// job.setJarByClass(getClass());
+// setJobClasspath(job);
// addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
//
// Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
index b8b03be..0831d31 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
@@ -61,13 +61,7 @@ public class MergeCuboidJob extends CuboidJob {
System.out.println("Starting: " + jobName);
job = Job.getInstance(getConf(), jobName);
- // set job configuration - basic
- File JarFile = new File(config.getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(config.getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
+ setJobClasspath(job);
// set inputs
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
index 8b77497..061cb90 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
@@ -68,12 +68,7 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
String jobName = getOptionValue(OPTION_JOB_NAME);
job = Job.getInstance(getConf(), jobName);
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
+ setJobClasspath(job);
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
index a441006..faf6675 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
@@ -59,7 +59,7 @@ public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
String jobName = getOptionValue(OPTION_JOB_NAME);
job = Job.getInstance(getConf(), jobName);
- job.setJarByClass(this.getClass());
+ setJobClasspath(job);
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index 4f2a568..c479b68 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -18,8 +18,6 @@
package org.apache.kylin.job.hadoop.invertedindex;
-import java.io.File;
-
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -31,12 +29,11 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author yangli9
@@ -61,12 +58,7 @@ public class IICreateHFileJob extends AbstractHadoopJob {
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
+ setJobClasspath(job);
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
FileOutputFormat.setOutputPath(job, output);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
index 5b67057..9c7051a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -18,7 +18,6 @@
package org.apache.kylin.job.hadoop.invertedindex;
-import java.io.File;
import java.io.IOException;
import org.apache.commons.cli.Options;
@@ -31,16 +30,16 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.hadoop.hive.IIJoinedFlatTableDesc;
import org.apache.kylin.job.hadoop.hive.IntermediateColumnDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -74,6 +73,8 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
+ setJobClasspath(job);
+
setupMapper();
setupReducer(output);
@@ -99,13 +100,6 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob {
private void setupMapper() throws IOException {
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
index 547a499..6a0532e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -18,7 +18,6 @@
package org.apache.kylin.job.hadoop.invertedindex;
-import java.io.File;
import java.io.IOException;
import org.apache.commons.cli.Options;
@@ -31,15 +30,15 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
/**
* @author yangli9
@@ -69,6 +68,8 @@ public class InvertedIndexJob extends AbstractHadoopJob {
IIInstance ii = getII(iiname);
short sharding = ii.getDescriptor().getSharding();
+
+ setJobClasspath(job);
setupMapper(intermediateTable);
setupReducer(output, sharding);
@@ -103,13 +104,6 @@ public class InvertedIndexJob extends AbstractHadoopJob {
private void setupMapper(String intermediateTable) throws IOException {
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
HCatInputFormat.setInput(job, dbTableNames[0],
dbTableNames[1]);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2dd6651/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
index c55e5d4..0f94d32 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
@@ -63,7 +63,8 @@ public class RandomKeyDistributionJob extends AbstractHadoopJob {
String jobName = getOptionValue(OPTION_JOB_NAME);
job = Job.getInstance(getConf(), jobName);
- job.setJarByClass(this.getClass());
+ setJobClasspath(job);
+
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));