You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/23 06:58:11 UTC
[kylin] 01/02: KYLIN-3958 MrHive-Dict support build by livy
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit f15b1f8a7ce05416dc1df2ee8a2ab629d1c00873
Author: javalife0312 <ja...@126.com>
AuthorDate: Sun May 19 18:22:17 2019 +0800
KYLIN-3958 MrHive-Dict support build by livy
---
.../apache/kylin/common/util/HiveCmdBuilder.java | 4 ++
.../source/hive/CreateFlatHiveTableByLivyStep.java | 50 +++-----------------
.../kylin/source/hive/CreateMrHiveDictStep.java | 17 +++++--
.../apache/kylin/source/hive/MRHiveDictUtil.java | 55 ++++++++++++++++++++++
.../hive/RedistributeFlatHiveTableByLivyStep.java | 17 ++-----
5 files changed, 81 insertions(+), 62 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index d3cc46a..10b8234 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -138,6 +138,10 @@ public class HiveCmdBuilder {
statements.add(statement);
}
+ public List<String> getStatements() {
+ return statements;
+ }
+
public void addStatements(String[] stats) {
for (String s : stats) {
statements.add(s);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
index eefe1b2..e8c2d79 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -19,67 +19,29 @@
package org.apache.kylin.source.hive;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import com.google.common.collect.ImmutableList;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.livy.LivyRestBuilder;
-import org.apache.kylin.common.livy.LivyRestExecutor;
-import org.apache.kylin.common.livy.LivyTypeEnum;
import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecuteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
/**
*
*/
public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableByLivyStep.class);
protected final PatternedLogger stepLogger = new PatternedLogger(logger);
- private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
-
protected void createFlatHiveTable(KylinConfig config) throws Exception {
- final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
- livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
- livyRestBuilder.addArgs(livyRestBuilder.parseProps() + getInitStatement() + getCreateTableStatement());
-
- stepLogger.log("Create and distribute table. ");
- livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
-
- LivyRestExecutor executor = new LivyRestExecutor();
- executor.execute(livyRestBuilder, stepLogger);
-
- Map<String, String> info = stepLogger.getInfo();
- //get the flat Hive table size
- Matcher matcher = HDFS_LOCATION.matcher(getCreateTableStatement());
- if (matcher.find()) {
- String hiveFlatTableHdfsUrl = matcher.group(1);
- long size = getFileSize(hiveFlatTableHdfsUrl);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
- logger.info("HDFS_Bytes_Writen: " + size);
- }
- getManager().addJobInfo(getId(), info);
- }
-
- private long getFileSize(String hdfsUrl) throws IOException {
- Configuration configuration = new Configuration();
- Path path = new Path(hdfsUrl);
- FileSystem fs = path.getFileSystem(configuration);
- ContentSummary contentSummary = fs.getContentSummary(path);
- long length = contentSummary.getLength();
- return length;
+ ImmutableList<String> sqls = ImmutableList.of(getInitStatement(), getCreateTableStatement());
+ ExecutableManager executableManager = getManager();
+ String jobId = getId();
+ MRHiveDictUtil.runLivySqlJob(stepLogger, config, sqls, executableManager, jobId);
}
@Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index dd3007d..cdff04a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -18,6 +18,7 @@
package org.apache.kylin.source.hive;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.HiveCmdBuilder;
@@ -30,6 +31,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IEngineAware;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -124,10 +126,17 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
stepLogger.log("MR-Hive dict, cmd: " + cmd);
- Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
- getManager().addJobInfo(getId(), stepLogger.getInfo());
- if (response.getFirst() != 0) {
- throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst());
+ CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = manager.getCube(getCubeName());
+
+ if (config.isLivyEnabled() && cube.getEngineType() == IEngineAware.ID_SPARK) {
+ MRHiveDictUtil.runLivySqlJob(stepLogger, config, ImmutableList.copyOf(hiveCmdBuilder.getStatements()), getManager(), getId());
+ } else {
+ Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+ if (response.getFirst() != 0) {
+ throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst());
+ }
+ getManager().addJobInfo(getId(), stepLogger.getInfo());
}
if (getIsLock()) {
String pathName = getLockPathName();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 18912b9..60191e5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -18,14 +18,33 @@
package org.apache.kylin.source.hive;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyTypeEnum;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class MRHiveDictUtil {
private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
+ protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
+
public enum DictHiveType {
GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/");
@@ -107,4 +126,40 @@ public class MRHiveDictUtil {
sql.append("FROM " + flatDesc.getTableName() + "\n");
}
+ public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls, ExecutableManager executableManager, String jobId) throws Exception{
+ final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+ livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+ String sqlCmd = livyRestBuilder.parseProps();
+ for (String sql : sqls) {
+ sqlCmd += sql;
+ }
+ livyRestBuilder.addArgs(sqlCmd);
+
+ stepLogger.log("Create and distribute table. ");
+ livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
+
+ LivyRestExecutor executor = new LivyRestExecutor();
+ executor.execute(livyRestBuilder, stepLogger);
+
+ Map<String, String> info = stepLogger.getInfo();
+ //get the flat Hive table size
+ Matcher matcher = MRHiveDictUtil.HDFS_LOCATION.matcher(sqlCmd);
+ if (matcher.find()) {
+ String hiveFlatTableHdfsUrl = matcher.group(1);
+ long size = MRHiveDictUtil.getFileSize(hiveFlatTableHdfsUrl);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
+ logger.info("HDFS_Bytes_Writen: " + size);
+ }
+ executableManager.addJobInfo(jobId, info);
+ }
+
+ public static long getFileSize(String hdfsUrl) throws IOException {
+ Configuration configuration = new Configuration();
+ Path path = new Path(hdfsUrl);
+ FileSystem fs = path.getFileSystem(configuration);
+ ContentSummary contentSummary = fs.getContentSummary(path);
+ long length = contentSummary.getLength();
+ return length;
+ }
+
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
index 1166491..92bc068 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -18,10 +18,8 @@
package org.apache.kylin.source.hive;
+import com.google.common.collect.ImmutableList;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.livy.LivyRestBuilder;
-import org.apache.kylin.common.livy.LivyRestExecutor;
-import org.apache.kylin.common.livy.LivyTypeEnum;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
@@ -45,25 +43,16 @@ public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
}
private void redistributeTable(KylinConfig config, int numReducers) throws Exception {
- final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
- livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
StringBuffer statement = new StringBuffer();
- statement.append(livyRestBuilder.parseProps());
statement.append(getInitStatement());
statement.append("set mapreduce.job.reduces=" + numReducers + ";\n");
statement.append("set hive.merge.mapredfiles=false;\n");
statement.append(getRedistributeDataStatement());
- livyRestBuilder.addArgs(statement.toString());
- final String cmd = livyRestBuilder.toString();
stepLogger.log("Redistribute table, cmd: ");
- stepLogger.log(cmd);
+ stepLogger.log(statement.toString());
- livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
-
- LivyRestExecutor executor = new LivyRestExecutor();
- executor.execute(livyRestBuilder, stepLogger);
- getManager().addJobInfo(getId(), stepLogger.getInfo());
+ MRHiveDictUtil.runLivySqlJob(stepLogger, config, ImmutableList.of(statement.toString()), getManager(), getId());
}
@Override