You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:02:56 UTC

[44/47] incubator-kylin git commit: KYLIN-629 Kylin failed to run mapreduce job if there is no mapreduce.application.classpath in mapred-site.xml

KYLIN-629 Kylin failed to run mapreduce job if there is no mapreduce.application.classpath in mapred-site.xml

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a5821c83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a5821c83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a5821c83

Branch: refs/heads/master
Commit: a5821c83f8393853dba271522c2c96bd912f0b0e
Parents: 93f1f26
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed Mar 11 11:21:26 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed Mar 11 11:21:26 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/AbstractHadoopJob.java     | 64 ++++++++++++++------
 1 file changed, 44 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a5821c83/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index ff19357..087dd04 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -23,18 +23,6 @@ package org.apache.kylin.job.hadoop;
  *
  */
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -50,9 +38,22 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +64,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hadoop.util.StringUtils.formatTime;
+
 @SuppressWarnings("static-access")
 public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
@@ -121,7 +124,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         } else {
             job.waitForCompletion(true);
             retVal = job.isSuccessful() ? 0 : 1;
-            logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + StringUtils.formatTime((System.nanoTime() - start) / 1000000L));
+            logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
         }
         return retVal;
     }
@@ -135,7 +138,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             System.exit(5);
         }
     }
-    
+
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
     protected void setJobClasspath(Job job) {
@@ -154,16 +157,37 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             // yarn classpath is comma separated
             kylinHiveDependency = kylinHiveDependency.replace(":", ",");
             Configuration jobConf = job.getConfiguration();
-            final String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
-            if (classpath == null) {
-                jobConf.set(MAP_REDUCE_CLASSPATH, kylinHiveDependency);
-            } else {
-                jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
+            String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
+            if (classpath == null || classpath.length() == 0) {
+                logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
+                classpath = getDefaultMapRedClasspath();
+                logger.info("The default mapred classpath is: " + classpath);
             }
+
+            jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
+
         }
         logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
     }
 
+
+    private String getDefaultMapRedClasspath() {
+
+        String classpath = "";
+        try {
+            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+            ShellCmdOutput output = new ShellCmdOutput();
+            executor.execute("mapred classpath", output);
+
+            classpath = output.getOutput();
+        } catch (IOException e) {
+            logger.error("Failed to run: 'mapred classpath'.", e);
+        }
+
+        return classpath;
+    }
+
+
     public void addInputDirs(String input, Job job) throws IOException {
         for (String inp : StringSplitter.split(input, ",")) {
             inp = inp.trim();