You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/01 10:47:40 UTC
[kylin] branch 2.6.x updated: KYLIN-3727 Check if there are no
directories, then finish the job successfully.
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new f6c618d KYLIN-3727 Check if there are no directories, then finish the job successfully.
f6c618d is described below
commit f6c618d2e8435f9f1d4195563958569a3780f13d
Author: alexandr.sidorchuk <al...@apm-consult.com>
AuthorDate: Wed Feb 27 22:21:26 2019 +0300
KYLIN-3727 Check if there are no directories, then finish the job successfully.
---
.../kylin/storage/hbase/steps/BulkLoadJob.java | 29 +++++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index f0b77aa..998c1a1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -22,9 +22,13 @@ import java.io.IOException;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -74,10 +78,27 @@ public class BulkLoadJob extends AbstractHadoopJob {
newArgs[0] = input;
newArgs[1] = tableName;
- logger.debug("Start to run LoadIncrementalHFiles");
- int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
- logger.debug("End to run LoadIncrementalHFiles");
- return ret;
+ int count = 0;
+ Path inputPath = new Path(input);
+ FileSystem fs = HadoopUtil.getFileSystem(inputPath);
+ FileStatus[] fileStatuses = fs.listStatus(inputPath);
+ for(FileStatus fileStatus: fileStatuses) {
+ if(fileStatus.isDirectory()) {
+ count++;
+ break;
+ }
+ }
+
+ int ret = 0;
+ if (count > 0) {
+ logger.debug("Start to run LoadIncrementalHFiles");
+ ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
+ logger.debug("End to run LoadIncrementalHFiles");
+ return ret;
+ } else {
+ logger.debug("Nothing to load, cube is empty");
+ return ret;
+ }
}
public static void main(String[] args) throws Exception {