You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/03/23 08:15:18 UTC
[kylin] branch master updated: KYLIN-3288 specify
mapreduce.job.queuename when submit sqoop job
This is an automated email from the ASF dual-hosted git repository.
lidong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a7a0baa KYLIN-3288 specify mapreduce.job.queuename when submit sqoop job
a7a0baa is described below
commit a7a0baa319c8f609be0142e7098141f518b972fb
Author: ZhansShaoxiong <sh...@gmail.com>
AuthorDate: Thu Mar 22 20:58:19 2018 +0800
KYLIN-3288 specify mapreduce.job.queuename when submit sqoop job
---
.../apache/kylin/source/jdbc/JdbcHiveMRInput.java | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 7fc26de..457c832 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source.jdbc;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory;
public class JdbcHiveMRInput extends HiveMRInput {
private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+ private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
+ private static final String DEFAULT_QUEUE = "default";
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
return new BatchCubingInputSide(flatDesc);
@@ -125,6 +128,14 @@ public class JdbcHiveMRInput extends HiveMRInput {
return splitColumn;
}
+ private String getSqoopJobQueueName(KylinConfig config) {
+ Map<String, String> mrConfigOverride = config.getMRConfigOverride();
+ if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
+ return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+ }
+ return DEFAULT_QUEUE;
+ }
+
private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
KylinConfig config = getConfig();
PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
@@ -162,13 +173,16 @@ public class JdbcHiveMRInput extends HiveMRInput {
bquery += " WHERE " + partitionString;
}
- String cmd = String.format(String.format(
+ //related to "kylin.engine.mr.config-override.mapreduce.job.queuename"
+ String queueName = getSqoopJobQueueName(config);
+ String cmd = String.format(
"%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+ + "-Dmapreduce.job.queuename=%s "
+ "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+ "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+ "--fields-terminated-by '%s' --num-mappers %d",
- sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
- splitTable, splitColumn, bquery, filedDelimiter, mapperNum));
+ sqoopHome, queueName, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
+ splitTable, splitColumn, bquery, filedDelimiter, mapperNum);
logger.debug(String.format("sqoop cmd:%s", cmd));
CmdStep step = new CmdStep();
step.setCmd(cmd);
--
To stop receiving notification emails like this one, please contact
lidong@apache.org.