You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 03:05:50 UTC
[37/50] [abbrv] kylin git commit: hive job use overrided MR job
configuration by cube properties
hive job use overrided MR job configuration by cube properties
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ac356f01
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ac356f01
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ac356f01
Branch: refs/heads/master-cdh5.7
Commit: ac356f014d52c4b13ad72e9d6a537e50e9ace5fb
Parents: c92f79a
Author: lijieliang <li...@cmss.chinamobile.com>
Authored: Fri Oct 14 13:01:32 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 17 18:25:26 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/hive/HiveMRInput.java | 24 +++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ac356f01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 202e480..2ec1fbb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -35,6 +35,7 @@ import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -60,6 +61,8 @@ import com.google.common.collect.Sets;
public class HiveMRInput implements IMRInput {
+ private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";
+
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
return new BatchCubingInputSide(flatDesc);
@@ -154,7 +157,10 @@ public class HiveMRInput implements IMRInput {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-
+ final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
+ }
String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -172,6 +178,11 @@ public class HiveMRInput implements IMRInput {
final ShellExecutable step = new ShellExecutable();
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
+ kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
+ }
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
@@ -187,7 +198,7 @@ public class HiveMRInput implements IMRInput {
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
@@ -201,6 +212,10 @@ public class HiveMRInput implements IMRInput {
if (lookupViewsTables.size() == 0) {
return null;
}
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
+ kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
+ }
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
hiveCmdBuilder.addStatement(useDatabaseHql);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
@@ -225,7 +240,10 @@ public class HiveMRInput implements IMRInput {
public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-
+ final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
+ }
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));