You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/12 11:24:49 UTC

[13/25] kylin git commit: KYLIN-2004 Make the creating intermediate hive table steps configurable

KYLIN-2004 Make the creating intermediate hive table steps configurable

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/233a699f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/233a699f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/233a699f

Branch: refs/heads/1.5.x-CDH5.7
Commit: 233a699f3b6f7a6c64ecf43fb80108b56db61f5f
Parents: d7cbf67
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 9 19:04:10 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Sep 10 17:59:46 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +-
 .../org/apache/kylin/job/JoinedFlatTable.java   |  48 ++++--
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../kylin/job/execution/AbstractExecutable.java |   2 +-
 .../apache/kylin/job/JoinedFlatTableTest.java   |   2 +-
 .../kylin/metadata/model/DataModelDesc.java     |   8 +-
 ...t_kylin_cube_without_slr_left_join_desc.json |   3 +-
 .../kylin/rest/controller/CubeController.java   |   2 +-
 .../source/hive/CreateFlatHiveTableStep.java    |  32 +++-
 .../apache/kylin/source/hive/HiveMRInput.java   | 169 ++++++++++++++++++-
 10 files changed, 234 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2ac9d48..de9051c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -805,7 +805,7 @@ abstract public class KylinConfigBase implements Serializable {
         setProperty("kylin.dict.append.cache.size", String.valueOf(cacheSize));
     }
 
-    public boolean getTableJoinTypeCheck() {
-        return Boolean.valueOf(this.getOptional("kylin.table.join.strong.check", "true"));
+    public String getCreateFlatHiveTableMethod() {
+        return getOptional("kylin.hive.create.flat.table.method", "1");
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index b39265d..699d084 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -107,14 +107,14 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) {
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig, boolean redistribute) {
         StringBuilder sql = new StringBuilder();
         sql.append(generateHiveSetStatements(engineConfig));
-        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc, redistribute) + ";").append("\n");
         return sql.toString();
     }
 
-    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean redistribute) {
         StringBuilder sql = new StringBuilder();
         sql.append("SELECT" + "\n");
         String tableAlias;
@@ -129,7 +129,15 @@ public class JoinedFlatTable {
         }
         appendJoinStatement(flatDesc, sql, tableAliasMap);
         appendWhereStatement(flatDesc, sql, tableAliasMap);
-        appendDistributeStatement(flatDesc, sql, tableAliasMap);
+        if (redistribute == true) {
+            String redistributeCol = null;
+            TblColRef distDcol = flatDesc.getDistributedBy();
+            if (distDcol != null) {
+                String tblAlias = tableAliasMap.get(distDcol.getTable());
+                redistributeCol = tblAlias + "." + distDcol.getName();
+            }
+            appendDistributeStatement(sql, redistributeCol);
+        }
         return sql.toString();
     }
 
@@ -228,14 +236,11 @@ public class JoinedFlatTable {
         return result;
     }
 
-    private static void appendDistributeStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        TblColRef distDcol = flatDesc.getDistributedBy();
-
-        if (distDcol != null) {
-            String tblAlias = tableAliasMap.get(distDcol.getTable());
-            sql.append(" DISTRIBUTE BY ").append(tblAlias).append(".").append(distDcol.getName());
+    private static void appendDistributeStatement(StringBuilder sql, String redistributeCol) {
+        if (redistributeCol != null) {
+            sql.append(" DISTRIBUTE BY ").append(redistributeCol).append(";\n");
         } else {
-            sql.append(" DISTRIBUTE BY RAND()");
+            sql.append(" DISTRIBUTE BY RAND()").append(";\n");
         }
     }
 
@@ -280,4 +285,25 @@ public class JoinedFlatTable {
         return hiveDataType.toLowerCase();
     }
 
+    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("set hive.exec.compress.output=false;\n");
+        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n");
+        return sql.toString();
+    }
+
+    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+        final String tableName = intermediateTableDesc.getTableName();
+        StringBuilder sql = new StringBuilder();
+        sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
+
+        String redistributeCol = null;
+        TblColRef distDcol = intermediateTableDesc.getDistributedBy();
+        if (distDcol != null) {
+            redistributeCol = colName(distDcol.getCanonicalName());
+        }
+        appendDistributeStatement(sql, redistributeCol);
+        return sql.toString();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 6084e7b..893c034 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,5 +56,6 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
     public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info";
