You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/04 10:56:28 UTC
[kylin] branch master updated: KYLIN-3795 Submit Spark jobs via
Apache Livy
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 662a7f9 KYLIN-3795 Submit Spark jobs via Apache Livy
662a7f9 is described below
commit 662a7f95104b3e8c31026cef65192127a2ab2e15
Author: javalife0312 <ja...@126.com>
AuthorDate: Mon Mar 4 10:58:29 2019 +0800
KYLIN-3795 Submit Spark jobs via Apache Livy
---
.../org/apache/kylin/common/KylinConfigBase.java | 24 ++
.../apache/kylin/common/livy/LivyRestBuilder.java | 155 ++++++++++++
.../apache/kylin/common/livy/LivyRestClient.java | 136 ++++++++++
.../apache/kylin/common/livy/LivyRestExecutor.java | 122 +++++++++
.../apache/kylin/common/livy/LivyStateEnum.java | 25 ++
.../org/apache/kylin/common/livy/LivyTypeEnum.java | 25 ++
.../engine/spark/SparkBatchCubingJobBuilder2.java | 4 +-
.../engine/spark/SparkBatchMergeJobBuilder2.java | 4 +-
.../apache/kylin/engine/spark/SparkExecutable.java | 6 +-
.../kylin/engine/spark/SparkExecutableFactory.java | 32 +++
.../kylin/engine/spark/SparkExecutableLivy.java | 276 +++++++++++++++++++++
.../kylin/engine/spark/SparkSqlOnLivyBatch.scala | 52 ++++
.../source/hive/CreateFlatHiveTableByLivyStep.java | 124 +++++++++
.../apache/kylin/source/hive/HiveInputBase.java | 46 +++-
.../hive/RedistributeFlatHiveTableByLivyStep.java | 149 +++++++++++
.../kylin/storage/hbase/steps/HBaseSparkSteps.java | 3 +-
16 files changed, 1172 insertions(+), 11 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6457d0a..81979dc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1372,7 +1372,31 @@ public abstract class KylinConfigBase implements Serializable {
public boolean isSparkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
}
+
+ // ============================================================================
+ // ENGINE.LIVY
+ // ============================================================================
+
+    /**
+     * Whether Spark steps are created as {@code SparkExecutableLivy} (submitted through the Livy
+     * REST API) instead of the default {@code SparkExecutable}.
+     *
+     * @return true when {@code kylin.engine.livy-conf.livy.enable} is "true" (case-insensitive)
+     */
+    public boolean enableLivy() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy.enable", "false"));
+    }
+
+    /**
+     * Base URL of the Livy server, read from {@code kylin.engine.livy-conf.livy.url}.
+     * The REST client appends paths such as {@code /batches} to it.
+     * NOTE(review): no default is supplied here, so the result is presumably null when unset.
+     */
+    public String getLivyUrl() {
+        final String livyUrlKey = "kylin.engine.livy-conf.livy.url";
+        return getOptional(livyUrlKey);
+    }
+
+    /**
+     * Properties under {@code kylin.engine.livy-conf.livy-key.}.
+     * LivyRestBuilder copies each entry as a top-level string field of the batch request JSON.
+     */
+    public Map<String, String> getLivyKey() {
+        final String prefix = "kylin.engine.livy-conf.livy-key.";
+        return getPropertiesByPrefix(prefix);
+    }
+
+    /**
+     * Properties under {@code kylin.engine.livy-conf.livy-arr.}.
+     * LivyRestBuilder splits each value on ',' and sends it as a top-level JSON array.
+     */
+    public Map<String, String> getLivyArr() {
+        final String prefix = "kylin.engine.livy-conf.livy-arr.";
+        return getPropertiesByPrefix(prefix);
+    }
+
+    /**
+     * Properties under {@code kylin.engine.livy-conf.livy-map.}.
+     * LivyRestBuilder puts these entries into the nested {@code conf} object of the batch request.
+     */
+    public Map<String, String> getLivyMap() {
+        final String prefix = "kylin.engine.livy-conf.livy-map.";
+        return getPropertiesByPrefix(prefix);
+    }
+
// ============================================================================
// QUERY
// ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java
new file mode 100644
index 0000000..218b4e4
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ *
+ */
+public class LivyRestBuilder {
+ protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(LivyRestBuilder.class);
+
+ final private KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final private Map<String, String> hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration();
+
+ private String url;
+ private LivyTypeEnum livyTypeEnum;
+
+ private Map<String, String> keys;
+ private Map<String, String> arrs;
+ private Map<String, String> maps;
+
+ private ArrayList<String> args = new ArrayList<>();
+
+ public LivyRestBuilder() {
+ url = kylinConfig.getLivyUrl();
+
+ keys = kylinConfig.getLivyKey();
+ arrs = kylinConfig.getLivyArr();
+ maps = kylinConfig.getLivyMap();
+ }
+
+ public String build() throws JSONException {
+ try {
+
+ JSONObject postJson = new JSONObject();
+
+ if (LivyTypeEnum.sql.equals(livyTypeEnum)) {
+ postJson.put("className", "org.apache.kylin.engine.spark.SparkSqlOnLivyBatch");
+ postJson.put("args", args);
+ } else if (LivyTypeEnum.job.equals(livyTypeEnum)) {
+ postJson.put("className", "org.apache.kylin.common.util.SparkEntry");
+ postJson.put("args", args);
+ } else {
+ throw new IllegalArgumentException("unSupport livy type.");
+ }
+
+ //deal conf of key
+ keys.forEach((key, value) -> {
+ try {
+ postJson.put(key, value);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ });
+
+ //deal conf of arr
+ arrs.forEach((key, value) -> {
+ try {
+ postJson.put(key, Lists.newArrayList(value.split(",")));
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ });
+
+ //deal conf of map
+ JSONObject confJson = new JSONObject();
+ maps.forEach((key, value) -> {
+ try {
+ confJson.put(key, value);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ });
+ postJson.put("conf", confJson);
+
+ return postJson.toString();
+ } catch (JSONException e) {
+ e.printStackTrace();
+ throw new JSONException("create livy json error :" + e.getMessage());
+ }
+ }
+
+ public void overwriteHiveProps(Map<String, String> overwrites) {
+ this.hiveConfProps.putAll(overwrites);
+ }
+
+ public String parseProps() {
+ StringBuilder s = new StringBuilder();
+ for (Map.Entry<String, String> prop : hiveConfProps.entrySet()) {
+ s.append("set ");
+ s.append(prop.getKey());
+ s.append("=");
+ s.append(prop.getValue());
+ s.append("; \n");
+ }
+ return s.toString();
+ }
+
+ public void addArgs(String arg) {
+ this.args.add(arg);
+ }
+
+ public void addConf(String key, String value) {
+ this.maps.put(key, value);
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public ArrayList<String> getArgs() {
+ return args;
+ }
+
+ public void setArgs(ArrayList<String> args) {
+ this.args = args;
+ }
+
+ public LivyTypeEnum getLivyTypeEnum() {
+ return this.livyTypeEnum;
+ }
+
+ public void setLivyTypeEnum(LivyTypeEnum livyTypeEnum) {
+ this.livyTypeEnum = livyTypeEnum;
+ }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
new file mode 100644
index 0000000..978b99d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ *
+ */
+public class LivyRestClient {
+
+ private int httpConnectionTimeoutMs = 30000;
+ private int httpSocketTimeoutMs = 120000;
+
+ protected String baseUrl;
+ protected DefaultHttpClient client;
+
+ final private KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+ public LivyRestClient() {
+ init();
+ }
+
+
+ private void init() {
+ final HttpParams httpParams = new BasicHttpParams();
+ HttpConnectionParams.setSoTimeout(httpParams, httpSocketTimeoutMs);
+ HttpConnectionParams.setConnectionTimeout(httpParams, httpConnectionTimeoutMs);
+
+ PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
+ cm.setDefaultMaxPerRoute(config.getRestClientDefaultMaxPerRoute());
+ cm.setMaxTotal(config.getRestClientMaxTotal());
+
+ baseUrl = config.getLivyUrl();
+ client = new DefaultHttpClient(cm, httpParams);
+ }
+
+ public String livySubmitJobBatches(String jobJson) throws IOException {
+ String url = baseUrl + "/batches";
+ HttpPost post = newPost(url);
+
+ post.setEntity(new StringEntity(jobJson, "UTF-8"));
+ HttpResponse response = client.execute(post);
+ String content = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 201) {
+ throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+ }
+ return content;
+ }
+
+
+ public String livyGetJobStatusBatches(String jobId) throws IOException {
+ String url = baseUrl + "/batches/" + jobId;
+ HttpGet get = new HttpGet(url);
+
+ HttpResponse response = client.execute(get);
+ String content = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+ }
+ return content;
+ }
+
+ public String livyDeleteBatches(String jobId) throws IOException {
+ String url = baseUrl + "/batches/" + jobId;
+ HttpDelete delete = new HttpDelete(url);
+
+ HttpResponse response = client.execute(delete);
+ String content = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+ }
+ return content;
+ }
+
+ private HttpPost newPost(String url) {
+ HttpPost post = new HttpPost(url);
+ addHttpHeaders(post);
+ return post;
+ }
+
+ private void addHttpHeaders(HttpRequestBase method) {
+ method.addHeader("Accept", "application/json, text/plain, */*");
+ method.addHeader("Content-Type", "application/json");
+ }
+
+ private String getContent(HttpResponse response) throws IOException {
+ InputStreamReader reader = null;
+ BufferedReader rd = null;
+ StringBuffer result = new StringBuffer();
+ try {
+ reader = new InputStreamReader(response.getEntity().getContent(), "UTF-8");
+ rd = new BufferedReader(reader);
+ String line;
+ while ((line = rd.readLine()) != null) {
+ result.append(line);
+ }
+ } finally {
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(rd);
+ }
+ return result.toString();
+ }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java
new file mode 100644
index 0000000..03d66aa
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+import org.apache.kylin.common.util.Logger;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class LivyRestExecutor {
+
+ protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(LivyRestExecutor.class);
+
+ public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {
+ try {
+ long startTime = System.currentTimeMillis();
+
+ String dataJson = livyRestBuilder.build();
+
+ logAppender.log("Livy submit Json: ");
+ logAppender.log(dataJson + "\n");
+
+ LivyRestClient restClient = new LivyRestClient();
+ String result = restClient.livySubmitJobBatches(dataJson);
+
+ JSONObject resultJson = new JSONObject(result);
+ String state = resultJson.getString("state");
+ logAppender.log("Livy submit Result: " + state);
+ logger.info("Livy submit Result: {}", state);
+
+ livyLog(resultJson, logAppender);
+
+ final String livyTaskId = resultJson.getString("id");
+ while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)
+ && !LivyStateEnum.error.toString().equalsIgnoreCase(state)
+ && !LivyStateEnum.dead.toString().equalsIgnoreCase(state)
+ && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
+
+ String statusResult = restClient.livyGetJobStatusBatches(livyTaskId);
+ JSONObject stateJson = new JSONObject(statusResult);
+ if (!state.equalsIgnoreCase(stateJson.getString("state"))) {
+ logAppender.log("Livy status Result: " + stateJson.getString("state"));
+ livyLog(stateJson, logAppender);
+ }
+ state = stateJson.getString("state");
+ Thread.sleep(10*1000);
+ }
+ if (!LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
+ logAppender.log("livy start execute failed. state is " + state);
+ logger.info("livy start execute failed. state is {}", state);
+ throw new RuntimeException("livy get status failed. state is " + state);
+ }
+ logAppender.log("costTime : " + (System.currentTimeMillis() - startTime) / 1000 + " s");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("livy execute failed. \n" + e.getMessage());
+ }
+ }
+
+ public String state(String batchId) {
+ try {
+ LivyRestClient restClient = new LivyRestClient();
+ String statusResult = restClient.livyGetJobStatusBatches(batchId);
+ JSONObject stateJson = new JSONObject(statusResult);
+ return stateJson.getString("state");
+ } catch (Exception e) {
+ e.printStackTrace();
+ return "";
+ }
+ }
+
+ public Boolean kill(String batchId) {
+ try {
+ LivyRestClient restClient = new LivyRestClient();
+ String statusResult = restClient.livyDeleteBatches(batchId);
+ JSONObject stateJson = new JSONObject(statusResult);
+ return stateJson.getString("msg").equalsIgnoreCase("deleted")? true: false;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ private void livyLog(JSONObject logInfo, Logger logger) {
+ if (logInfo.has("log")) {
+ try {
+ JSONArray logArray = logInfo.getJSONArray("log");
+
+ for (int i=0;i<logArray.length();i++) {
+ String info = logArray.getString(i);
+ logger.log(info);
+ }
+
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ logInfo.remove("log");
+ logger.log(logInfo.toString());
+ }
+ }
+
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java
new file mode 100644
index 0000000..30c7188
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+/**
+ */
+public enum LivyStateEnum {
+ starting, running, success, dead, error, not_started, idle, busy, shutting_down;
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java
new file mode 100644
index 0000000..e4a44b1
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.livy;
+
+/**
+ */
+public enum LivyTypeEnum {
+ sql, job;
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 3f3c14d..426a73f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -88,7 +88,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
}
public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
- final SparkExecutable sparkExecutable = new SparkExecutable();
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
@@ -115,7 +115,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
}
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
- final SparkExecutable sparkExecutable = new SparkExecutable();
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
result.addTask(sparkExecutable);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index eb67fef..21599ff 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -77,7 +77,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
}
public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
- final SparkExecutable sparkExecutable = new SparkExecutable();
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
sparkExecutable.setClassName(SparkMergingDictionary.class.getName());
sparkExecutable.setParam(SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
@@ -108,7 +108,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
String outputPath = getCuboidRootPath(jobID);
- final SparkExecutable sparkExecutable = new SparkExecutable();
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
sparkExecutable.setClassName(SparkCubingMerge.class.getName());
sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 8c4b99d..8535212 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -349,7 +349,7 @@ public class SparkExecutable extends AbstractExecutable {
}
}
- private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
+ protected void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
try {
if (mergingSeg == null || mergingSeg.size() == 0) {
attachSegmentMetadataWithDict(segment);
@@ -365,7 +365,7 @@ public class SparkExecutable extends AbstractExecutable {
}
// Spark Cubing can only work in layer algorithm
- private void setAlgorithmLayer() {
+ protected void setAlgorithmLayer() {
ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
@@ -443,7 +443,7 @@ public class SparkExecutable extends AbstractExecutable {
this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
}
- private void readCounters(final Map<String, String> info) {
+ protected void readCounters(final Map<String, String> info) {
String counter_save_as = getCounterSaveAs();
if (counter_save_as != null) {
String[] saveAsNames = counter_save_as.split(",");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java
new file mode 100644
index 0000000..bc59c12
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chooses the Spark step implementation for cubing and merge jobs.
+ */
+public class SparkExecutableFactory {
+
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutableFactory.class);
+
+    /**
+     * Creates a new Spark step.
+     *
+     * @param livy true to get a {@link SparkExecutableLivy}; false for a plain {@link SparkExecutable}
+     * @return a fresh, unconfigured step
+     */
+    public static SparkExecutable instance(boolean livy) {
+        if (!livy) {
+            return new SparkExecutable();
+        }
+        return new SparkExecutableLivy();
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
new file mode 100644
index 0000000..d512104
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyStateEnum;
+import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.model.Segments;
+import org.apache.parquet.Strings;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class SparkExecutableLivy extends SparkExecutable {
+
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutableLivy.class);
+
+    // Parameter keys given special treatment by formatArgs: CLASS_NAME is emitted first,
+    // the others are never forwarded as arguments.
+    private static final String CLASS_NAME = "className";
+    private static final String JARS = "jars";
+    private static final String JOB_ID = "jobId";
+    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
+    private static final String CONFIG_NAME = "configName";
+
+    /**
+     * Appends this step's parameters to {@code args} as "-key", "value" pairs.
+     *
+     * <p>The className pair, if present, always comes first. The jars, jobId, CounterSaveAs and
+     * configName parameters are omitted. The remaining pairs follow in the params map's iteration order.
+     *
+     * @param args list to append to (e.g. {@link LivyRestBuilder#getArgs()})
+     */
+    public void formatArgs(List<String> args) {
+        final Map<String, String> params = getParams();
+        // -className must come first
+        if (params.containsKey(CLASS_NAME)) {
+            args.add("-" + CLASS_NAME);
+            args.add(params.get(CLASS_NAME));
+        }
+        for (Map.Entry<String, String> param : params.entrySet()) {
+            if (!isExcludedFromArgs(param.getKey())) {
+                args.add("-" + param.getKey());
+                args.add(param.getValue());
+            }
+        }
+    }
+
+    /** True for keys that the second pass of formatArgs must skip. */
+    private static boolean isExcludedFromArgs(String key) {
+        switch (key) {
+            case CLASS_NAME:
+            case JARS:
+            case JOB_ID:
+            case COUNTER_SAVE_AS:
+            case CONFIG_NAME:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+ @Override
+ protected void onExecuteStart(ExecutableContext executableContext) {
+ final Output output = getOutput();
+ if (output.getExtra().containsKey(START_TIME)) {
+ final String sparkJobID = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID);
+ if (sparkJobID == null) {
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ return;
+ }
+ try {
+ String status = getAppState(sparkJobID);
+ if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+ || LivyStateEnum.error.name().equalsIgnoreCase(status)
+ || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) {
+ //remove previous mr job info
+ super.onExecuteStart(executableContext);
+ } else {
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ }
+ } catch (IOException e) {
+ logger.warn("error get hadoop status");
+ super.onExecuteStart(executableContext);
+ }
+ } else {
+ super.onExecuteStart(executableContext);
+ }
+ }
+
+ private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException {
+ Map<String, String> info = new HashMap<>();
+ try {
+ logger.info("livy spark_job_id:" + appId + " resumed");
+ info.put(ExecutableConstants.SPARK_JOB_ID, appId);
+
+ while (!isPaused() && !isDiscarded()) {
+ String status = getAppState(appId);
+
+ if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+ || LivyStateEnum.error.name().equalsIgnoreCase(status)
+ || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) {
+ mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed");
+ return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed");
+ }
+
+ if (LivyStateEnum.success.name().equalsIgnoreCase(status)) {
+ mgr.addJobInfo(getId(), info);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished");
+ }
+
+ Thread.sleep(5000);
+ }
+
+ killAppRetry(appId);
+
+ if (isDiscarded()) {
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded");
+ } else {
+ return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped");
+ }
+
+ } catch (Exception e) {
+ logger.error("error run spark job:", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+
+ }
+
+ @SuppressWarnings("checkstyle:methodlength")
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ ExecutableManager mgr = getManager();
+ Map<String, String> extra = mgr.getOutput(getId()).getExtra();
+ String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
+ if (!StringUtils.isEmpty(sparkJobId)) {
+ return onResumed(sparkJobId, mgr);
+ } else {
+ String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+ CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+ final KylinConfig config = cube.getConfig();
+
+ setAlgorithmLayer();
+
+ LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+
+ String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+ dumpMetadata(segment, mergingSeg);
+
+ Map<String, String> sparkConfs = config.getSparkConfigOverride();
+ String sparkConfigName = getSparkConfigName();
+ if (sparkConfigName != null) {
+ Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName);
+ sparkConfs.putAll(sparkSpecificConfs);
+ }
+
+ for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+ if (entry.getKey().equals("spark.submit.deployMode") || entry.getKey().equals("spark.master") || entry.getKey().equals("spark.yarn.archive")) {
+ continue;
+ } else {
+ livyRestBuilder.addConf(entry.getKey(), entry.getValue());
+ }
+ }
+ formatArgs(livyRestBuilder.getArgs());
+
+ final LivyRestExecutor executor = new LivyRestExecutor();
+ final PatternedLogger patternedLogger = new PatternedLogger(logger, (infoKey, info) -> {
+ // only care three properties here
+ if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+ getManager().addJobInfo(getId(), info);
+ }
+ });
+
+ try {
+ livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);
+ executor.execute(livyRestBuilder, patternedLogger);
+ if (isDiscarded()) {
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
+ }
+ if (isPaused()) {
+ return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
+ }
+ // done, update all properties
+ Map<String, String> joblogInfo = patternedLogger.getInfo();
+ // read counter from hdfs
+ String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+ if (counterOutput != null) {
+ if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
+ Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+ joblogInfo.putAll(counterMap);
+ } else {
+ logger.warn("Spark counter output path not exists: " + counterOutput);
+ }
+ }
+ readCounters(joblogInfo);
+ getManager().addJobInfo(getId(), joblogInfo);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
+
+ } catch (Exception e) {
+ logger.error("error run spark job:", e);
+ // clear SPARK_JOB_ID on job failure.
+ extra = mgr.getOutput(getId()).getExtra();
+ extra.put(ExecutableConstants.SPARK_JOB_ID, "");
+ getManager().addJobInfo(getId(), extra);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+ }
+ }
+ }
+
+    /** Returns the state string that {@link LivyRestExecutor#state} reports for Livy batch {@code appId}. */
+    private String getAppState(String appId) throws IOException {
+        return new LivyRestExecutor().state(appId);
+    }
+
+    /** Asks Livy, via {@link LivyRestExecutor#kill}, to kill batch {@code appId}; issues a single request. */
+    private void killApp(String appId) throws IOException, InterruptedException {
+        new LivyRestExecutor().kill(appId);
+    }
+
+    /**
+     * Kills Livy batch {@code appId}. After the first kill request, retries up to 5 more times
+     * (1 second apart) while the batch still reports a live state.
+     *
+     * @param appId the Livy batch id
+     * @return 0 if the batch is (or already was) stopped, 1 if it still looked alive after all retries
+     * @throws IOException          if querying or killing the batch fails
+     * @throws InterruptedException if interrupted while waiting between retries
+     */
+    private int killAppRetry(String appId) throws IOException, InterruptedException {
+        String status = getAppState(appId);
+        if (isLivyBatchStopped(status)) {
+            logger.warn(appId + "is final state, no need to kill");
+            return 0;
+        }
+
+        killApp(appId);
+
+        status = getAppState(appId);
+        int retry = 0;
+        // Retry only while the batch is still alive, and never more than 5 times. The retry bound must
+        // apply to the whole condition, not just one state, or a vanished batch would be re-killed forever.
+        while (!isLivyBatchStopped(status) && retry < 5) {
+            killApp(appId);
+
+            Thread.sleep(1000);
+
+            status = getAppState(appId);
+            retry++;
+        }
+
+        if (isLivyBatchStopped(status)) {
+            logger.info(appId + " killed successfully");
+            return 0;
+        } else {
+            logger.info(appId + " killed failed");
+            return 1;
+        }
+    }
+
+    /**
+     * Whether a Livy batch state means the batch is no longer running. An empty/null state is treated
+     * as the batch no longer being known to Livy.
+     */
+    private static boolean isLivyBatchStopped(String status) {
+        return Strings.isNullOrEmpty(status)
+                || LivyStateEnum.dead.name().equalsIgnoreCase(status)
+                || LivyStateEnum.error.name().equalsIgnoreCase(status)
+                || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)
+                || LivyStateEnum.success.name().equalsIgnoreCase(status)
+                || "killed".equalsIgnoreCase(status);
+    }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala
new file mode 100644
index 0000000..8ea2cfa
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.util.Locale
+
+import org.apache.commons.lang.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Livy batch entry point that executes a ';'-separated list of SQL statements with a Hive-enabled
+ * SparkSession.
+ */
+object SparkSqlOnLivyBatch extends Logging {
+
+  /**
+   * @param args exactly one element: the SQL text. It is split naively on every ';', so a ';' inside
+   *             a string literal would break the statement (NOTE(review)). Each piece is trimmed, blank
+   *             pieces are skipped, and a trailing backslash is dropped before execution.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
+      // This path ends the job with a non-zero exit code, so report it at error level.
+      log.error("Usage: SparkSqlOnLivyBatch <sqlstring>")
+      System.exit(1)
+    }
+
+    val sql: String = args(0)
+    log.info(String.format(Locale.ROOT, "Sql-Info : %s", sql))
+
+    val spark = SparkSession.builder().enableHiveSupport().appName("kylin-sql-livy").getOrCreate()
+    try {
+      for (raw <- sql.split(";")) {
+        val trimmed = raw.trim()
+        if (trimmed.nonEmpty) {
+          val statement = if (StringUtils.endsWith(trimmed, "\\")) StringUtils.chop(trimmed) else trimmed
+          spark.sql(statement)
+        }
+      }
+    } finally {
+      // release the session (and its executors) whether or not a statement failed
+      spark.stop()
+    }
+  }
+}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
new file mode 100644
index 0000000..3549630
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ *
+ */
+public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
+ private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableByLivyStep.class);
+ protected final PatternedLogger stepLogger = new PatternedLogger(logger);
+ private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
+
+
+ protected void createFlatHiveTable(KylinConfig config) throws Exception {
+ final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+ livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+ livyRestBuilder.addArgs(livyRestBuilder.parseProps() + getInitStatement() + getCreateTableStatement());
+
+ stepLogger.log("Create and distribute table. ");
+ livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
+
+ LivyRestExecutor executor = new LivyRestExecutor();
+ executor.execute(livyRestBuilder, stepLogger);
+
+ Map<String, String> info = stepLogger.getInfo();
+ //get the flat Hive table size
+ Matcher matcher = HDFS_LOCATION.matcher(getCreateTableStatement());
+ if (matcher.find()) {
+ String hiveFlatTableHdfsUrl = matcher.group(1);
+ long size = getFileSize(hiveFlatTableHdfsUrl);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
+ logger.info("HDFS_Bytes_Writen: " + size);
+ }
+ getManager().addJobInfo(getId(), info);
+ }
+
+ private long getFileSize(String hdfsUrl) throws IOException {
+ Configuration configuration = new Configuration();
+ Path path = new Path(hdfsUrl);
+ FileSystem fs = path.getFileSystem(configuration);
+ ContentSummary contentSummary = fs.getContentSummary(path);
+ long length = contentSummary.getLength();
+ return length;
+ }
+
+
+ private KylinConfig getCubeSpecificConfig() {
+ String cubeName = CubingExecutableUtil.getCubeName(getParams());
+ CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = manager.getCube(cubeName);
+ return cube.getConfig();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig config = getCubeSpecificConfig();
+ try {
+ createFlatHiveTable(config);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+ } catch (Exception e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
+ }
+ }
+
+ public void setInitStatement(String sql) {
+ setParam("HiveInit", sql);
+ }
+
+ public String getInitStatement() {
+ return getParam("HiveInit");
+ }
+
+ public void setCreateTableStatement(String sql) {
+ setParam("HiveRedistributeData", sql);
+ }
+
+ public String getCreateTableStatement() {
+ return getParam("HiveRedistributeData");
+ }
+}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 2f25e50..4782920 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -44,6 +44,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
@@ -84,8 +85,13 @@ public class HiveInputBase {
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
- cubeInstance.getDescriptor()));
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ //jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+ if (kylinConfig.enableLivy() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
+ jobFlow.addTask(createRedistributeFlatHiveTableByLivyStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+ } else {
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+ }
}
// special for hive
@@ -97,7 +103,15 @@ public class HiveInputBase {
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
- jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+ if (kylinConfig.enableLivy() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
+ jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+ } else {
+ jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+ }
+ //jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
}
protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
@@ -152,6 +166,21 @@ public class HiveInputBase {
return step;
}
+    /**
+     * Builds the Livy-backed step that creates the flat Hive table from Hive sources. Its statement is
+     * the generated drop, create and insert HQL, concatenated in that order.
+     */
+    protected static AbstractExecutable createFlatHiveTableByLivyStep(String hiveInitStatements, String jobWorkingDir,
+            String cubeName, IJoinedFlatTableDesc flatDesc) {
+        final String flatTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc)
+                + JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir)
+                + JoinedFlatTable.generateInsertDataStatement(flatDesc);
+
+        final CreateFlatHiveTableByLivyStep livyStep = new CreateFlatHiveTableByLivyStep();
+        livyStep.setInitStatement(hiveInitStatements);
+        livyStep.setCreateTableStatement(flatTableHql);
+        CubingExecutableUtil.setCubeName(cubeName, livyStep.getParams());
+        livyStep.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        return livyStep;
+    }
+
protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -163,6 +192,17 @@ public class HiveInputBase {
return step;
}
+ protected static AbstractExecutable createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements, String cubeName,
+ IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+ RedistributeFlatHiveTableByLivyStep step = new RedistributeFlatHiveTableByLivyStep();
+ step.setInitStatement(hiveInitStatements);
+ step.setIntermediateTable(flatDesc.getTableName());
+ step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc));
+ CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+ step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+ return step;
+ }
+
protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) {
ShellExecutable step = new ShellExecutable();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
new file mode 100644
index 0000000..4c07324
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.livy.LivyRestBuilder;
+import org.apache.kylin.common.livy.LivyRestExecutor;
+import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+
+public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
+ private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+ private long computeRowCount(String database, String table) throws Exception {
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ return hiveClient.getHiveTableRows(database, table);
+ }
+
+ private long getDataSize(String database, String table) throws Exception {
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ long size = hiveClient.getHiveTableMeta(database, table).fileSize;
+ return size;
+ }
+
+ private void redistributeTable(KylinConfig config, int numReducers) throws Exception {
+ final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
+ livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+ StringBuffer statement = new StringBuffer();
+ statement.append(livyRestBuilder.parseProps());
+ statement.append(getInitStatement());
+ statement.append("set mapreduce.job.reduces=" + numReducers + ";\n");
+ statement.append("set hive.merge.mapredfiles=false;\n");
+ statement.append(getRedistributeDataStatement());
+ livyRestBuilder.addArgs(statement.toString());
+ final String cmd = livyRestBuilder.toString();
+
+ stepLogger.log("Redistribute table, cmd: ");
+ stepLogger.log(cmd);
+
+ livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql);
+
+ LivyRestExecutor executor = new LivyRestExecutor();
+ executor.execute(livyRestBuilder, stepLogger);
+ getManager().addJobInfo(getId(), stepLogger.getInfo());
+ }
+
+ private KylinConfig getCubeSpecificConfig() {
+ String cubeName = CubingExecutableUtil.getCubeName(getParams());
+ CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = manager.getCube(cubeName);
+ return cube.getConfig();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig config = getCubeSpecificConfig();
+ String intermediateTable = getIntermediateTable();
+ String database, tableName;
+ if (intermediateTable.indexOf(".") > 0) {
+ database = intermediateTable.substring(0, intermediateTable.indexOf("."));
+ tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1);
+ } else {
+ database = config.getHiveDatabaseForIntermediateTable();
+ tableName = intermediateTable;
+ }
+
+ try {
+ long rowCount = computeRowCount(database, tableName);
+ logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
+ if (rowCount == 0) {
+ if (!config.isEmptySegmentAllowed()) {
+ stepLogger.log("Detect upstream hive table is empty, "
+ + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ } else {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
+ }
+ }
+
+ int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+ int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+ numReducers = Math.max(1, numReducers);
+ numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
+
+ stepLogger.log("total input rows = " + rowCount);
+ stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+ stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+ redistributeTable(config, numReducers);
+ long dataSize = getDataSize(database, tableName);
+ getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+ } catch (Exception e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
+ }
+ }
+
+ public void setInitStatement(String sql) {
+ setParam("HiveInit", sql);
+ }
+
+ public String getInitStatement() {
+ return getParam("HiveInit");
+ }
+
+ public void setRedistributeDataStatement(String sql) {
+ setParam("HiveRedistributeData", sql);
+ }
+
+ public String getRedistributeDataStatement() {
+ return getParam("HiveRedistributeData");
+ }
+
+ public String getIntermediateTable() {
+ return getParam("intermediateTable");
+ }
+
+ public void setIntermediateTable(String intermediateTable) {
+ setParam("intermediateTable", intermediateTable);
+ }
+}
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 86ad0fb..1f35de4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -25,6 +25,7 @@ import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.engine.spark.SparkExecutableFactory;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -39,7 +40,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
SparkBatchCubingJobBuilder2 jobBuilder2 = new SparkBatchCubingJobBuilder2(seg, null);
- final SparkExecutable sparkExecutable = new SparkExecutable();
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy());
sparkExecutable.setClassName(SparkCubeHFile.class.getName());
sparkExecutable.setParam(SparkCubeHFile.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());