You are viewing a plain-text version of this content. The canonical (linked) version is available in the original mailing-list archive.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 00:19:48 UTC
[1/2] kylin git commit: KYLIN-1839, support kylin lib in HDFS
Repository: kylin
Updated Branches:
refs/heads/master d28835f69 -> aa1550c9e
KYLIN-1839, support kylin lib in HDFS
Signed-off-by: terry <hz...@corp.netease.com>
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4473d710
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4473d710
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4473d710
Branch: refs/heads/master
Commit: 4473d71011cc0e652eccf4f80269828caa5d3c73
Parents: d28835f
Author: terry <hz...@corp.netease.com>
Authored: Tue Oct 11 17:33:45 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Oct 19 07:30:13 2016 +0800
----------------------------------------------------------------------
.../engine/mr/common/AbstractHadoopJob.java | 29 +++++++++++---------
1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4473d710/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index a138eec..bbb1711 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -252,14 +253,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// for KylinJobMRLibDir
String mrLibDir = kylinConf.getKylinJobMRLibDir();
if (!StringUtils.isBlank(mrLibDir)) {
- File dirFileMRLIB = new File(mrLibDir);
- if (dirFileMRLIB.exists()) {
- if (kylinDependency.length() > 0)
- kylinDependency.append(",");
+ if(kylinDependency.length() > 0) {
+ kylinDependency.append(",");
+ }
kylinDependency.append(mrLibDir);
- } else {
- logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!");
- }
}
setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -300,21 +297,27 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
try {
Configuration jobConf = job.getConfiguration();
FileSystem fs = FileSystem.getLocal(jobConf);
+ FileSystem hdfs = FileSystem.get(jobConf);
StringBuilder jarList = new StringBuilder();
StringBuilder fileList = new StringBuilder();
for (String fileName : fNameList) {
Path p = new Path(fileName);
- if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, fileName);
+ FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
+ if(!current.exists(p)) {
+ logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!");
+ continue;
+ }
+ if (current.getFileStatus(p).isDirectory()) {
+ appendTmpDir(job, current, fileName);
continue;
}
StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
if (list.length() > 0)
list.append(",");
- list.append(fs.getFileStatus(p).getPath().toString());
+ list.append(current.getFileStatus(p).getPath());
}
appendTmpFiles(fileList.toString(), jobConf);
@@ -324,13 +327,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- private void appendTmpDir(Job job, String tmpDir) {
+ private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
if (StringUtils.isBlank(tmpDir))
return;
try {
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.getLocal(jobConf);
FileStatus[] fList = fs.listStatus(new Path(tmpDir));
StringBuilder jarList = new StringBuilder();
@@ -339,7 +341,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (FileStatus file : fList) {
Path p = file.getPath();
if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, p.toString());
+ appendTmpDir(job, fs, p.toString());
continue;
}
@@ -622,3 +624,4 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
+
[2/2] kylin git commit: KYLIN-1839 code review
Posted by li...@apache.org.
KYLIN-1839 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa1550c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa1550c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa1550c9
Branch: refs/heads/master
Commit: aa1550c9ecfb1c047c107fa51187a51e958ab189
Parents: 4473d71
Author: Yang Li <li...@apache.org>
Authored: Wed Oct 19 08:19:35 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Oct 19 08:19:35 2016 +0800
----------------------------------------------------------------------
.../engine/mr/common/AbstractHadoopJob.java | 44 +++++++++++---------
1 file changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa1550c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index bbb1711..f70e3bb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -80,7 +79,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
-// @Deprecated
+ // @Deprecated
protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME);
protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID);
protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
@@ -253,10 +252,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// for KylinJobMRLibDir
String mrLibDir = kylinConf.getKylinJobMRLibDir();
if (!StringUtils.isBlank(mrLibDir)) {
- if(kylinDependency.length() > 0) {
- kylinDependency.append(",");
- }
- kylinDependency.append(mrLibDir);
+ if (kylinDependency.length() > 0) {
+ kylinDependency.append(",");
+ }
+ kylinDependency.append(mrLibDir);
}
setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -296,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
try {
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.getLocal(jobConf);
+ FileSystem localfs = FileSystem.getLocal(jobConf);
FileSystem hdfs = FileSystem.get(jobConf);
StringBuilder jarList = new StringBuilder();
@@ -304,20 +303,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (String fileName : fNameList) {
Path p = new Path(fileName);
- FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
- if(!current.exists(p)) {
- logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!");
+ if (p.isAbsolute() == false) {
+ logger.warn("The directory of kylin dependency '" + fileName + "' is not absolute, skip");
+ continue;
+ }
+ FileSystem fs;
+ if (hdfs.exists(p)) {
+ fs = hdfs;
+ } else if (localfs.exists(p)) {
+ fs = localfs;
+ } else {
+ logger.warn("The directory of kylin dependency '" + fileName + "' does not exist, skip");
continue;
}
- if (current.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, current, fileName);
+
+ if (fs.getFileStatus(p).isDirectory()) {
+ appendTmpDir(job, fs, p);
continue;
}
StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
if (list.length() > 0)
list.append(",");
- list.append(current.getFileStatus(p).getPath());
+ list.append(fs.getFileStatus(p).getPath());
}
appendTmpFiles(fileList.toString(), jobConf);
@@ -327,13 +335,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
- if (StringUtils.isBlank(tmpDir))
- return;
-
+ private void appendTmpDir(Job job, FileSystem fs, Path tmpDir) {
try {
Configuration jobConf = job.getConfiguration();
- FileStatus[] fList = fs.listStatus(new Path(tmpDir));
+ FileStatus[] fList = fs.listStatus(tmpDir);
StringBuilder jarList = new StringBuilder();
StringBuilder fileList = new StringBuilder();
@@ -341,7 +346,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (FileStatus file : fList) {
Path p = file.getPath();
if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, fs, p.toString());
+ appendTmpDir(job, fs, p);
continue;
}
@@ -624,4 +629,3 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
-