+    public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table";
     public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Env: ${env_name}</li>" + "<li>Project: ${project_name}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 4dedad1..09f9b54 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -49,7 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected static final String START_TIME = "startTime";
     protected static final String END_TIME = "endTime";
 
-    private static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
     protected int retry = 0;
 
     private String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 0faf22a..1fe47f8 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -77,7 +77,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()), true);
         System.out.println(sqls);
 
         int length = sqls.length();

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 7f5edfe..d04830b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -314,13 +314,7 @@ public class DataModelDesc extends RootPersistentEntity {
             }
             for (int i = 0; i < fkCols.length; i++) {
                 if (!fkCols[i].getDatatype().equals(pkCols[i].getDatatype())) {
-                    final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-                    final String msg = "Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype();
-                    if (kylinConfig.getTableJoinTypeCheck() == true) {
-                        throw new IllegalStateException(msg);
-                    } else {
-                        logger.warn(msg);
-                    }
+                    logger.warn("Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype());
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index ca1b35c..0470dc6 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -289,7 +289,8 @@
   "engine_type" : 2,
   "storage_type" : 2,
   "override_kylin_properties": {
-    "kylin.job.cubing.inmem.sampling.hll.precision": "16"
+    "kylin.job.cubing.inmem.sampling.hll.precision": "16",
+    "kylin.hive.create.flat.table.method": "2"
   },
   "partition_date_start": 0
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7081d02..5397df7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -152,7 +152,7 @@ public class CubeController extends BasicController {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY);
         IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-        String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
+        String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc, false);
 
         GeneralResponse repsonse = new GeneralResponse();
         repsonse.setProperty("sql", sql);

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index cd32f9c..bcb9a38 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -76,8 +76,11 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException {
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement(getInitStatement());
-        hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
-        hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge
+        boolean useRedistribute = getUseRedistribute();
+        if (useRedistribute == true) {
+            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge
+        }
         hiveCmdBuilder.addStatement(getCreateTableStatement());
         final String cmd = hiveCmdBuilder.toString();
 
@@ -101,13 +104,20 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         try {
-            long rowCount = readRowCountFromFile();
-            if (!config.isEmptySegmentAllowed() && rowCount == 0) {
-                stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
-                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+
+            boolean useRedistribute = getUseRedistribute();
+
+            int numReducers = 0;
+            if (useRedistribute == true) {
+                long rowCount = readRowCountFromFile();
+                if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+                    stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+                    return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                }
+
+                numReducers = determineNumReducer(config, rowCount);
             }
 
-            int numReducers = determineNumReducer(config, rowCount);
             createFlatHiveTable(config, numReducers);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
 
@@ -125,6 +135,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         return getParam("HiveInit");
     }
 
+    public void setUseRedistribute(boolean useRedistribute) {
+        setParam("useRedistribute", String.valueOf(useRedistribute));
+    }
+
+    public boolean getUseRedistribute() {
+        return Boolean.valueOf(getParam("useRedistribute"));
+    }
+
     public void setCreateTableStatement(String sql) {
         setParam("HiveRedistributeData", sql);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index e3d7879..3ea9af5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,8 +19,10 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,6 +30,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -110,16 +117,46 @@ public class HiveMRInput implements IMRInput {
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
 
-            final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+            final KylinConfig kylinConfig = CubeManager.getInstance(conf.getConfig()).getCube(cubeName).getConfig();
+
+            String createFlatTableMethod = kylinConfig.getCreateFlatHiveTableMethod();
+            if ("1".equals(createFlatTableMethod)) {
+                // create flat table first, then count and redistribute
+                jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, false, ""));
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
+            } else if ("2".equals(createFlatTableMethod)) {
+                // count from source table first, and then redistribute, suitable for partitioned table
+                final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+                jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
+                jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir));
+            } else {
+                throw new IllegalArgumentException("Unknown value for kylin.hive.create.flat.table.method: " + createFlatTableMethod);
+            }
 
-            jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
-            jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
             AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
             if (task != null) {
                 jobFlow.addTask(task);
             }
         }
 
