You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/12 11:24:49 UTC
[13/25] kylin git commit: KYLIN-2004 Make the creating intermediate
hive table steps configurable
KYLIN-2004 Make the creating intermediate hive table steps configurable
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/233a699f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/233a699f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/233a699f
Branch: refs/heads/1.5.x-CDH5.7
Commit: 233a699f3b6f7a6c64ecf43fb80108b56db61f5f
Parents: d7cbf67
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 9 19:04:10 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Sep 10 17:59:46 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +-
.../org/apache/kylin/job/JoinedFlatTable.java | 48 ++++--
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/job/execution/AbstractExecutable.java | 2 +-
.../apache/kylin/job/JoinedFlatTableTest.java | 2 +-
.../kylin/metadata/model/DataModelDesc.java | 8 +-
...t_kylin_cube_without_slr_left_join_desc.json | 3 +-
.../kylin/rest/controller/CubeController.java | 2 +-
.../source/hive/CreateFlatHiveTableStep.java | 32 +++-
.../apache/kylin/source/hive/HiveMRInput.java | 169 ++++++++++++++++++-
10 files changed, 234 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2ac9d48..de9051c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -805,7 +805,7 @@ abstract public class KylinConfigBase implements Serializable {
setProperty("kylin.dict.append.cache.size", String.valueOf(cacheSize));
}
- public boolean getTableJoinTypeCheck() {
- return Boolean.valueOf(this.getOptional("kylin.table.join.strong.check", "true"));
+ /**
+  * Returns the value of "kylin.hive.create.flat.table.method", passing "1" to getOptional as
+  * the fallback value. HiveMRInput.addStepPhase1_CreateFlatTable accepts "1" (create the flat
+  * table first, then count and redistribute) and "2" (count from the source table first, then
+  * create and redistribute in one step) and throws IllegalArgumentException for anything else.
+  */
+ public String getCreateFlatHiveTableMethod() {
+ return getOptional("kylin.hive.create.flat.table.method", "1");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index b39265d..699d084 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -107,14 +107,14 @@ public class JoinedFlatTable {
return ddl.toString();
}
- public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) {
+ /**
+  * Builds the hive script that fills the intermediate flat table: the hive "set" statements
+  * from generateHiveSetStatements followed by an INSERT OVERWRITE TABLE ... SELECT statement.
+  *
+  * @param intermediateTableDesc flat table description; supplies the target table name
+  * @param engineConfig job engine config handed to generateHiveSetStatements
+  * @param redistribute forwarded to generateSelectDataStatement; when true the SELECT carries a
+  *            DISTRIBUTE BY clause
+  * @return the complete script text
+  */
+ public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig, boolean redistribute) {
StringBuilder sql = new StringBuilder();
sql.append(generateHiveSetStatements(engineConfig));
- sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+ // NOTE(review): when redistribute is true the SELECT text already ends with ";\n" (written by
+ // appendDistributeStatement), so the ";" appended below yields a second, empty statement.
+ // TODO confirm the hive client tolerates it, or append ";" only when redistribute is false.
+ sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc, redistribute) + ";").append("\n");
return sql.toString();
}
- public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
+ public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean redistribute) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT" + "\n");
String tableAlias;
@@ -129,7 +129,15 @@ public class JoinedFlatTable {
}
appendJoinStatement(flatDesc, sql, tableAliasMap);
appendWhereStatement(flatDesc, sql, tableAliasMap);
- appendDistributeStatement(flatDesc, sql, tableAliasMap);
+ if (redistribute == true) {
+ String redistributeCol = null;
+ TblColRef distDcol = flatDesc.getDistributedBy();
+ if (distDcol != null) {
+ String tblAlias = tableAliasMap.get(distDcol.getTable());
+ redistributeCol = tblAlias + "." + distDcol.getName();
+ }
+ appendDistributeStatement(sql, redistributeCol);
+ }
return sql.toString();
}
@@ -228,14 +236,11 @@ public class JoinedFlatTable {
return result;
}
- private static void appendDistributeStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
- TblColRef distDcol = flatDesc.getDistributedBy();
-
- if (distDcol != null) {
- String tblAlias = tableAliasMap.get(distDcol.getTable());
- sql.append(" DISTRIBUTE BY ").append(tblAlias).append(".").append(distDcol.getName());
+ /**
+  * Appends a DISTRIBUTE BY clause to sql and terminates the statement with ";\n".
+  *
+  * @param sql buffer holding the statement built so far
+  * @param redistributeCol column expression to distribute by; when null the clause falls back
+  *            to RAND()
+  */
+ private static void appendDistributeStatement(StringBuilder sql, String redistributeCol) {
+ if (redistributeCol != null) {
+ sql.append(" DISTRIBUTE BY ").append(redistributeCol).append(";\n");
} else {
- sql.append(" DISTRIBUTE BY RAND()");
+ sql.append(" DISTRIBUTE BY RAND()").append(";\n");
}
}
@@ -280,4 +285,25 @@ public class JoinedFlatTable {
return hiveDataType.toLowerCase();
}
+ /**
+  * Builds the hive script that counts the rows of the flat table and writes the result into
+  * a directory; output compression is switched off first.
+  *
+  * @param intermediateTableDesc flat table description; supplies the table name to count
+  * @param outputDir directory the count result is written to
+  * @return the two-statement script, each statement terminated with ";\n"
+  */
+ public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
+     return new StringBuilder()
+             .append("set hive.exec.compress.output=false;\n")
+             .append("INSERT OVERWRITE DIRECTORY '")
+             .append(outputDir)
+             .append("' SELECT count(*) FROM ")
+             .append(intermediateTableDesc.getTableName())
+             .append(";\n")
+             .toString();
+ }
+
+ /**
+  * Builds the statement that overwrites the flat table with its own content, followed by the
+  * DISTRIBUTE BY clause produced by appendDistributeStatement.
+  *
+  * @param intermediateTableDesc flat table description; supplies the table name and the
+  *            optional distribute-by column
+  * @return the statement text
+  */
+ public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+     final String flatTable = intermediateTableDesc.getTableName();
+     final StringBuilder hql = new StringBuilder();
+     hql.append("INSERT OVERWRITE TABLE ").append(flatTable).append(" SELECT * FROM ").append(flatTable);
+
+     // No distribute-by column configured: pass null so the clause falls back to RAND().
+     final TblColRef distributeBy = intermediateTableDesc.getDistributedBy();
+     final String distributeCol = (distributeBy == null) ? null : colName(distributeBy.getCanonicalName());
+
+     appendDistributeStatement(hql, distributeCol);
+     return hql.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 6084e7b..893c034 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,5 +56,6 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info";
+ public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table";
public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Env: ${env_name}</li>" + "<li>Project: ${project_name}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 4dedad1..09f9b54 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -49,7 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected static final String START_TIME = "startTime";
protected static final String END_TIME = "endTime";
- private static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
protected int retry = 0;
private String name;
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 0faf22a..1fe47f8 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -77,7 +77,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
@Test
public void testGenerateInsertSql() throws IOException {
- String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+ String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()), true);
System.out.println(sqls);
int length = sqls.length();
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 7f5edfe..d04830b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -314,13 +314,7 @@ public class DataModelDesc extends RootPersistentEntity {
}
for (int i = 0; i < fkCols.length; i++) {
if (!fkCols[i].getDatatype().equals(pkCols[i].getDatatype())) {
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final String msg = "Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype();
- if (kylinConfig.getTableJoinTypeCheck() == true) {
- throw new IllegalStateException(msg);
- } else {
- logger.warn(msg);
- }
+ logger.warn("Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index ca1b35c..0470dc6 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -289,7 +289,8 @@
"engine_type" : 2,
"storage_type" : 2,
"override_kylin_properties": {
- "kylin.job.cubing.inmem.sampling.hll.precision": "16"
+ "kylin.job.cubing.inmem.sampling.hll.precision": "16",
+ "kylin.hive.create.flat.table.method": "2"
},
"partition_date_start": 0
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7081d02..5397df7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -152,7 +152,7 @@ public class CubeController extends BasicController {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY);
IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
- String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
+ String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc, false);
GeneralResponse repsonse = new GeneralResponse();
repsonse.setProperty("sql", sql);
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index cd32f9c..bcb9a38 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -76,8 +76,11 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(getInitStatement());
- hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
- hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge
+ boolean useRedistribute = getUseRedistribute();
+ if (useRedistribute == true) {
+ hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+ hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge
+ }
hiveCmdBuilder.addStatement(getCreateTableStatement());
final String cmd = hiveCmdBuilder.toString();
@@ -101,13 +104,20 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
try {
- long rowCount = readRowCountFromFile();
- if (!config.isEmptySegmentAllowed() && rowCount == 0) {
- stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+
+ boolean useRedistribute = getUseRedistribute();
+
+ int numReducers = 0;
+ if (useRedistribute == true) {
+ long rowCount = readRowCountFromFile();
+ if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+ stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ }
+
+ numReducers = determineNumReducer(config, rowCount);
}
- int numReducers = determineNumReducer(config, rowCount);
createFlatHiveTable(config, numReducers);
return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
@@ -125,6 +135,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
return getParam("HiveInit");
}
+ /** Stores the redistribute flag as text under the "useRedistribute" step parameter. */
+ public void setUseRedistribute(boolean useRedistribute) {
+     final String flagText = Boolean.toString(useRedistribute);
+     setParam("useRedistribute", flagText);
+ }
+
+ /**
+  * Reads the "useRedistribute" step parameter back as a boolean; any value other than
+  * "true" (case-insensitive), including null, yields false.
+  */
+ public boolean getUseRedistribute() {
+     final String flagText = getParam("useRedistribute");
+     return Boolean.parseBoolean(flagText);
+ }
+
public void setCreateTableStatement(String sql) {
setParam("HiveRedistributeData", sql);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/233a699f/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index e3d7879..3ea9af5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,8 +19,10 @@
package org.apache.kylin.source.hive;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Set;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,6 +30,11 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -110,16 +117,46 @@ public class HiveMRInput implements IMRInput {
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+ // Read the method from the cube's own config rather than the global one -- presumably so a
+ // cube-level override of kylin.hive.create.flat.table.method is honored (TODO confirm).
+ final KylinConfig kylinConfig = CubeManager.getInstance(conf.getConfig()).getCube(cubeName).getConfig();
+
+ String createFlatTableMethod = kylinConfig.getCreateFlatHiveTableMethod();
+ if ("1".equals(createFlatTableMethod)) {
+ // create flat table first, then count and redistribute
+ // (no row count dir is passed here: the redistribute step derives its own "/row_count" dir)
+ jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, false, ""));
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
+ } else if ("2".equals(createFlatTableMethod)) {
+ // count from source table first, and then redistribute, suitable for partitioned table
+ final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+ jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
+ jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir));
+ } else {
+ throw new IllegalArgumentException("Unknown value for kylin.hive.create.flat.table.method: " + createFlatTableMethod);
+ }
- jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
- jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
// the lookup view materialization step is optional: the factory returns null when there is nothing to add
AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
if (task != null) {
jobFlow.addTask(task);
}
}
+ /**
+  * Creates the job step that counts the rows of the flat hive table and then redistributes it.
+  *
+  * @param conf job engine config; supplies the intermediate-table database and hive settings
+  * @param flatTableDesc description of the flat table to count and redistribute
+  * @param jobId id of the owning job; its working dir hosts the "/row_count" output
+  * @param cubeName cube name recorded in the step parameters
+  * @return the configured step
+  */
+ public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+     final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
+     final String initHql = useDatabaseHql + JoinedFlatTable.generateHiveSetStatements(conf);
+     final String countDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+
+     final RedistributeFlatHiveTableStep redistributeStep = new RedistributeFlatHiveTableStep();
+     redistributeStep.setInitStatement(initHql);
+     redistributeStep.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, countDir));
+     redistributeStep.setRowCountOutputDir(countDir);
+     redistributeStep.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc));
+     CubingExecutableUtil.setCubeName(cubeName, redistributeStep.getParams());
+     redistributeStep.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+     return redistributeStep;
+ }
+
+
public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
final ShellExecutable step = new ShellExecutable();
@@ -174,17 +211,17 @@ public class HiveMRInput implements IMRInput {
return step;
}
- public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, String rowCountOutputDir) {
+ public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
- String insertDataHqls;
- insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+ String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute);
CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+ step.setUseRedistribute(redistribute);
step.setInitStatement(hiveInitBuf.toString());
step.setRowCountOutputDir(rowCountOutputDir);
step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls);
@@ -213,6 +250,126 @@ public class HiveMRInput implements IMRInput {
}
}
+ /**
+  * Job step that counts the rows of the flat hive table, derives a reducer number from the
+  * count, and then runs the configured redistribute statement with that many reducers.
+  * The hive statements and the row-count output directory are carried as step parameters.
+  */
+ public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
+     private final BufferedLogger stepLogger = new BufferedLogger(logger);
+
+     /**
+      * Runs the row-count statement through the given executor.
+      *
+      * @throws RuntimeException when the command exits with a non-zero code
+      */
+     private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException {
+         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+         hiveCmdBuilder.addStatement(getInitStatement());
+         // presumably keeps the count file readable as plain text -- TODO confirm
+         hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
+         hiveCmdBuilder.addStatement(getSelectRowCountStatement());
+         final String cmd = hiveCmdBuilder.build();
+
+         stepLogger.log("Compute row count of flat hive table, cmd: ");
+         stepLogger.log(cmd);
+
+         Pair<Integer, String> response = cmdExecutor.execute(cmd, stepLogger);
+         if (response.getFirst() != 0) {
+             throw new RuntimeException("Failed to compute row count of flat hive table");
+         }
+     }
+
+     /**
+      * Reads the whole file and parses its trimmed content as a long.
+      *
+      * @throws NumberFormatException when the content is not a number
+      */
+     private long readRowCountFromFile(Path file) throws IOException {
+         FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
+         InputStream in = fs.open(file);
+         try {
+             // explicit charset: do not depend on the platform default when decoding the file
+             String content = IOUtils.toString(in, "UTF-8");
+             return Long.parseLong(content.trim()); // strip the '\n' character
+         } finally {
+             IOUtils.closeQuietly(in);
+         }
+     }
+
+     /**
+      * Computes the row count, reads it back and converts it into a reducer number:
+      * rows divided by the configured rows-per-mapper, rounded, and never less than 1.
+      */
+     private int determineNumReducer(KylinConfig config) throws IOException {
+         computeRowCount(config.getCliCommandExecutor());
+
+         // NOTE(review): assumes hive writes the count into a single file with this name -- confirm
+         Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+         long rowCount = readRowCountFromFile(rowCountFile);
+         int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+         int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+         numReducers = Math.max(1, numReducers);
+
+         stepLogger.log("total input rows = " + rowCount);
+         stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+         stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+         return numReducers;
+     }
+
+     /**
+      * Runs the redistribute statement with the given reducer number and with merging of
+      * map-reduce output files switched off.
+      *
+      * @throws RuntimeException when the command exits with a non-zero code
+      */
+     private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
+         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+         hiveCmdBuilder.addStatement(getInitStatement());
+         hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+         hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
+         hiveCmdBuilder.addStatement(getRedistributeDataStatement());
+         // build() is the explicit command accessor, the same call computeRowCount uses
+         final String cmd = hiveCmdBuilder.build();
+
+         stepLogger.log("Redistribute table, cmd: ");
+         stepLogger.log(cmd);
+
+         Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+         if (response.getFirst() != 0) {
+             throw new RuntimeException("Failed to redistribute flat hive table");
+         }
+     }
+
+     /** Looks up the cube named in the step parameters and returns that cube's config. */
+     private KylinConfig getCubeSpecificConfig() {
+         String cubeName = CubingExecutableUtil.getCubeName(getParams());
+         CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+         CubeInstance cube = manager.getCube(cubeName);
+         return cube.getConfig();
+     }
+
+     /**
+      * Counts the rows, then redistributes the table. Any exception is turned into an ERROR
+      * result; the buffered step log is returned in both outcomes.
+      */
+     @Override
+     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+         try {
+             // resolved inside the try so that a missing cube is reported as a step error as well
+             KylinConfig config = getCubeSpecificConfig();
+             int numReducers = determineNumReducer(config);
+             redistributeTable(config, numReducers);
+             return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+         } catch (Exception e) {
+             logger.error("job:" + getId() + " execute finished with exception", e);
+             // record the reason in the step log too, otherwise the returned output does not show why it failed
+             stepLogger.log("FAILED! " + e.getMessage());
+             return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+         }
+     }
+
+     public void setInitStatement(String sql) {
+         setParam("HiveInit", sql);
+     }
+
+     public String getInitStatement() {
+         return getParam("HiveInit");
+     }
+
+     public void setSelectRowCountStatement(String sql) {
+         setParam("HiveSelectRowCount", sql);
+     }
+
+     public String getSelectRowCountStatement() {
+         return getParam("HiveSelectRowCount");
+     }
+
+     public void setRedistributeDataStatement(String sql) {
+         setParam("HiveRedistributeData", sql);
+     }
+
+     public String getRedistributeDataStatement() {
+         return getParam("HiveRedistributeData");
+     }
+
+     public void setRowCountOutputDir(String rowCountOutputDir) {
+         setParam("rowCountOutputDir", rowCountOutputDir);
+     }
+
+     public String getRowCountOutputDir() {
+         return getParam("rowCountOutputDir");
+     }
+ }
+
public static class GarbageCollectionStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);