You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/23 06:58:12 UTC

[kylin] 02/02: KYLIN-3958 Add quote for backtick

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 47431ba0da7c3cce533f4d9dcd600654f33e0c90
Author: hit-lacus <hi...@126.com>
AuthorDate: Thu May 23 11:31:00 2019 +0800

    KYLIN-3958 Add quote for backtick
---
 .../apache/kylin/common/livy/LivyRestClient.java   |  9 ++++---
 .../src/main/resources/kylin-defaults.properties   |  6 +++++
 .../apache/kylin/source/hive/MRHiveDictUtil.java   | 30 +++++++++++-----------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
index 978b99d..e58dcb6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
@@ -36,9 +36,6 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
-/**
- *
- */
 public class LivyRestClient {
 
     private int httpConnectionTimeoutMs = 30000;
@@ -71,7 +68,11 @@ public class LivyRestClient {
         String url = baseUrl + "/batches";
         HttpPost post = newPost(url);
 
-        post.setEntity(new StringEntity(jobJson, "UTF-8"));
+        // Livy submits the job through the JDK's ProcessBuilder, so backticks must be escaped here;
+        // otherwise they cause Livy to throw org.apache.spark.sql.catalyst.parser.ParseException
+        String json = jobJson.replace("`", "\\\\`");
+
+        post.setEntity(new StringEntity(json, "UTF-8"));
         HttpResponse response = client.execute(post);
         String content = getContent(response);
         if (response.getStatusLine().getStatusCode() != 201) {
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 62f63ca..8beea6c 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -360,3 +360,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
 #kylin.source.jdbc.pass=
 #kylin.source.jdbc.sqoop-home=
 #kylin.source.jdbc.filed-delimiter=|
+
+### Livy with Kylin
+#kylin.engine.livy-conf.livy-enabled=false
+#kylin.engine.livy-conf.livy-url=http://LivyHost:8998
+#kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar
+#kylin.engine.livy-conf.livy-arr.jars=hdfs:///path-to-hadoop-dependency-jar
\ No newline at end of file
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 60191e5..c88033f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -45,7 +45,6 @@ public class MRHiveDictUtil {
     private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
     protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
 
-
     public enum DictHiveType {
         GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/");
         private String name;
@@ -96,8 +95,9 @@ public class MRHiveDictUtil {
         }
 
         if (index == flatDesc.getAllColumns().size()) {
-            // dictColumn not in flatDesc,need throw Exception
-            index = -1;
+            String msg = "Can not find correct column for " + dictColumn + ", please check 'kylin.dictionary.mr-hive.columns'";
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
         }
 
         TblColRef col = flatDesc.getAllColumns().get(index);
@@ -114,7 +114,6 @@ public class MRHiveDictUtil {
                 + sql + ";\n";
     }
 
-
     public static String getHiveTableName(IJoinedFlatTableDesc flatDesc, DictHiveType dictHiveType) {
         StringBuffer table = new StringBuffer(flatDesc.getTableName());
         table.append("__");
@@ -126,14 +125,17 @@ public class MRHiveDictUtil {
         sql.append("FROM " + flatDesc.getTableName() + "\n");
     }
 
-    public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls, ExecutableManager executableManager, String jobId) throws Exception{
+    public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls,
+            ExecutableManager executableManager, String jobId) throws IOException {
         final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
         livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
-        String sqlCmd = livyRestBuilder.parseProps();
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(livyRestBuilder.parseProps());
         for (String sql : sqls) {
-            sqlCmd += sql;
+            stringBuilder.append(sql);
         }
-        livyRestBuilder.addArgs(sqlCmd);
+        String args = stringBuilder.toString();
+        livyRestBuilder.addArgs(args);
 
         stepLogger.log("Create and distribute table. ");
         livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
@@ -143,23 +145,21 @@ public class MRHiveDictUtil {
 
         Map<String, String> info = stepLogger.getInfo();
         //get the flat Hive table size
-        Matcher matcher = MRHiveDictUtil.HDFS_LOCATION.matcher(sqlCmd);
+        Matcher matcher = HDFS_LOCATION.matcher(args);
         if (matcher.find()) {
             String hiveFlatTableHdfsUrl = matcher.group(1);
-            long size = MRHiveDictUtil.getFileSize(hiveFlatTableHdfsUrl);
+            long size = getFileSize(hiveFlatTableHdfsUrl);
             info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
-            logger.info("HDFS_Bytes_Writen: " + size);
+            logger.info("HDFS_Bytes_Writen: {}", size);
         }
         executableManager.addJobInfo(jobId, info);
     }
 
-    public static long getFileSize(String hdfsUrl) throws IOException {
+    private static long getFileSize(String hdfsUrl) throws IOException {
         Configuration configuration = new Configuration();
         Path path = new Path(hdfsUrl);
         FileSystem fs = path.getFileSystem(configuration);
         ContentSummary contentSummary = fs.getContentSummary(path);
-        long length = contentSummary.getLength();
-        return length;
+        return contentSummary.getLength();
     }
-
 }