You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/04/09 09:27:24 UTC

[kylin] branch master updated: KYLIN-3912: Support cube level mapreduuce queue config for BeelineHiveClient

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 49cb815  KYLIN-3912: Support cube level mapreduuce queue config for BeelineHiveClient
49cb815 is described below

commit 49cb81505d71fa3eef12524d2760e877dd1fdbdd
Author: Liu Shaohui <li...@xiaomi.com>
AuthorDate: Wed Mar 27 13:42:00 2019 +0800

    KYLIN-3912: Support cube level mapreduuce queue config for BeelineHiveClient
---
 .../java/org/apache/kylin/common/util/HiveCmdBuilder.java  |  1 +
 .../apache/kylin/common/util/SourceConfigurationUtil.java  |  4 ++++
 .../org/apache/kylin/job/execution/AbstractExecutable.java | 14 ++++++++++++++
 .../apache/kylin/job/execution/CheckpointExecutable.java   |  5 -----
 .../main/java/org/apache/kylin/engine/mr/CubingJob.java    |  7 ++++++-
 .../org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java |  5 -----
 .../kylin/source/hive/CreateFlatHiveTableByLivyStep.java   | 11 -----------
 .../apache/kylin/source/hive/CreateFlatHiveTableStep.java  | 10 ----------
 .../source/hive/RedistributeFlatHiveTableByLivyStep.java   | 10 ----------
 .../kylin/source/hive/RedistributeFlatHiveTableStep.java   | 10 ----------
 10 files changed, 25 insertions(+), 52 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 8a99906..d3cc46a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -46,6 +46,7 @@ public class HiveCmdBuilder {
     public HiveCmdBuilder() {
         kylinConfig = KylinConfig.getInstanceFromEnv();
         hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration();
+        hiveConfProps.putAll(kylinConfig.getHiveConfigOverride());
     }
 
     public String build() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
index 14a6712..38171de 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
@@ -54,6 +54,10 @@ public class SourceConfigurationUtil {
         for (Map.Entry<String, String> entry : hiveConfiguration.entrySet()) {
             ret.put(HIVE_CONF_PREFIX + entry.getKey(), entry.getValue());
         }
+        Map<String, String> overrideConf = KylinConfig.getInstanceFromEnv().getHiveConfigOverride();
+        for (Map.Entry<String, String> entry : overrideConf.entrySet()) {
+            ret.put(HIVE_CONF_PREFIX + entry.getKey(), entry.getValue());
+        }
         return ret;
     }
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index b1d45ef..8e1b262 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -33,6 +33,8 @@ import org.apache.kylin.common.util.MailService;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.impl.threadpool.DefaultContext;
@@ -51,6 +53,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     public static final Integer DEFAULT_PRIORITY = 10;
 
+    public static final String CUBE_NAME = "cubeName";
     protected static final String SUBMITTER = "submitter";
     protected static final String NOTIFY_LIST = "notify_list";
     protected static final String START_TIME = "startTime";
@@ -91,6 +94,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
     }
 
+    public KylinConfig getCubeSpecificConfig() {
+        String cubeName = getCubeName();
+        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = manager.getCube(cubeName);
+        return cube.getConfig();
+    }
+
     private void onExecuteFinishedWithRetry(ExecuteResult result, ExecutableContext executableContext)
             throws ExecuteException {
         Throwable exception;
@@ -369,6 +379,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return getParam(SUBMITTER);
     }
 
+    public final String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
     @Override
     public final Output getOutput() {
         return getManager().getOutput(getId());
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
index 779809a..3d198b4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -37,7 +37,6 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
 
     private static final String DEPLOY_ENV_NAME = "envName";
     private static final String PROJECT_INSTANCE_NAME = "projectName";
-    private static final String CUBE_NAME = "cubeName";
 
     private final List<AbstractExecutable> subTasksForCheck = Lists.newArrayList();
 
@@ -111,10 +110,6 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
         setParam(PROJECT_INSTANCE_NAME, name);
     }
 
-    public String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
     @Override
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 94fdd11..42db2e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -62,7 +62,6 @@ public class CubingJob extends DefaultChainedExecutable {
     public enum AlgorithmEnum {
         LAYER, INMEM
     }
-
     public enum CubingJobTypeEnum {
         BUILD("BUILD", 20), OPTIMIZE("OPTIMIZE", 5), MERGE("MERGE", 25), STREAM("STREAM", 30);
 
@@ -254,6 +253,12 @@ public class CubingJob extends DefaultChainedExecutable {
     }
 
     @Override
+    protected void onExecuteStart(ExecutableContext executableContext) {
+        KylinConfig.setAndUnsetThreadLocalConfig(getCubeSpecificConfig());
+        super.onExecuteStart(executableContext);
+    }
+
+    @Override
     protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
         long time = 0L;
         for (AbstractExecutable task : getTasks()) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
index 3ebd7d2..c142ace 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
@@ -37,7 +37,6 @@ public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
 
     private static final String DEPLOY_ENV_NAME = "envName";
     private static final String PROJECT_INSTANCE_NAME = "projectName";
-    private static final String CUBE_NAME = "cubeName";
 
     private static final String JOB_TYPE = "Lookup ";
 
@@ -87,10 +86,6 @@ public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
         setParam(PROJECT_INSTANCE_NAME, name);
     }
 
-    public String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
     @Override
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
index 3549630..eefe1b2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -27,9 +27,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.livy.LivyRestBuilder;
 import org.apache.kylin.common.livy.LivyRestExecutor;
 import org.apache.kylin.common.livy.LivyTypeEnum;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -85,14 +82,6 @@ public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
         return length;
     }
 
-
-    private KylinConfig getCubeSpecificConfig() {
-        String cubeName = CubingExecutableUtil.getCubeName(getParams());
-        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubeInstance cube = manager.getCube(cubeName);
-        return cube.getConfig();
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 21ef940..c295784 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -29,9 +29,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -85,13 +82,6 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         return length;
     }
 
-    private KylinConfig getCubeSpecificConfig() {
-        String cubeName = CubingExecutableUtil.getCubeName(getParams());
-        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubeInstance cube = manager.getCube(cubeName);
-        return cube.getConfig();
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
index 4c07324..1166491 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -22,9 +22,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.livy.LivyRestBuilder;
 import org.apache.kylin.common.livy.LivyRestExecutor;
 import org.apache.kylin.common.livy.LivyTypeEnum;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -69,13 +66,6 @@ public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
         getManager().addJobInfo(getId(), stepLogger.getInfo());
     }
 
-    private KylinConfig getCubeSpecificConfig() {
-        String cubeName = CubingExecutableUtil.getCubeName(getParams());
-        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubeInstance cube = manager.getCube(cubeName);
-        return cube.getConfig();
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
index 0dfb5bf..847e98f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
@@ -23,9 +23,6 @@ import java.io.IOException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -67,13 +64,6 @@ public class RedistributeFlatHiveTableStep extends AbstractExecutable {
         }
     }
 
-    private KylinConfig getCubeSpecificConfig() {
-        String cubeName = CubingExecutableUtil.getCubeName(getParams());
-        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubeInstance cube = manager.getCube(cubeName);
-        return cube.getConfig();
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();