You are viewing a plain-text version of this content. The canonical link to the original archived message was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/14 08:50:05 UTC
[18/50] [abbrv] kylin git commit: KYLIN-2004 check whether source data is empty
KYLIN-2004 check whether source data is empty
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/56136ede
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/56136ede
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/56136ede
Branch: refs/heads/KYLIN-1726
Commit: 56136ede7c8b9abac5ddd7b7785b3f63c59b74db
Parents: 233a699
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 10 17:52:32 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Sep 10 17:59:59 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/hive/HiveMRInput.java | 37 ++++++++++----------
1 file changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/56136ede/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3ea9af5..520d7cc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -281,23 +281,6 @@ public class HiveMRInput implements IMRInput {
}
}
- private int determineNumReducer(KylinConfig config) throws IOException {
- computeRowCount(config.getCliCommandExecutor());
-
- Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
- long rowCount = readRowCountFromFile(rowCountFile);
- int mapperInputRows = config.getHadoopJobMapperInputRows();
-
- int numReducers = Math.round(rowCount / ((float) mapperInputRows));
- numReducers = Math.max(1, numReducers);
-
- stepLogger.log("total input rows = " + rowCount);
- stepLogger.log("expected input rows per mapper = " + mapperInputRows);
- stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
-
- return numReducers;
- }
-
private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(getInitStatement());
@@ -327,7 +310,25 @@ public class HiveMRInput implements IMRInput {
KylinConfig config = getCubeSpecificConfig();
try {
- int numReducers = determineNumReducer(config);
+
+ computeRowCount(config.getCliCommandExecutor());
+
+ Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+ long rowCount = readRowCountFromFile(rowCountFile);
+ if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+ stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ }
+
+ int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+ int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+ numReducers = Math.max(1, numReducers);
+
+ stepLogger.log("total input rows = " + rowCount);
+ stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+ stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
redistributeTable(config, numReducers);
return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());