You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/17 14:08:19 UTC
[1/3] kylin git commit: delete the blank line code
Repository: kylin
Updated Branches:
refs/heads/master c92f79ad3 -> 07e81fd0b
delete the blank line code
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cc2b59fa
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cc2b59fa
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cc2b59fa
Branch: refs/heads/master
Commit: cc2b59faf1a12e76e6af83ba3a19fbe27ba54cc3
Parents: ac356f0
Author: lijieliang <li...@cmss.chinamobile.com>
Authored: Fri Oct 14 14:28:01 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 17 18:25:26 2016 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/source/hive/HiveMRInput.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/cc2b59fa/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 2ec1fbb..4ec8d3d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -130,7 +130,6 @@ public class HiveMRInput implements IMRInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-
final KylinConfig kylinConfig = CubeManager.getInstance(conf.getConfig()).getCube(cubeName).getConfig();
String createFlatTableMethod = kylinConfig.getCreateFlatHiveTableMethod();
[3/3] kylin git commit: KYLIN-2095 Allow cube to override Hive job
configuration by properties
Posted by sh...@apache.org.
KYLIN-2095 Allow cube to override Hive job configuration by properties
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07e81fd0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07e81fd0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07e81fd0
Branch: refs/heads/master
Commit: 07e81fd0b744e782d84383e327b1923cfc178d42
Parents: cc2b59f
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 17 22:07:57 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 17 22:07:57 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 ++
.../apache/kylin/source/hive/HiveMRInput.java | 42 ++++++++++++--------
2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 73ac788..5a06813 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -250,6 +250,10 @@ abstract public class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.job.mr.config.override.");
}
+ public Map<String, String> getHiveConfigOverride() {
+ return getPropertiesByPrefix("kylin.hive.config.override.");
+ }
+
public String getKylinSparkJobJarPath() {
final String jobJar = getOptional("kylin.job.jar.spark");
if (StringUtils.isNotEmpty(jobJar)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4ec8d3d..f3fceb1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -20,6 +20,8 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
@@ -61,7 +63,6 @@ import com.google.common.collect.Sets;
public class HiveMRInput implements IMRInput {
- private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
@@ -157,9 +158,7 @@ public class HiveMRInput implements IMRInput {
hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
- }
+ appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -178,10 +177,7 @@ public class HiveMRInput implements IMRInput {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
- kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
- }
+ appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
@@ -192,6 +188,7 @@ public class HiveMRInput implements IMRInput {
return step;
}
+
public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -211,10 +208,7 @@ public class HiveMRInput implements IMRInput {
if (lookupViewsTables.size() == 0) {
return null;
}
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
- kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
- }
+ appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
hiveCmdBuilder.addStatement(useDatabaseHql);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
@@ -240,9 +234,7 @@ public class HiveMRInput implements IMRInput {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
- if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
- hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
- }
+ appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
@@ -301,7 +293,7 @@ public class HiveMRInput implements IMRInput {
FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
InputStream in = fs.open(file);
try {
- String content = IOUtils.toString(in);
+ String content = IOUtils.toString(in, Charset.defaultCharset());
return Long.valueOf(content.trim()); // strip the '\n' character
} finally {
@@ -490,4 +482,22 @@ public class HiveMRInput implements IMRInput {
}
}
+
+ private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) {
+ final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride();
+ if (hiveConfOverride.isEmpty() == false) {
+ for (String key : hiveConfOverride.keySet()) {
+ hiveCmd.append("SET ").append(key).append("=").append(hiveConfOverride.get(key)).append(";\n");
+ }
+ }
+ }
+
+ private static void appendHiveOverrideProperties2(final KylinConfig kylinConfig, HiveCmdBuilder hiveCmdBuilder) {
+ final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride();
+ if (hiveConfOverride.isEmpty() == false) {
+ for (String key : hiveConfOverride.keySet()) {
+ hiveCmdBuilder.addStatement("SET " + key + "=" + hiveConfOverride.get(key) + ";\n");
+ }
+ }
+ }
}
[2/3] kylin git commit: hive job use overridden MR job configuration
by cube properties
Posted by sh...@apache.org.
hive job use overridden MR job configuration by cube properties
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ac356f01
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ac356f01
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ac356f01
Branch: refs/heads/master
Commit: ac356f014d52c4b13ad72e9d6a537e50e9ace5fb
Parents: c92f79a
Author: lijieliang <li...@cmss.chinamobile.com>
Authored: Fri Oct 14 13:01:32 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 17 18:25:26 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/hive/HiveMRInput.java | 24 +++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ac356f01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 202e480..2ec1fbb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -35,6 +35,7 @@ import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -60,6 +61,8 @@ import com.google.common.collect.Sets;
public class HiveMRInput implements IMRInput {
+ private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";
+
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
return new BatchCubingInputSide(flatDesc);
@@ -154,7 +157,10 @@ public class HiveMRInput implements IMRInput {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-
+ final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
+ }
String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -172,6 +178,11 @@ public class HiveMRInput implements IMRInput {
final ShellExecutable step = new ShellExecutable();
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
+ kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
+ }
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
@@ -187,7 +198,7 @@ public class HiveMRInput implements IMRInput {
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
@@ -201,6 +212,10 @@ public class HiveMRInput implements IMRInput {
if (lookupViewsTables.size() == 0) {
return null;
}
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
+ kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
+ }
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
hiveCmdBuilder.addStatement(useDatabaseHql);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
@@ -225,7 +240,10 @@ public class HiveMRInput implements IMRInput {
public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-
+ final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+ if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+ hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
+ }
final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));