You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/23 06:58:12 UTC
[kylin] 02/02: KYLIN-3958 Add quote for backtick
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 47431ba0da7c3cce533f4d9dcd600654f33e0c90
Author: hit-lacus <hi...@126.com>
AuthorDate: Thu May 23 11:31:00 2019 +0800
KYLIN-3958 Add quote for backtick
---
.../apache/kylin/common/livy/LivyRestClient.java | 9 ++++---
.../src/main/resources/kylin-defaults.properties | 6 +++++
.../apache/kylin/source/hive/MRHiveDictUtil.java | 30 +++++++++++-----------
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
index 978b99d..e58dcb6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
@@ -36,9 +36,6 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
-/**
- *
- */
public class LivyRestClient {
private int httpConnectionTimeoutMs = 30000;
@@ -71,7 +68,11 @@ public class LivyRestClient {
String url = baseUrl + "/batches";
HttpPost post = newPost(url);
- post.setEntity(new StringEntity(jobJson, "UTF-8"));
+ // Livy launches the job through the JDK's ProcessBuilder, so backticks must be escaped with a backslash here;
+ // otherwise an unescaped backtick makes Livy throw org.apache.spark.sql.catalyst.parser.ParseException
+ String json = jobJson.replace("`", "\\\\`");
+
+ post.setEntity(new StringEntity(json, "UTF-8"));
HttpResponse response = client.execute(post);
String content = getContent(response);
if (response.getStatusLine().getStatusCode() != 201) {
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 62f63ca..8beea6c 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -360,3 +360,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
#kylin.source.jdbc.pass=
#kylin.source.jdbc.sqoop-home=
#kylin.source.jdbc.filed-delimiter=|
+
+### Livy with Kylin
+#kylin.engine.livy-conf.livy-enabled=false
+#kylin.engine.livy-conf.livy-url=http://LivyHost:8998
+#kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar
+#kylin.engine.livy-conf.livy-arr.jars=hdfs:///path-to-hadoop-dependency-jar
\ No newline at end of file
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 60191e5..c88033f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -45,7 +45,6 @@ public class MRHiveDictUtil {
private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
-
public enum DictHiveType {
GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/");
private String name;
@@ -96,8 +95,9 @@ public class MRHiveDictUtil {
}
if (index == flatDesc.getAllColumns().size()) {
- // dictColumn not in flatDesc,need throw Exception
- index = -1;
+ String msg = "Can not find correct column for " + dictColumn + ", please check 'kylin.dictionary.mr-hive.columns'";
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
}
TblColRef col = flatDesc.getAllColumns().get(index);
@@ -114,7 +114,6 @@ public class MRHiveDictUtil {
+ sql + ";\n";
}
-
public static String getHiveTableName(IJoinedFlatTableDesc flatDesc, DictHiveType dictHiveType) {
StringBuffer table = new StringBuffer(flatDesc.getTableName());
table.append("__");
@@ -126,14 +125,17 @@ public class MRHiveDictUtil {
sql.append("FROM " + flatDesc.getTableName() + "\n");
}
- public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls, ExecutableManager executableManager, String jobId) throws Exception{
+ public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls,
+ ExecutableManager executableManager, String jobId) throws IOException {
final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
- String sqlCmd = livyRestBuilder.parseProps();
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(livyRestBuilder.parseProps());
for (String sql : sqls) {
- sqlCmd += sql;
+ stringBuilder.append(sql);
}
- livyRestBuilder.addArgs(sqlCmd);
+ String args = stringBuilder.toString();
+ livyRestBuilder.addArgs(args);
stepLogger.log("Create and distribute table. ");
livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
@@ -143,23 +145,21 @@ public class MRHiveDictUtil {
Map<String, String> info = stepLogger.getInfo();
//get the flat Hive table size
- Matcher matcher = MRHiveDictUtil.HDFS_LOCATION.matcher(sqlCmd);
+ Matcher matcher = HDFS_LOCATION.matcher(args);
if (matcher.find()) {
String hiveFlatTableHdfsUrl = matcher.group(1);
- long size = MRHiveDictUtil.getFileSize(hiveFlatTableHdfsUrl);
+ long size = getFileSize(hiveFlatTableHdfsUrl);
info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
- logger.info("HDFS_Bytes_Writen: " + size);
+ logger.info("HDFS_Bytes_Writen: {}", size);
}
executableManager.addJobInfo(jobId, info);
}
- public static long getFileSize(String hdfsUrl) throws IOException {
+ private static long getFileSize(String hdfsUrl) throws IOException {
Configuration configuration = new Configuration();
Path path = new Path(hdfsUrl);
FileSystem fs = path.getFileSystem(configuration);
ContentSummary contentSummary = fs.getContentSummary(path);
- long length = contentSummary.getLength();
- return length;
+ return contentSummary.getLength();
}
-
}