You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:02:56 UTC
[44/47] incubator-kylin git commit: KYLIN-629 Kylin failed to run
mapreduce job if there is no mapreduce.application.classpath in
mapred-site.xml
KYLIN-629 Kylin failed to run mapreduce job if there is no mapreduce.application.classpath in mapred-site.xml
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a5821c83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a5821c83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a5821c83
Branch: refs/heads/master
Commit: a5821c83f8393853dba271522c2c96bd912f0b0e
Parents: 93f1f26
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed Mar 11 11:21:26 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed Mar 11 11:21:26 2015 +0800
----------------------------------------------------------------------
.../kylin/job/hadoop/AbstractHadoopJob.java | 64 ++++++++++++++------
1 file changed, 44 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a5821c83/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index ff19357..087dd04 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -23,18 +23,6 @@ package org.apache.kylin.job.hadoop;
*
*/
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -50,9 +38,22 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +64,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.hadoop.util.StringUtils.formatTime;
+
@SuppressWarnings("static-access")
public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
@@ -121,7 +124,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
} else {
job.waitForCompletion(true);
retVal = job.isSuccessful() ? 0 : 1;
- logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + StringUtils.formatTime((System.nanoTime() - start) / 1000000L));
+ logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
}
return retVal;
}
@@ -135,7 +138,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
System.exit(5);
}
}
-
+
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
protected void setJobClasspath(Job job) {
@@ -154,16 +157,37 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// yarn classpath is comma separated
kylinHiveDependency = kylinHiveDependency.replace(":", ",");
Configuration jobConf = job.getConfiguration();
- final String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
- if (classpath == null) {
- jobConf.set(MAP_REDUCE_CLASSPATH, kylinHiveDependency);
- } else {
- jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
+ String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
+ if (classpath == null || classpath.length() == 0) {
+ logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
+ classpath = getDefaultMapRedClasspath();
+ logger.info("The default mapred classpath is: " + classpath);
}
+
+ jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
+
}
logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
}
+
+ private String getDefaultMapRedClasspath() {
+
+ String classpath = "";
+ try {
+ CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+ ShellCmdOutput output = new ShellCmdOutput();
+ executor.execute("mapred classpath", output);
+
+ classpath = output.getOutput();
+ } catch (IOException e) {
+ logger.error("Failed to run: 'mapred classpath'.", e);
+ }
+
+ return classpath;
+ }
+
+
public void addInputDirs(String input, Job job) throws IOException {
for (String inp : StringSplitter.split(input, ",")) {
inp = inp.trim();