You are viewing a plain-text version of this content. (The hyperlink to its canonical version was not preserved in this plain-text copy.)
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/23 06:58:11 UTC

[kylin] 01/02: KYLIN-3958 MrHive-Dict support build by livy

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f15b1f8a7ce05416dc1df2ee8a2ab629d1c00873
Author: javalife0312 <ja...@126.com>
AuthorDate: Sun May 19 18:22:17 2019 +0800

    KYLIN-3958 MrHive-Dict support build by livy
---
 .../apache/kylin/common/util/HiveCmdBuilder.java   |  4 ++
 .../source/hive/CreateFlatHiveTableByLivyStep.java | 50 +++-----------------
 .../kylin/source/hive/CreateMrHiveDictStep.java    | 17 +++++--
 .../apache/kylin/source/hive/MRHiveDictUtil.java   | 55 ++++++++++++++++++++++
 .../hive/RedistributeFlatHiveTableByLivyStep.java  | 17 ++-----
 5 files changed, 81 insertions(+), 62 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index d3cc46a..10b8234 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -138,6 +138,10 @@ public class HiveCmdBuilder {
         statements.add(statement);
     }
 
+    public List<String> getStatements() {
+        return statements;
+    }
+
     public void addStatements(String[] stats) {
         for (String s : stats) {
             statements.add(s);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
index eefe1b2..e8c2d79 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -19,67 +19,29 @@
 package org.apache.kylin.source.hive;
 
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.livy.LivyRestBuilder;
-import org.apache.kylin.common.livy.LivyRestExecutor;
-import org.apache.kylin.common.livy.LivyTypeEnum;
 import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 /**
  *
  */
 public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableByLivyStep.class);
     protected final PatternedLogger stepLogger = new PatternedLogger(logger);
-    private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
-
 
     protected void createFlatHiveTable(KylinConfig config) throws Exception {
-        final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
-        livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
-        livyRestBuilder.addArgs(livyRestBuilder.parseProps() + getInitStatement() + getCreateTableStatement());
-
-        stepLogger.log("Create and distribute table. ");
-        livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
-
-        LivyRestExecutor executor = new LivyRestExecutor();
-        executor.execute(livyRestBuilder, stepLogger);
-
-        Map<String, String> info = stepLogger.getInfo();
-        //get the flat Hive table size
-        Matcher matcher = HDFS_LOCATION.matcher(getCreateTableStatement());
-        if (matcher.find()) {
-            String hiveFlatTableHdfsUrl = matcher.group(1);
-            long size = getFileSize(hiveFlatTableHdfsUrl);
-            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
-            logger.info("HDFS_Bytes_Writen: " + size);
-        }
-        getManager().addJobInfo(getId(), info);
-    }
-
-    private long getFileSize(String hdfsUrl) throws IOException {
-        Configuration configuration = new Configuration();
-        Path path = new Path(hdfsUrl);
-        FileSystem fs = path.getFileSystem(configuration);
-        ContentSummary contentSummary = fs.getContentSummary(path);
-        long length = contentSummary.getLength();
-        return length;
+        ImmutableList<String> sqls = ImmutableList.of(getInitStatement(), getCreateTableStatement());
+        ExecutableManager executableManager = getManager();
+        String jobId = getId();
+        MRHiveDictUtil.runLivySqlJob(stepLogger, config, sqls, executableManager, jobId);
     }
 
     @Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index dd3007d..cdff04a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.source.hive;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.HiveCmdBuilder;
@@ -30,6 +31,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -124,10 +126,17 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
 
             stepLogger.log("MR-Hive dict, cmd: " + cmd);
 
-            Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
-            getManager().addJobInfo(getId(), stepLogger.getInfo());
-            if (response.getFirst() != 0) {
-                throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst());
+            CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = manager.getCube(getCubeName());
+
+            if (config.isLivyEnabled() && cube.getEngineType() == IEngineAware.ID_SPARK) {
+                MRHiveDictUtil.runLivySqlJob(stepLogger, config, ImmutableList.copyOf(hiveCmdBuilder.getStatements()), getManager(), getId());
+            } else {
+                Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+                if (response.getFirst() != 0) {
+                    throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst());
+                }
+                getManager().addJobInfo(getId(), stepLogger.getInfo());
             }
             if (getIsLock()) {
                 String pathName = getLockPathName();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 18912b9..60191e5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -18,14 +18,33 @@
 
 package org.apache.kylin.source.hive;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyTypeEnum;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class MRHiveDictUtil {
     private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
+    protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
+
 
     public enum DictHiveType {
         GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/");
@@ -107,4 +126,40 @@ public class MRHiveDictUtil {
         sql.append("FROM " + flatDesc.getTableName() + "\n");
     }
 
+    public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls, ExecutableManager executableManager, String jobId) throws Exception{
+        final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+        livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+        String sqlCmd = livyRestBuilder.parseProps();
+        for (String sql : sqls) {
+            sqlCmd += sql;
+        }
+        livyRestBuilder.addArgs(sqlCmd);
+
+        stepLogger.log("Create and distribute table. ");
+        livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
+
+        LivyRestExecutor executor = new LivyRestExecutor();
+        executor.execute(livyRestBuilder, stepLogger);
+
+        Map<String, String> info = stepLogger.getInfo();
+        //get the flat Hive table size
+        Matcher matcher = MRHiveDictUtil.HDFS_LOCATION.matcher(sqlCmd);
+        if (matcher.find()) {
+            String hiveFlatTableHdfsUrl = matcher.group(1);
+            long size = MRHiveDictUtil.getFileSize(hiveFlatTableHdfsUrl);
+            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
+            logger.info("HDFS_Bytes_Writen: " + size);
+        }
+        executableManager.addJobInfo(jobId, info);
+    }
+
+    public static long getFileSize(String hdfsUrl) throws IOException {
+        Configuration configuration = new Configuration();
+        Path path = new Path(hdfsUrl);
+        FileSystem fs = path.getFileSystem(configuration);
+        ContentSummary contentSummary = fs.getContentSummary(path);
+        long length = contentSummary.getLength();
+        return length;
+    }
+
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
index 1166491..92bc068 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -18,10 +18,8 @@
 
 package org.apache.kylin.source.hive;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.livy.LivyRestBuilder;
-import org.apache.kylin.common.livy.LivyRestExecutor;
-import org.apache.kylin.common.livy.LivyTypeEnum;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -45,25 +43,16 @@ public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
     }
 
     private void redistributeTable(KylinConfig config, int numReducers) throws Exception {
-        final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
-        livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
         StringBuffer statement = new StringBuffer();
-        statement.append(livyRestBuilder.parseProps());
         statement.append(getInitStatement());
         statement.append("set mapreduce.job.reduces=" + numReducers + ";\n");
         statement.append("set hive.merge.mapredfiles=false;\n");
         statement.append(getRedistributeDataStatement());
-        livyRestBuilder.addArgs(statement.toString());
-        final String cmd = livyRestBuilder.toString();
 
         stepLogger.log("Redistribute table, cmd: ");
-        stepLogger.log(cmd);
+        stepLogger.log(statement.toString());
 
-        livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
-
-        LivyRestExecutor executor = new LivyRestExecutor();
-        executor.execute(livyRestBuilder, stepLogger);
-        getManager().addJobInfo(getId(), stepLogger.getInfo());
+        MRHiveDictUtil.runLivySqlJob(stepLogger, config, ImmutableList.of(statement.toString()), getManager(), getId());
     }
 
     @Override