You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/01 10:45:28 UTC

[kylin] branch master updated: KYLIN-3727 Check if there are no directories, then finish job successfully.

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 68bffc1  KYLIN-3727 Check if there are no directories, then finish job successfully.
68bffc1 is described below

commit 68bffc14a8770fa7e1cf5dbad2c027eb027f7be2
Author: alexandr.sidorchuk <al...@apm-consult.com>
AuthorDate: Wed Feb 27 22:21:26 2019 +0300

    KYLIN-3727 Check if there are no directories, then finish job successfully.
---
 .../kylin/storage/hbase/steps/BulkLoadJob.java     | 29 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index f0b77aa..998c1a1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -22,9 +22,13 @@ import java.io.IOException;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -74,10 +78,27 @@ public class BulkLoadJob extends AbstractHadoopJob {
         newArgs[0] = input;
         newArgs[1] = tableName;
 
-        logger.debug("Start to run LoadIncrementalHFiles");
-        int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
-        logger.debug("End to run LoadIncrementalHFiles");
-        return ret;
+        int count = 0;
+        Path inputPath = new Path(input);
+        FileSystem fs = HadoopUtil.getFileSystem(inputPath);
+        FileStatus[] fileStatuses = fs.listStatus(inputPath);
+        for(FileStatus fileStatus: fileStatuses) {
+            if(fileStatus.isDirectory()) {
+                count++;
+                break;
+            }
+        }
+
+        int ret = 0;
+        if (count > 0) {
+            logger.debug("Start to run LoadIncrementalHFiles");
+            ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
+            logger.debug("End to run LoadIncrementalHFiles");
+            return ret;
+        } else {
+            logger.debug("Nothing to load, cube is empty");
+            return ret;
+        }
     }
 
     public static void main(String[] args) throws Exception {