+        public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+            StringBuilder hiveInitBuf = new StringBuilder();
+            hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
+            hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
+
+            String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+
+            RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+            step.setInitStatement(hiveInitBuf.toString());
+            step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir));
+            step.setRowCountOutputDir(rowCountOutputDir);
+            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc));
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+            return step;
+        }
+
+
         public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
             final ShellExecutable step = new ShellExecutable();
 
@@ -174,17 +211,17 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, String rowCountOutputDir) {
+        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) {
             StringBuilder hiveInitBuf = new StringBuilder();
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
 
             final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
             final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            String insertDataHqls;
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute);
 
             CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            step.setUseRedistribute(redistribute);
             step.setInitStatement(hiveInitBuf.toString());
             step.setRowCountOutputDir(rowCountOutputDir);
             step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls);
@@ -213,6 +250,126 @@ public class HiveMRInput implements IMRInput {
         }
     }
 
+    public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
+        private final BufferedLogger stepLogger = new BufferedLogger(logger);
+
+        private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException {
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement(getInitStatement());
+            hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
+            hiveCmdBuilder.addStatement(getSelectRowCountStatement());
+            final String cmd = hiveCmdBuilder.build();
+
+            stepLogger.log("Compute row count of flat hive table, cmd: ");
+            stepLogger.log(cmd);
+
+            Pair<Integer, String> response = cmdExecutor.execute(cmd, stepLogger);
+            if (response.getFirst() != 0) {
+                throw new RuntimeException("Failed to compute row count of flat hive table");
+            }
+        }
+
+        private long readRowCountFromFile(Path file) throws IOException {
+            FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
+            InputStream in = fs.open(file);
+            try {
+                String content = IOUtils.toString(in);
+                return Long.valueOf(content.trim()); // strip the '\n' character
+
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+        }
+
+        private int determineNumReducer(KylinConfig config) throws IOException {
+            computeRowCount(config.getCliCommandExecutor());
+
+            Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+            long rowCount = readRowCountFromFile(rowCountFile);
+            int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+            numReducers = Math.max(1, numReducers);
+
+            stepLogger.log("total input rows = " + rowCount);
+            stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+            return numReducers;
+        }
+
+        private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement(getInitStatement());
+            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
+            hiveCmdBuilder.addStatement(getRedistributeDataStatement());
+            final String cmd = hiveCmdBuilder.toString();
+
+            stepLogger.log("Redistribute table, cmd: ");
+            stepLogger.log(cmd);
+
+            Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+            if (response.getFirst() != 0) {
+                throw new RuntimeException("Failed to redistribute flat hive table");
+            }
+        }
+
+        private KylinConfig getCubeSpecificConfig() {
+            String cubeName = CubingExecutableUtil.getCubeName(getParams());
+            CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = manager.getCube(cubeName);
+            return cube.getConfig();
+        }
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            KylinConfig config = getCubeSpecificConfig();
+
+            try {
+                int numReducers = determineNumReducer(config);
+                redistributeTable(config, numReducers);
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+            } catch (Exception e) {
+                logger.error("job:" + getId() + " execute finished with exception", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            }
+        }
+
+        public void setInitStatement(String sql) {
+            setParam("HiveInit", sql);
+        }
+
+        public String getInitStatement() {
+            return getParam("HiveInit");
+        }
+
+        public void setSelectRowCountStatement(String sql) {
+            setParam("HiveSelectRowCount", sql);
+        }
+
+        public String getSelectRowCountStatement() {
+            return getParam("HiveSelectRowCount");
+        }
+
+        public void setRedistributeDataStatement(String sql) {
+            setParam("HiveRedistributeData", sql);
+        }
+
+        public String getRedistributeDataStatement() {
+            return getParam("HiveRedistributeData");
+        }
+
+        public void setRowCountOutputDir(String rowCountOutputDir) {
+            setParam("rowCountOutputDir", rowCountOutputDir);
+        }
+
+        public String getRowCountOutputDir() {
+            return getParam("rowCountOutputDir");
+        }
+    }
+
     public static class GarbageCollectionStep extends AbstractExecutable {
         private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);