You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/11 06:06:34 UTC

[38/43] kylin git commit: KYLIN-2004 check whether source data is empty

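KYLIN-2004 Check whether the source data is empty: after counting the rows of the intermediate Hive table, fail the redistribute step with an error when the count is 0 and "kylin.job.allow.empty.segment" is false. The reducer-count calculation previously in determineNumReducer() is inlined into the step so that the row count is available for this check.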

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/56136ede
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/56136ede
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/56136ede

Branch: refs/heads/v1.5.4-release2
Commit: 56136ede7c8b9abac5ddd7b7785b3f63c59b74db
Parents: 233a699
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 10 17:52:32 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Sep 10 17:59:59 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/hive/HiveMRInput.java   | 37 ++++++++++----------
 1 file changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/56136ede/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3ea9af5..520d7cc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -281,23 +281,6 @@ public class HiveMRInput implements IMRInput {
             }
         }
 
-        private int determineNumReducer(KylinConfig config) throws IOException {
-            computeRowCount(config.getCliCommandExecutor());
-
-            Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-            long rowCount = readRowCountFromFile(rowCountFile);
-            int mapperInputRows = config.getHadoopJobMapperInputRows();
-
-            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
-            numReducers = Math.max(1, numReducers);
-
-            stepLogger.log("total input rows = " + rowCount);
-            stepLogger.log("expected input rows per mapper = " + mapperInputRows);
-            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
-
-            return numReducers;
-        }
-
         private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
             final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             hiveCmdBuilder.addStatement(getInitStatement());
@@ -327,7 +310,25 @@ public class HiveMRInput implements IMRInput {
             KylinConfig config = getCubeSpecificConfig();
 
             try {
-                int numReducers = determineNumReducer(config);
+
+                computeRowCount(config.getCliCommandExecutor());
+
+                Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+                long rowCount = readRowCountFromFile(rowCountFile);
+                if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+                    stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+                    return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                }
+
+                int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+                int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+                numReducers = Math.max(1, numReducers);
+
+                stepLogger.log("total input rows = " + rowCount);
+                stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+                stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
                 redistributeTable(config, numReducers);
                 return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());