You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 03:02:33 UTC
[46/50] [abbrv] kylin git commit: KYLIN-1839 code review
KYLIN-1839 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa1550c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa1550c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa1550c9
Branch: refs/heads/master-hbase1.x
Commit: aa1550c9ecfb1c047c107fa51187a51e958ab189
Parents: 4473d71
Author: Yang Li <li...@apache.org>
Authored: Wed Oct 19 08:19:35 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Oct 19 08:19:35 2016 +0800
----------------------------------------------------------------------
.../engine/mr/common/AbstractHadoopJob.java | 44 +++++++++++---------
1 file changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa1550c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index bbb1711..f70e3bb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -80,7 +79,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
-// @Deprecated
+ // @Deprecated
protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME);
protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID);
protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
@@ -253,10 +252,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// for KylinJobMRLibDir
String mrLibDir = kylinConf.getKylinJobMRLibDir();
if (!StringUtils.isBlank(mrLibDir)) {
- if(kylinDependency.length() > 0) {
- kylinDependency.append(",");
- }
- kylinDependency.append(mrLibDir);
+ if (kylinDependency.length() > 0) {
+ kylinDependency.append(",");
+ }
+ kylinDependency.append(mrLibDir);
}
setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -296,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
try {
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.getLocal(jobConf);
+ FileSystem localfs = FileSystem.getLocal(jobConf);
FileSystem hdfs = FileSystem.get(jobConf);
StringBuilder jarList = new StringBuilder();
@@ -304,20 +303,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (String fileName : fNameList) {
Path p = new Path(fileName);
- FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
- if(!current.exists(p)) {
- logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!");
+ if (p.isAbsolute() == false) {
+ logger.warn("The directory of kylin dependency '" + fileName + "' is not absolute, skip");
+ continue;
+ }
+ FileSystem fs;
+ if (hdfs.exists(p)) {
+ fs = hdfs;
+ } else if (localfs.exists(p)) {
+ fs = localfs;
+ } else {
+ logger.warn("The directory of kylin dependency '" + fileName + "' does not exist, skip");
continue;
}
- if (current.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, current, fileName);
+
+ if (fs.getFileStatus(p).isDirectory()) {
+ appendTmpDir(job, fs, p);
continue;
}
StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
if (list.length() > 0)
list.append(",");
- list.append(current.getFileStatus(p).getPath());
+ list.append(fs.getFileStatus(p).getPath());
}
appendTmpFiles(fileList.toString(), jobConf);
@@ -327,13 +335,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
- if (StringUtils.isBlank(tmpDir))
- return;
-
+ private void appendTmpDir(Job job, FileSystem fs, Path tmpDir) {
try {
Configuration jobConf = job.getConfiguration();
- FileStatus[] fList = fs.listStatus(new Path(tmpDir));
+ FileStatus[] fList = fs.listStatus(tmpDir);
StringBuilder jarList = new StringBuilder();
StringBuilder fileList = new StringBuilder();
@@ -341,7 +346,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (FileStatus file : fList) {
Path p = file.getPath();
if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, fs, p.toString());
+ appendTmpDir(job, fs, p);
continue;
}
@@ -624,4 +629,3 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
-