You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/20 23:22:37 UTC
[13/50] [abbrv] kylin git commit: KYLIN-2095 Allow cube to override
Hive job configuration by properties
KYLIN-2095 Allow cube to override Hive job configuration by properties
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07e81fd0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07e81fd0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07e81fd0
Branch: refs/heads/KYLIN-1971
Commit: 07e81fd0b744e782d84383e327b1923cfc178d42
Parents: cc2b59f
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 17 22:07:57 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 17 22:07:57 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 ++
.../apache/kylin/source/hive/HiveMRInput.java | 42 ++++++++++++--------
2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 73ac788..5a06813 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -250,6 +250,10 @@ abstract public class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.job.mr.config.override.");
}
+ /**
+  * Collects the configuration properties whose names start with
+  * {@code kylin.hive.config.override.}, as returned by {@code getPropertiesByPrefix}.
+  * NOTE(review): callers emit the returned keys directly in Hive {@code SET} statements, so the
+  * prefix is presumably stripped from the keys -- confirm against getPropertiesByPrefix.
+  *
+  * @return the matching properties as produced by {@code getPropertiesByPrefix}
+  */
+ public Map<String, String> getHiveConfigOverride() {
+     final String hiveOverridePrefix = "kylin.hive.config.override.";
+     return getPropertiesByPrefix(hiveOverridePrefix);
+ }
+
public String getKylinSparkJobJarPath() {
final String jobJar = getOptional("kylin.job.jar.spark");
if (StringUtils.isNotEmpty(jobJar)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4ec8d3d..f3fceb1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -20,6 +20,8 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
@@ -61,7 +63,6 @@ import com.google.common.collect.Sets;
public class HiveMRInput implements IMRInput {
- private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
@@ -157,9 +158,7 @@ public class HiveMRInput implements IMRInput {
hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
- }
+ appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -178,10 +177,7 @@ public class HiveMRInput implements IMRInput {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
- kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
- }
+ appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
@@ -192,6 +188,7 @@ public class HiveMRInput implements IMRInput {
return step;
}
+
public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -211,10 +208,7 @@ public class HiveMRInput implements IMRInput {
if (lookupViewsTables.size() == 0) {
return null;
}
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
- kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
- }
+ appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
hiveCmdBuilder.addStatement(useDatabaseHql);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
@@ -240,9 +234,7 @@ public class HiveMRInput implements IMRInput {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
- }
+ appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
@@ -301,7 +293,7 @@ public class HiveMRInput implements IMRInput {
FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
InputStream in = fs.open(file);
try {
- String content = IOUtils.toString(in);
+ String content = IOUtils.toString(in, Charset.defaultCharset());
return Long.valueOf(content.trim()); // strip the '\n' character
} finally {
@@ -490,4 +482,22 @@ public class HiveMRInput implements IMRInput {
}
}
+
+ /**
+  * Appends one {@code SET key=value;} line (newline-terminated) to the given buffer for every
+  * entry returned by {@code kylinConfig.getHiveConfigOverride()}. Nothing is appended when that
+  * map is empty.
+  *
+  * @param kylinConfig config whose Hive override properties are read
+  * @param hiveCmd buffer that receives the generated SET statements
+  */
+ private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) {
+     // Iterating the entry set yields key and value together and is a no-op for an empty map,
+     // so neither a separate emptiness check nor a per-key lookup is needed.
+     for (Map.Entry<String, String> override : kylinConfig.getHiveConfigOverride().entrySet()) {
+         hiveCmd.append("SET ").append(override.getKey()).append("=").append(override.getValue()).append(";\n");
+     }
+ }
+
+ /**
+  * Adds one {@code SET key=value;} statement (newline-terminated) to the given command builder
+  * for every entry returned by {@code kylinConfig.getHiveConfigOverride()}. Nothing is added
+  * when that map is empty.
+  *
+  * @param kylinConfig config whose Hive override properties are read
+  * @param hiveCmdBuilder builder that receives the generated SET statements via addStatement
+  */
+ private static void appendHiveOverrideProperties2(final KylinConfig kylinConfig, HiveCmdBuilder hiveCmdBuilder) {
+     // Iterating the entry set yields key and value together and is a no-op for an empty map,
+     // so neither a separate emptiness check nor a per-key lookup is needed.
+     for (Map.Entry<String, String> override : kylinConfig.getHiveConfigOverride().entrySet()) {
+         hiveCmdBuilder.addStatement("SET " + override.getKey() + "=" + override.getValue() + ";\n");
+     }
+ }
}