You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/07 07:10:58 UTC

kylin git commit: KYLIN-1752 Add an option to fail cube build job when source table is empty

Repository: kylin
Updated Branches:
  refs/heads/master 9f1029a76 -> d2e96bda0


KYLIN-1752 Add an option to fail cube build job when source table is empty

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d2e96bda
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d2e96bda
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d2e96bda

Branch: refs/heads/master
Commit: d2e96bda0a5848815cf4430746276bc9e896bad7
Parents: 9f1029a
Author: gaodayue <ga...@meituan.com>
Authored: Fri Jun 3 18:17:43 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jun 7 15:09:29 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++++
 .../source/hive/CreateFlatHiveTableStep.java    | 22 +++++++++++++-------
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d2e96bda/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 68e3b6c..7664c66 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -363,6 +363,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.job.cmd.extra.args");
     }
 
+    public boolean isEmptySegmentAllowed() {
+        return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
+    }
+
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("hive.table.location." + table.toUpperCase());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2e96bda/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index e9b9994..443de99 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -23,9 +23,11 @@ import java.io.InputStream;
 public class CreateFlatHiveTableStep extends AbstractExecutable {
     private final BufferedLogger stepLogger = new BufferedLogger(logger);
 
-    private long readRowCountFromFile(Path file) throws IOException {
-        FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
-        InputStream in = fs.open(file);
+    private long readRowCountFromFile() throws IOException {
+        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+
+        FileSystem fs = FileSystem.get(rowCountFile.toUri(), HadoopUtil.getCurrentConfiguration());
+        InputStream in = fs.open(rowCountFile);
         try {
             String content = IOUtils.toString(in);
             return Long.valueOf(content.trim()); // strip the '\n' character
@@ -35,9 +37,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         }
     }
 
-    private int determineNumReducer(KylinConfig config) throws IOException {
-        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-        long rowCount = readRowCountFromFile(rowCountFile);
+    private int determineNumReducer(KylinConfig config, long rowCount) throws IOException {
         int mapperInputRows = config.getHadoopJobMapperInputRows();
 
         int numReducers = Math.round(rowCount / ((float) mapperInputRows));
@@ -78,8 +78,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         try {
-
-            int numReducers = determineNumReducer(config);
+            long rowCount = readRowCountFromFile();
+            if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+                stepLogger.log("Detect upstream hive table is empty, " +
+                        "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            }
+
+            int numReducers = determineNumReducer(config, rowCount);
             createFlatHiveTable(config, numReducers);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());