You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/04/09 09:27:24 UTC
[kylin] branch master updated: KYLIN-3912: Support cube level
mapreduce queue config for BeelineHiveClient
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 49cb815 KYLIN-3912: Support cube level mapreduce queue config for BeelineHiveClient
49cb815 is described below
commit 49cb81505d71fa3eef12524d2760e877dd1fdbdd
Author: Liu Shaohui <li...@xiaomi.com>
AuthorDate: Wed Mar 27 13:42:00 2019 +0800
KYLIN-3912: Support cube level mapreduce queue config for BeelineHiveClient
---
.../java/org/apache/kylin/common/util/HiveCmdBuilder.java | 1 +
.../apache/kylin/common/util/SourceConfigurationUtil.java | 4 ++++
.../org/apache/kylin/job/execution/AbstractExecutable.java | 14 ++++++++++++++
.../apache/kylin/job/execution/CheckpointExecutable.java | 5 -----
.../main/java/org/apache/kylin/engine/mr/CubingJob.java | 7 ++++++-
.../org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java | 5 -----
.../kylin/source/hive/CreateFlatHiveTableByLivyStep.java | 11 -----------
.../apache/kylin/source/hive/CreateFlatHiveTableStep.java | 10 ----------
.../source/hive/RedistributeFlatHiveTableByLivyStep.java | 10 ----------
.../kylin/source/hive/RedistributeFlatHiveTableStep.java | 10 ----------
10 files changed, 25 insertions(+), 52 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 8a99906..d3cc46a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -46,6 +46,7 @@ public class HiveCmdBuilder {
public HiveCmdBuilder() {
kylinConfig = KylinConfig.getInstanceFromEnv();
hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration();
+ hiveConfProps.putAll(kylinConfig.getHiveConfigOverride());
}
public String build() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
index 14a6712..38171de 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
@@ -54,6 +54,10 @@ public class SourceConfigurationUtil {
for (Map.Entry<String, String> entry : hiveConfiguration.entrySet()) {
ret.put(HIVE_CONF_PREFIX + entry.getKey(), entry.getValue());
}
+ Map<String, String> overrideConf = KylinConfig.getInstanceFromEnv().getHiveConfigOverride();
+ for (Map.Entry<String, String> entry : overrideConf.entrySet()) {
+ ret.put(HIVE_CONF_PREFIX + entry.getKey(), entry.getValue());
+ }
return ret;
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index b1d45ef..8e1b262 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -33,6 +33,8 @@ import org.apache.kylin.common.util.MailService;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.impl.threadpool.DefaultContext;
@@ -51,6 +53,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
public static final Integer DEFAULT_PRIORITY = 10;
+ public static final String CUBE_NAME = "cubeName";
protected static final String SUBMITTER = "submitter";
protected static final String NOTIFY_LIST = "notify_list";
protected static final String START_TIME = "startTime";
@@ -91,6 +94,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
}
+ public KylinConfig getCubeSpecificConfig() {
+ String cubeName = getCubeName();
+ CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = manager.getCube(cubeName);
+ return cube.getConfig();
+ }
+
private void onExecuteFinishedWithRetry(ExecuteResult result, ExecutableContext executableContext)
throws ExecuteException {
Throwable exception;
@@ -369,6 +379,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return getParam(SUBMITTER);
}
+ public final String getCubeName() {
+ return getParam(CUBE_NAME);
+ }
+
@Override
public final Output getOutput() {
return getManager().getOutput(getId());
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
index 779809a..3d198b4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -37,7 +37,6 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
private static final String DEPLOY_ENV_NAME = "envName";
private static final String PROJECT_INSTANCE_NAME = "projectName";
- private static final String CUBE_NAME = "cubeName";
private final List<AbstractExecutable> subTasksForCheck = Lists.newArrayList();
@@ -111,10 +110,6 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
setParam(PROJECT_INSTANCE_NAME, name);
}
- public String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
@Override
public int getDefaultPriority() {
return DEFAULT_PRIORITY;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 94fdd11..42db2e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -62,7 +62,6 @@ public class CubingJob extends DefaultChainedExecutable {
public enum AlgorithmEnum {
LAYER, INMEM
}
-
public enum CubingJobTypeEnum {
BUILD("BUILD", 20), OPTIMIZE("OPTIMIZE", 5), MERGE("MERGE", 25), STREAM("STREAM", 30);
@@ -254,6 +253,12 @@ public class CubingJob extends DefaultChainedExecutable {
}
@Override
+ protected void onExecuteStart(ExecutableContext executableContext) {
+ KylinConfig.setAndUnsetThreadLocalConfig(getCubeSpecificConfig());
+ super.onExecuteStart(executableContext);
+ }
+
+ @Override
protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
long time = 0L;
for (AbstractExecutable task : getTasks()) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
index 3ebd7d2..c142ace 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
@@ -37,7 +37,6 @@ public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
private static final String DEPLOY_ENV_NAME = "envName";
private static final String PROJECT_INSTANCE_NAME = "projectName";
- private static final String CUBE_NAME = "cubeName";
private static final String JOB_TYPE = "Lookup ";
@@ -87,10 +86,6 @@ public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
setParam(PROJECT_INSTANCE_NAME, name);
}
- public String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
@Override
public int getDefaultPriority() {
return DEFAULT_PRIORITY;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
index 3549630..eefe1b2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -27,9 +27,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.livy.LivyRestBuilder;
import org.apache.kylin.common.livy.LivyRestExecutor;
import org.apache.kylin.common.livy.LivyTypeEnum;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
@@ -85,14 +82,6 @@ public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
return length;
}
-
- private KylinConfig getCubeSpecificConfig() {
- String cubeName = CubingExecutableUtil.getCubeName(getParams());
- CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = manager.getCube(cubeName);
- return cube.getConfig();
- }
-
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 21ef940..c295784 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -29,9 +29,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
@@ -85,13 +82,6 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
return length;
}
- private KylinConfig getCubeSpecificConfig() {
- String cubeName = CubingExecutableUtil.getCubeName(getParams());
- CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = manager.getCube(cubeName);
- return cube.getConfig();
- }
-
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
index 4c07324..1166491 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -22,9 +22,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.livy.LivyRestBuilder;
import org.apache.kylin.common.livy.LivyRestExecutor;
import org.apache.kylin.common.livy.LivyTypeEnum;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
@@ -69,13 +66,6 @@ public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
getManager().addJobInfo(getId(), stepLogger.getInfo());
}
- private KylinConfig getCubeSpecificConfig() {
- String cubeName = CubingExecutableUtil.getCubeName(getParams());
- CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = manager.getCube(cubeName);
- return cube.getConfig();
- }
-
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
index 0dfb5bf..847e98f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
@@ -23,9 +23,6 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
@@ -67,13 +64,6 @@ public class RedistributeFlatHiveTableStep extends AbstractExecutable {
}
}
- private KylinConfig getCubeSpecificConfig() {
- String cubeName = CubingExecutableUtil.getCubeName(getParams());
- CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = manager.getCube(cubeName);
- return cube.getConfig();
- }
-
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();