You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 03:02:32 UTC
[45/50] [abbrv] kylin git commit: KYLIN-1839, support kylin lib in HDFS
KYLIN-1839, support kylin lib in HDFS
Signed-off-by: terry <hz...@corp.netease.com>
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4473d710
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4473d710
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4473d710
Branch: refs/heads/master-hbase1.x
Commit: 4473d71011cc0e652eccf4f80269828caa5d3c73
Parents: d28835f
Author: terry <hz...@corp.netease.com>
Authored: Tue Oct 11 17:33:45 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Oct 19 07:30:13 2016 +0800
----------------------------------------------------------------------
.../engine/mr/common/AbstractHadoopJob.java | 29 +++++++++++---------
1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4473d710/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index a138eec..bbb1711 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -252,14 +253,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// for KylinJobMRLibDir
String mrLibDir = kylinConf.getKylinJobMRLibDir();
if (!StringUtils.isBlank(mrLibDir)) {
- File dirFileMRLIB = new File(mrLibDir);
- if (dirFileMRLIB.exists()) {
- if (kylinDependency.length() > 0)
- kylinDependency.append(",");
+ if(kylinDependency.length() > 0) {
+ kylinDependency.append(",");
+ }
kylinDependency.append(mrLibDir);
- } else {
- logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!");
- }
}
setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -300,21 +297,27 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
try {
Configuration jobConf = job.getConfiguration();
FileSystem fs = FileSystem.getLocal(jobConf);
+ FileSystem hdfs = FileSystem.get(jobConf);
StringBuilder jarList = new StringBuilder();
StringBuilder fileList = new StringBuilder();
for (String fileName : fNameList) {
Path p = new Path(fileName);
- if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, fileName);
+ FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
+ if(!current.exists(p)) {
+ logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!");
+ continue;
+ }
+ if (current.getFileStatus(p).isDirectory()) {
+ appendTmpDir(job, current, fileName);
continue;
}
StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
if (list.length() > 0)
list.append(",");
- list.append(fs.getFileStatus(p).getPath().toString());
+ list.append(current.getFileStatus(p).getPath());
}
appendTmpFiles(fileList.toString(), jobConf);
@@ -324,13 +327,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- private void appendTmpDir(Job job, String tmpDir) {
+ private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
if (StringUtils.isBlank(tmpDir))
return;
try {
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.getLocal(jobConf);
FileStatus[] fList = fs.listStatus(new Path(tmpDir));
StringBuilder jarList = new StringBuilder();
@@ -339,7 +341,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (FileStatus file : fList) {
Path p = file.getPath();
if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, p.toString());
+ appendTmpDir(job, fs, p.toString());
continue;
}
@@ -622,3 +624,4 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
+