Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/04 10:56:28 UTC

[kylin] branch master updated: KYLIN-3795 Submit Spark jobs via Apache Livy

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 662a7f9  KYLIN-3795 Submit Spark jobs via Apache Livy
662a7f9 is described below

commit 662a7f95104b3e8c31026cef65192127a2ab2e15
Author: javalife0312 <ja...@126.com>
AuthorDate: Mon Mar 4 10:58:29 2019 +0800

    KYLIN-3795 Submit Spark jobs via Apache Livy
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  24 ++
 .../apache/kylin/common/livy/LivyRestBuilder.java  | 155 ++++++++++++
 .../apache/kylin/common/livy/LivyRestClient.java   | 136 ++++++++++
 .../apache/kylin/common/livy/LivyRestExecutor.java | 122 +++++++++
 .../apache/kylin/common/livy/LivyStateEnum.java    |  25 ++
 .../org/apache/kylin/common/livy/LivyTypeEnum.java |  25 ++
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |   4 +-
 .../engine/spark/SparkBatchMergeJobBuilder2.java   |   4 +-
 .../apache/kylin/engine/spark/SparkExecutable.java |   6 +-
 .../kylin/engine/spark/SparkExecutableFactory.java |  32 +++
 .../kylin/engine/spark/SparkExecutableLivy.java    | 276 +++++++++++++++++++++
 .../kylin/engine/spark/SparkSqlOnLivyBatch.scala   |  52 ++++
 .../source/hive/CreateFlatHiveTableByLivyStep.java | 124 +++++++++
 .../apache/kylin/source/hive/HiveInputBase.java    |  46 +++-
 .../hive/RedistributeFlatHiveTableByLivyStep.java  | 149 +++++++++++
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |   3 +-
 16 files changed, 1172 insertions(+), 11 deletions(-)
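
For orientation before the diff: the new KylinConfigBase accessors below read a
small family of "kylin.engine.livy-conf.*" properties. A minimal kylin.properties
sketch follows; the endpoint, jar paths and memory value are illustrative
assumptions, not part of this commit:

    # Enable submitting Spark jobs through Livy (example values)
    kylin.engine.livy-conf.livy.enable=true
    kylin.engine.livy-conf.livy.url=http://livy-host:8998
    # livy-key.* entries become top-level fields of the Livy /batches payload
    kylin.engine.livy-conf.livy-key.file=hdfs:///kylin/spark/kylin-job.jar
    # livy-arr.* entries are split on "," into JSON arrays
    kylin.engine.livy-conf.livy-arr.jars=hdfs:///kylin/spark/dep-a.jar,hdfs:///kylin/spark/dep-b.jar
    # livy-map.* entries are merged into the payload's "conf" object
    kylin.engine.livy-conf.livy-map.spark.executor.memory=4G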

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6457d0a..81979dc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1372,7 +1372,31 @@ public abstract class KylinConfigBase implements Serializable {
     public boolean isSparkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
     }
+    
+    // ============================================================================
+    // ENGINE.LIVY
+    // ============================================================================
+
+    public boolean enableLivy() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy.enable", FALSE));
+    }
+
+    public String getLivyUrl() {
+        return getOptional("kylin.engine.livy-conf.livy.url");
+    }
 
+    public Map<String, String> getLivyKey() {
+        return getPropertiesByPrefix("kylin.engine.livy-conf.livy-key.");
+    }
+
+    public Map<String, String> getLivyArr() {
+        return getPropertiesByPrefix("kylin.engine.livy-conf.livy-arr.");
+    }
+
+    public Map<String, String> getLivyMap() {
+        return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
+    }
+    
     // ============================================================================
     // QUERY
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java
new file mode 100644
index 0000000..218b4e4
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ *
+ */
+public class LivyRestBuilder {
+    protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(LivyRestBuilder.class);
+
+    final private KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+    final private Map<String, String> hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration();
+
+    private String url;
+    private LivyTypeEnum livyTypeEnum;
+
+    private Map<String, String> keys;
+    private Map<String, String> arrs;
+    private Map<String, String> maps;
+
+    private ArrayList<String> args = new ArrayList<>();
+
+    public LivyRestBuilder() {
+        url = kylinConfig.getLivyUrl();
+
+        keys = kylinConfig.getLivyKey();
+        arrs = kylinConfig.getLivyArr();
+        maps = kylinConfig.getLivyMap();
+    }
+
+    public String build() throws JSONException {
+        try {
+
+            JSONObject postJson = new JSONObject();
+
+            if (LivyTypeEnum.sql.equals(livyTypeEnum)) {
+                postJson.put("className", "org.apache.kylin.engine.spark.SparkSqlOnLivyBatch");
+                postJson.put("args", args);
+            } else if (LivyTypeEnum.job.equals(livyTypeEnum)) {
+                postJson.put("className", "org.apache.kylin.common.util.SparkEntry");
+                postJson.put("args", args);
+            } else {
+                throw new IllegalArgumentException("Unsupported Livy type.");
+            }
+
+            //deal conf of key
+            keys.forEach((key, value) -> {
+                try {
+                    postJson.put(key, value);
+                } catch (JSONException e) {
+                    logger.error("Failed to set Livy payload key " + key, e);
+                }
+            });
+
+            //deal conf of arr
+            arrs.forEach((key, value) -> {
+                try {
+                    postJson.put(key, Lists.newArrayList(value.split(",")));
+                } catch (JSONException e) {
+                    logger.error("Failed to set Livy payload array " + key, e);
+                }
+            });
+
+            //deal conf of map
+            JSONObject confJson = new JSONObject();
+            maps.forEach((key, value) -> {
+                try {
+                    confJson.put(key, value);
+                } catch (JSONException e) {
+                    logger.error("Failed to set Livy conf entry " + key, e);
+                }
+            });
+            postJson.put("conf", confJson);
+
+            return postJson.toString();
+        } catch (JSONException e) {
+            logger.error("Failed to create Livy JSON payload", e);
+            throw new JSONException("create livy json error: " + e.getMessage());
+        }
+    }
+
+    public void overwriteHiveProps(Map<String, String> overwrites) {
+        this.hiveConfProps.putAll(overwrites);
+    }
+
+    public String parseProps() {
+        StringBuilder s = new StringBuilder();
+        for (Map.Entry<String, String> prop : hiveConfProps.entrySet()) {
+            s.append("set ");
+            s.append(prop.getKey());
+            s.append("=");
+            s.append(prop.getValue());
+            s.append("; \n");
+        }
+        return s.toString();
+    }
+
+    public void addArgs(String arg) {
+        this.args.add(arg);
+    }
+
+    public void addConf(String key, String value) {
+        this.maps.put(key, value);
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public ArrayList<String> getArgs() {
+        return args;
+    }
+
+    public void setArgs(ArrayList<String> args) {
+        this.args = args;
+    }
+
+    public LivyTypeEnum getLivyTypeEnum() {
+        return this.livyTypeEnum;
+    }
+
+    public void setLivyTypeEnum(LivyTypeEnum livyTypeEnum) {
+        this.livyTypeEnum = livyTypeEnum;
+    }
+}
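
A minimal usage sketch of the builder above (the job class and values are
hypothetical; build() throws JSONException, whose handling is omitted here):

    LivyRestBuilder builder = new LivyRestBuilder();   // reads kylin.engine.livy-conf.* on construction
    builder.setLivyTypeEnum(LivyTypeEnum.job);         // "job" sets className to SparkEntry
    builder.addArgs("-className");                     // positional args forwarded to the Spark driver
    builder.addArgs("org.apache.kylin.engine.spark.SparkCubingByLayer");
    builder.addConf("spark.executor.memory", "4G");    // merged into the payload's "conf" object
    String payload = builder.build();                  // JSON string POSTed to Livy's /batches endpoint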
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
new file mode 100644
index 0000000..978b99d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ *
+ */
+public class LivyRestClient {
+
+    private int httpConnectionTimeoutMs = 30000;
+    private int httpSocketTimeoutMs = 120000;
+
+    protected String baseUrl;
+    protected DefaultHttpClient client;
+
+    final private KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+    public LivyRestClient() {
+        init();
+    }
+
+
+    private void init() {
+        final HttpParams httpParams = new BasicHttpParams();
+        HttpConnectionParams.setSoTimeout(httpParams, httpSocketTimeoutMs);
+        HttpConnectionParams.setConnectionTimeout(httpParams, httpConnectionTimeoutMs);
+
+        PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
+        cm.setDefaultMaxPerRoute(config.getRestClientDefaultMaxPerRoute());
+        cm.setMaxTotal(config.getRestClientMaxTotal());
+
+        baseUrl = config.getLivyUrl();
+        client = new DefaultHttpClient(cm, httpParams);
+    }
+
+    public String livySubmitJobBatches(String jobJson) throws IOException {
+        String url = baseUrl + "/batches";
+        HttpPost post = newPost(url);
+
+        post.setEntity(new StringEntity(jobJson, "UTF-8"));
+        HttpResponse response = client.execute(post);
+        String content = getContent(response);
+        if (response.getStatusLine().getStatusCode() != 201) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+        return content;
+    }
+
+
+    public String livyGetJobStatusBatches(String jobId) throws IOException {
+        String url = baseUrl + "/batches/" + jobId;
+        HttpGet get = new HttpGet(url);
+
+        HttpResponse response = client.execute(get);
+        String content = getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+        return content;
+    }
+
+    public String livyDeleteBatches(String jobId) throws IOException {
+        String url = baseUrl + "/batches/" + jobId;
+        HttpDelete delete = new HttpDelete(url);
+
+        HttpResponse response = client.execute(delete);
+        String content = getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+        return content;
+    }
+
+    private HttpPost newPost(String url) {
+        HttpPost post = new HttpPost(url);
+        addHttpHeaders(post);
+        return post;
+    }
+
+    private void addHttpHeaders(HttpRequestBase method) {
+        method.addHeader("Accept", "application/json, text/plain, */*");
+        method.addHeader("Content-Type", "application/json");
+    }
+
+    private String getContent(HttpResponse response) throws IOException {
+        InputStreamReader reader = null;
+        BufferedReader rd = null;
+        StringBuilder result = new StringBuilder();
+        try {
+            reader = new InputStreamReader(response.getEntity().getContent(), "UTF-8");
+            rd = new BufferedReader(reader);
+            String line;
+            while ((line = rd.readLine()) != null) {
+                result.append(line);
+            }
+        } finally {
+            IOUtils.closeQuietly(reader);
+            IOUtils.closeQuietly(rd);
+        }
+        return result.toString();
+    }
+}
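
A sketch of the raw round-trip this client exposes, assuming a payload built as
above (jettison's JSONObject is used for parsing, as elsewhere in this commit):

    LivyRestClient client = new LivyRestClient();                // base URL from kylin.engine.livy-conf.livy.url
    String submitted = client.livySubmitJobBatches(payload);     // POST /batches, expects HTTP 201
    String batchId = new JSONObject(submitted).getString("id");
    String state = new JSONObject(client.livyGetJobStatusBatches(batchId)).getString("state");
    if (LivyStateEnum.dead.name().equalsIgnoreCase(state)) {
        client.livyDeleteBatches(batchId);                       // DELETE /batches/{batchId}
    }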
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java
new file mode 100644
index 0000000..03d66aa
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+import org.apache.kylin.common.util.Logger;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class LivyRestExecutor {
+
+    protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(LivyRestExecutor.class);
+
+    public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {
+        try {
+            long startTime = System.currentTimeMillis();
+
+            String dataJson = livyRestBuilder.build();
+
+            logAppender.log("Livy submit Json: ");
+            logAppender.log(dataJson + "\n");
+
+            LivyRestClient restClient = new LivyRestClient();
+            String result = restClient.livySubmitJobBatches(dataJson);
+
+            JSONObject resultJson = new JSONObject(result);
+            String state = resultJson.getString("state");
+            logAppender.log("Livy submit Result: " + state);
+            logger.info("Livy submit Result: {}", state);
+
+            livyLog(resultJson, logAppender);
+
+            final String livyTaskId = resultJson.getString("id");
+            while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)
+                    && !LivyStateEnum.error.toString().equalsIgnoreCase(state)
+                    && !LivyStateEnum.dead.toString().equalsIgnoreCase(state)
+                    && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
+
+                String statusResult = restClient.livyGetJobStatusBatches(livyTaskId);
+                JSONObject stateJson = new JSONObject(statusResult);
+                if (!state.equalsIgnoreCase(stateJson.getString("state"))) {
+                    logAppender.log("Livy status Result: " + stateJson.getString("state"));
+                    livyLog(stateJson, logAppender);
+                }
+                state = stateJson.getString("state");
+                Thread.sleep(10 * 1000);
+            }
+            if (!LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
+                logAppender.log("Livy job execution failed; final state is " + state);
+                logger.info("Livy job execution failed; final state is {}", state);
+                throw new RuntimeException("Livy job execution failed; final state is " + state);
+            }
+            logAppender.log("costTime : " + (System.currentTimeMillis() - startTime) / 1000 + " s");
+        } catch (Exception e) {
+            logger.error("Livy execution failed", e);
+            throw new RuntimeException("Livy execution failed: " + e.getMessage(), e);
+        }
+    }
+
+    public String state(String batchId) {
+        try {
+            LivyRestClient restClient = new LivyRestClient();
+            String statusResult = restClient.livyGetJobStatusBatches(batchId);
+            JSONObject stateJson = new JSONObject(statusResult);
+            return stateJson.getString("state");
+        } catch (Exception e) {
+            logger.error("Failed to get Livy batch state", e);
+            return "";
+        }
+    }
+
+    public Boolean kill(String batchId) {
+        try {
+            LivyRestClient restClient = new LivyRestClient();
+            String statusResult = restClient.livyDeleteBatches(batchId);
+            JSONObject stateJson = new JSONObject(statusResult);
+            return "deleted".equalsIgnoreCase(stateJson.getString("msg"));
+        } catch (Exception e) {
+            logger.error("Failed to delete Livy batch " + batchId, e);
+            return false;
+        }
+    }
+
+    private void livyLog(JSONObject logInfo, Logger logger) {
+        if (logInfo.has("log")) {
+            try {
+                JSONArray logArray = logInfo.getJSONArray("log");
+
+                for (int i = 0; i < logArray.length(); i++) {
+                    String info = logArray.getString(i);
+                    logger.log(info);
+                }
+
+            } catch (JSONException e) {
+                logger.log("Failed to parse Livy log array: " + e.getMessage());
+            }
+            logInfo.remove("log");
+            logger.log(logInfo.toString());
+        }
+    }
+
+}
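
execute() above blocks, polling every 10 seconds until the batch reaches a
terminal state; the resume and cancel paths in SparkExecutableLivy (further down)
use the non-blocking half of the API, roughly:

    LivyRestExecutor executor = new LivyRestExecutor();
    String state = executor.state(batchId);    // returns "" if the REST call fails
    if (!LivyStateEnum.success.name().equalsIgnoreCase(state)) {
        executor.kill(batchId);                // true when Livy answers {"msg":"deleted"}
    }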
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java
new file mode 100644
index 0000000..30c7188
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+/**
+ */
+public enum LivyStateEnum {
+    starting, running, success, dead, error, not_started, idle, busy, shutting_down;
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java
new file mode 100644
index 0000000..e4a44b1
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+/**
+ */
+public enum LivyTypeEnum {
+    sql, job;
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 3f3c14d..426a73f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -88,7 +88,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
     }
 
     public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
-        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
         final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
 
@@ -115,7 +115,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
     }
 
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
-        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
         result.addTask(sparkExecutable);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index eb67fef..21599ff 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -77,7 +77,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
     }
 
     public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
-        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
         sparkExecutable.setClassName(SparkMergingDictionary.class.getName());
 
         sparkExecutable.setParam(SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
@@ -108,7 +108,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
         String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
         String outputPath = getCuboidRootPath(jobID);
 
-        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
         sparkExecutable.setClassName(SparkCubingMerge.class.getName());
         sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 8c4b99d..8535212 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -349,7 +349,7 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
-    private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
+    protected void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
         try {
             if (mergingSeg == null || mergingSeg.size() == 0) {
                 attachSegmentMetadataWithDict(segment);
@@ -365,7 +365,7 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     // Spark Cubing can only work in layer algorithm
-    private void setAlgorithmLayer() {
+    protected void setAlgorithmLayer() {
         ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
         cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
@@ -443,7 +443,7 @@ public class SparkExecutable extends AbstractExecutable {
                 this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
     }
 
-    private void readCounters(final Map<String, String> info) {
+    protected void readCounters(final Map<String, String> info) {
         String counter_save_as = getCounterSaveAs();
         if (counter_save_as != null) {
             String[] saveAsNames = counter_save_as.split(",");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java
new file mode 100644
index 0000000..bc59c12
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class SparkExecutableFactory {
+
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutableFactory.class);
+
+    public static SparkExecutable instance(boolean livy) {
+        return livy ? new SparkExecutableLivy() : new SparkExecutable();
+    }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
new file mode 100644
index 0000000..d512104
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyStateEnum;
+import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.model.Segments;
+import org.apache.parquet.Strings;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class SparkExecutableLivy extends SparkExecutable {
+
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutableLivy.class);
+
+    private static final String CLASS_NAME = "className";
+    private static final String JARS = "jars";
+    private static final String JOB_ID = "jobId";
+    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
+    private static final String CONFIG_NAME = "configName";
+
+    public void formatArgs(List<String> args) {
+        // -className must come first
+        for (Map.Entry<String, String> entry : getParams().entrySet()) {
+            if (entry.getKey().equals(CLASS_NAME)) {
+                args.add("-" + entry.getKey());
+                args.add(entry.getValue());
+                break;
+            }
+        }
+        for (Map.Entry<String, String> entry : getParams().entrySet()) {
+            if (entry.getKey().equals(CLASS_NAME) || entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)
+                    || entry.getKey().equals(COUNTER_SAVE_AS) || entry.getKey().equals(CONFIG_NAME)) {
+                continue;
+            } else {
+                args.add("-" + entry.getKey());
+                args.add(entry.getValue());
+            }
+        }
+    }
+
+    @Override
+    protected void onExecuteStart(ExecutableContext executableContext) {
+        final Output output = getOutput();
+        if (output.getExtra().containsKey(START_TIME)) {
+            final String sparkJobID = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID);
+            if (sparkJobID == null) {
+                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                return;
+            }
+            try {
+                String status = getAppState(sparkJobID);
+                if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+                        || LivyStateEnum.error.name().equalsIgnoreCase(status)
+                        || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) {
+                    //remove previous mr job info
+                    super.onExecuteStart(executableContext);
+                } else {
+                    getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                }
+            } catch (IOException e) {
+                logger.warn("error get hadoop status");
+                super.onExecuteStart(executableContext);
+            }
+        } else {
+            super.onExecuteStart(executableContext);
+        }
+    }
+
+    private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException {
+        Map<String, String> info = new HashMap<>();
+        try {
+            logger.info("livy spark_job_id:" + appId + " resumed");
+            info.put(ExecutableConstants.SPARK_JOB_ID, appId);
+
+            while (!isPaused() && !isDiscarded()) {
+                String status = getAppState(appId);
+
+                if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+                        || LivyStateEnum.error.name().equalsIgnoreCase(status)
+                        || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) {
+                    mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed");
+                    return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed");
+                }
+
+                if (LivyStateEnum.success.name().equalsIgnoreCase(status)) {
+                    mgr.addJobInfo(getId(), info);
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished");
+                }
+
+                Thread.sleep(5000);
+            }
+
+            killAppRetry(appId);
+
+            if (isDiscarded()) {
+                return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded");
+            } else {
+                return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped");
+            }
+
+        } catch (Exception e) {
+            logger.error("error run spark job:", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+
+    }
+
+    @SuppressWarnings("checkstyle:methodlength")
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        ExecutableManager mgr = getManager();
+        Map<String, String> extra = mgr.getOutput(getId()).getExtra();
+        String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
+        if (!StringUtils.isEmpty(sparkJobId)) {
+            return onResumed(sparkJobId, mgr);
+        } else {
+            String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+            CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+            final KylinConfig config = cube.getConfig();
+
+            setAlgorithmLayer();
+
+            LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+
+            String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+            dumpMetadata(segment, mergingSeg);
+
+            Map<String, String> sparkConfs = config.getSparkConfigOverride();
+            String sparkConfigName = getSparkConfigName();
+            if (sparkConfigName != null) {
+                Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName);
+                sparkConfs.putAll(sparkSpecificConfs);
+            }
+
+            for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+                if (entry.getKey().equals("spark.submit.deployMode") || entry.getKey().equals("spark.master") || entry.getKey().equals("spark.yarn.archive")) {
+                    continue;
+                } else {
+                    livyRestBuilder.addConf(entry.getKey(), entry.getValue());
+                }
+            }
+            formatArgs(livyRestBuilder.getArgs());
+
+            final LivyRestExecutor executor = new LivyRestExecutor();
+            final PatternedLogger patternedLogger = new PatternedLogger(logger, (infoKey, info) -> {
+                // only care about three properties here
+                if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
+                        || ExecutableConstants.YARN_APP_ID.equals(infoKey)
+                        || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+                    getManager().addJobInfo(getId(), info);
+                }
+            });
+
+            try {
+                livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);
+                executor.execute(livyRestBuilder, patternedLogger);
+                if (isDiscarded()) {
+                    return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
+                }
+                if (isPaused()) {
+                    return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
+                }
+                // done, update all properties
+                Map<String, String> joblogInfo = patternedLogger.getInfo();
+                // read counter from hdfs
+                String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                if (counterOutput != null) {
+                    if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
+                        Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+                        joblogInfo.putAll(counterMap);
+                    } else {
+                        logger.warn("Spark counter output path not exists: " + counterOutput);
+                    }
+                }
+                readCounters(joblogInfo);
+                getManager().addJobInfo(getId(), joblogInfo);
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
+
+            } catch (Exception e) {
+                logger.error("error run spark job:", e);
+                // clear SPARK_JOB_ID on job failure.
+                extra = mgr.getOutput(getId()).getExtra();
+                extra.put(ExecutableConstants.SPARK_JOB_ID, "");
+                getManager().addJobInfo(getId(), extra);
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+            }
+        }
+    }
+
+    private String getAppState(String appId) throws IOException {
+        LivyRestExecutor executor = new LivyRestExecutor();
+        return executor.state(appId);
+    }
+
+    private void killApp(String appId) throws IOException, InterruptedException {
+        LivyRestExecutor executor = new LivyRestExecutor();
+        executor.kill(appId);
+    }
+
+    private int killAppRetry(String appId) throws IOException, InterruptedException {
+        String status = getAppState(appId);
+        if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+                || LivyStateEnum.error.name().equalsIgnoreCase(status)
+                || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) {
+            logger.warn(appId + "is final state, no need to kill");
+            return 0;
+        }
+
+        killApp(appId);
+
+        status = getAppState(appId);
+        int retry = 0;
+        while (!(Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+                || LivyStateEnum.error.name().equalsIgnoreCase(status)
+                || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) && retry < 5) {
+            killApp(appId);
+
+            Thread.sleep(1000);
+
+            status = getAppState(appId);
+            retry++;
+        }
+
+        if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+                || LivyStateEnum.error.name().equalsIgnoreCase(status)
+                || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) {
+            logger.info(appId + " killed successfully");
+            return 0;
+        } else {
+            logger.info("Failed to kill " + appId);
+            return 1;
+        }
+    }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala
new file mode 100644
index 0000000..8ea2cfa
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.util.Locale
+
+import org.apache.commons.lang.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+object SparkSqlOnLivyBatch extends Logging {
+
+  def main(args: Array[String]) {
+
+    if (args.length != 1) {
+      log.info("Usage: SparkSqlOnLivyBatch <sqlstring>")
+      System.exit(1)
+    }
+
+    val sql : String = args(0)
+    log.info(String.format(Locale.ROOT, "Sql-Info : %s", sql))
+
+    val spark = SparkSession.builder().enableHiveSupport().appName("kylin-sql-livy").getOrCreate()
+
+    val sqlStrings = sql.split(";")
+
+    for (sqlString <- sqlStrings) {
+      var item = sqlString.trim()
+      if (item.length > 0) {
+        if (StringUtils.endsWith(item, "\\")) {
+          item = StringUtils.chop(item)
+        }
+        spark.sql(item)
+      }
+    }
+  }
+}
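
For the SQL path, the single batch argument is the string assembled by
LivyRestBuilder.parseProps() plus the HQL; this object splits it on ";" and runs
each statement through spark.sql. A shortened example of args(0), with a
hypothetical table name:

    set hive.exec.compress.output=true;
    USE default;
    DROP TABLE IF EXISTS kylin_intermediate_example;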
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
new file mode 100644
index 0000000..3549630
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ *
+ */
+public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableByLivyStep.class);
+    protected final PatternedLogger stepLogger = new PatternedLogger(logger);
+    private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
+
+
+    protected void createFlatHiveTable(KylinConfig config) throws Exception {
+        final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+        livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+        livyRestBuilder.addArgs(livyRestBuilder.parseProps() + getInitStatement() + getCreateTableStatement());
+
+        stepLogger.log("Create and distribute table. ");
+        livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
+
+        LivyRestExecutor executor = new LivyRestExecutor();
+        executor.execute(livyRestBuilder, stepLogger);
+
+        Map<String, String> info = stepLogger.getInfo();
+        //get the flat Hive table size
+        Matcher matcher = HDFS_LOCATION.matcher(getCreateTableStatement());
+        if (matcher.find()) {
+            String hiveFlatTableHdfsUrl = matcher.group(1);
+            long size = getFileSize(hiveFlatTableHdfsUrl);
+            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
+            logger.info("HDFS_Bytes_Writen: " + size);
+        }
+        getManager().addJobInfo(getId(), info);
+    }
+
+    private long getFileSize(String hdfsUrl) throws IOException {
+        Configuration configuration = new Configuration();
+        Path path = new Path(hdfsUrl);
+        FileSystem fs = path.getFileSystem(configuration);
+        ContentSummary contentSummary = fs.getContentSummary(path);
+        long length = contentSummary.getLength();
+        return length;
+    }
+
+
+    private KylinConfig getCubeSpecificConfig() {
+        String cubeName = CubingExecutableUtil.getCubeName(getParams());
+        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = manager.getCube(cubeName);
+        return cube.getConfig();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig config = getCubeSpecificConfig();
+        try {
+            createFlatHiveTable(config);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+        } catch (Exception e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
+        }
+    }
+
+    public void setInitStatement(String sql) {
+        setParam("HiveInit", sql);
+    }
+
+    public String getInitStatement() {
+        return getParam("HiveInit");
+    }
+
+    public void setCreateTableStatement(String sql) {
+        setParam("HiveRedistributeData", sql);
+    }
+
+    public String getCreateTableStatement() {
+        return getParam("HiveRedistributeData");
+    }
+}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 2f25e50..4782920 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -44,6 +44,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -84,8 +85,13 @@ public class HiveInputBase {
 
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
+                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                if (kylinConfig.enableLivy() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
+                    jobFlow.addTask(createRedistributeFlatHiveTableByLivyStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+                } else {
+                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+                }
             }
 
             // special for hive
@@ -97,7 +103,15 @@ public class HiveInputBase {
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
             final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+            if (kylinConfig.enableLivy() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
+                jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+            } else {
+                jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+            }
         }
 
         protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
@@ -152,6 +166,21 @@ public class HiveInputBase {
         return step;
     }
 
+    protected static AbstractExecutable createFlatHiveTableByLivyStep(String hiveInitStatements, String jobWorkingDir,
+                                                                      String cubeName, IJoinedFlatTableDesc flatDesc) {
+        //from hive to hive
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
+        String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+
+        CreateFlatHiveTableByLivyStep step = new CreateFlatHiveTableByLivyStep();
+        step.setInitStatement(hiveInitStatements);
+        step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        return step;
+    }
+
     protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
             IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -163,6 +192,17 @@ public class HiveInputBase {
         return step;
     }
 
+    protected static AbstractExecutable createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements, String cubeName,
+                                                                                  IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+        RedistributeFlatHiveTableByLivyStep step = new RedistributeFlatHiveTableByLivyStep();
+        step.setInitStatement(hiveInitStatements);
+        step.setIntermediateTable(flatDesc.getTableName());
+        step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc));
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+        return step;
+    }
+
     protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
             String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) {
         ShellExecutable step = new ShellExecutable();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
new file mode 100644
index 0000000..4c07324
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+
+public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+    private long computeRowCount(String database, String table) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        return hiveClient.getHiveTableRows(database, table);
+    }
+
+    private long getDataSize(String database, String table) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        long size = hiveClient.getHiveTableMeta(database, table).fileSize;
+        return size;
+    }
+
+    private void redistributeTable(KylinConfig config, int numReducers) throws Exception {
+        final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+        livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+        StringBuilder statement = new StringBuilder();
+        statement.append(livyRestBuilder.parseProps());
+        statement.append(getInitStatement());
+        statement.append("set mapreduce.job.reduces=" + numReducers + ";\n");
+        statement.append("set hive.merge.mapredfiles=false;\n");
+        statement.append(getRedistributeDataStatement());
+        final String statementStr = statement.toString();
+        livyRestBuilder.addArgs(statementStr);
+
+        stepLogger.log("Redistribute table, statement: ");
+        stepLogger.log(statementStr);
+
+        livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
+
+        LivyRestExecutor executor = new LivyRestExecutor();
+        executor.execute(livyRestBuilder, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
+    }
+
+    private KylinConfig getCubeSpecificConfig() {
+        String cubeName = CubingExecutableUtil.getCubeName(getParams());
+        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = manager.getCube(cubeName);
+        return cube.getConfig();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig config = getCubeSpecificConfig();
+        String intermediateTable = getIntermediateTable();
+        String database, tableName;
+        if (intermediateTable.indexOf(".") > 0) {
+            database = intermediateTable.substring(0, intermediateTable.indexOf("."));
+            tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1);
+        } else {
+            database = config.getHiveDatabaseForIntermediateTable();
+            tableName = intermediateTable;
+        }
+
+        try {
+            long rowCount = computeRowCount(database, tableName);
+            logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
+            if (rowCount == 0) {
+                if (!config.isEmptySegmentAllowed()) {
+                    stepLogger.log("Detect upstream hive table is empty, "
+                            + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
+                    return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                } else {
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
+                }
+            }
+
+            int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+            numReducers = Math.max(1, numReducers);
+            numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
+
+            stepLogger.log("total input rows = " + rowCount);
+            stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+            redistributeTable(config, numReducers);
+            long dataSize = getDataSize(database, tableName);
+            getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+        } catch (Exception e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
+        }
+    }
+
+    public void setInitStatement(String sql) {
+        setParam("HiveInit", sql);
+    }
+
+    public String getInitStatement() {
+        return getParam("HiveInit");
+    }
+
+    public void setRedistributeDataStatement(String sql) {
+        setParam("HiveRedistributeData", sql);
+    }
+
+    public String getRedistributeDataStatement() {
+        return getParam("HiveRedistributeData");
+    }
+
+    public String getIntermediateTable() {
+        return getParam("intermediateTable");
+    }
+
+    public void setIntermediateTable(String intermediateTable) {
+        setParam("intermediateTable", intermediateTable);
+    }
+}
\ No newline at end of file
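
A worked example of the reducer sizing above: with rowCount = 25,000,000 and
mapperInputRows = 1,000,000, numReducers = round(25,000,000 / 1,000,000) = 25,
which is then clamped to at least 1 and at most the configured reducer maximum.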
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 86ad0fb..1f35de4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -25,6 +25,7 @@ import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
 import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.engine.spark.SparkExecutableFactory;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 
@@ -39,7 +40,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
 
         SparkBatchCubingJobBuilder2 jobBuilder2 = new SparkBatchCubingJobBuilder2(seg, null);
-        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
         sparkExecutable.setClassName(SparkCubeHFile.class.getName());
         sparkExecutable.setParam(SparkCubeHFile.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());