You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/28 11:42:42 UTC
[kylin] branch master updated: KYLIN-4224 Create flat table with
spark sql
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new ae6120b KYLIN-4224 Create flat table with spark sql
ae6120b is described below
commit ae6120b849c499956a11814d817fb9506e6dd2d6
Author: weibin0516 <co...@126.com>
AuthorDate: Sun Nov 17 13:15:10 2019 +0800
KYLIN-4224 Create flat table with spark sql
---
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/engine/mr/common/BatchConstants.java | 3 +
.../kylin/engine/spark/SparkCreatingFlatTable.java | 43 +++++++
.../apache/kylin/engine/spark/SparkExecutable.java | 7 +-
.../apache/kylin/engine/spark/SparkSqlBatch.java | 131 +++++++++++++++++++++
.../apache/kylin/source/hive/HiveInputBase.java | 73 +++++++++++-
webapp/app/partials/jobs/job_steps.html | 2 +-
7 files changed, 254 insertions(+), 6 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 2d16cdc..73c5ecc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -42,6 +42,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
public static final String STEP_NAME_BUILD_SPARK_UHC_DICTIONARY = "Build UHC Dictionary with spark";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+ public static final String STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK = "Create Intermediate Flat Table With Spark";
public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d20de2c..3fffad2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -109,6 +109,9 @@ public interface BatchConstants {
String ARG_HBASE_CONF_PATH = "hbaseConfPath";
String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
String ARG_COUNTER_OUTPUT = "counterOutput";
+ String ARG_BASE64_ENCODED_STEP_NAME = "base64StepName";
+ String ARG_SQL_COUNT = "sqlCount";
+ String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
/**
* logger and counter
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java
new file mode 100644
index 0000000..2bddd16
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+public class SparkCreatingFlatTable extends SparkSqlBatch {
+ public static final int SQL_COUNT = 5;
+
+ public SparkCreatingFlatTable() {
+ super();
+
+ for (int i = 0; i < SQL_COUNT; i++) {
+ getOptions().addOption(getSqlOption(i));
+ }
+ }
+
+ public static Option getSqlOption(int index) {
+ return OptionBuilder.withArgName(BatchConstants.ARG_SQL_COUNT + String.valueOf(index))
+ .hasArg()
+ .isRequired(true)
+ .withDescription("Sql0")
+ .create(BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(index));
+ }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index c2045c0..1eeab04 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -246,7 +246,8 @@ public class SparkExecutable extends AbstractExecutable {
if (StringUtils.isEmpty(jars)) {
jars = jobJar;
}
- if (cube != null) {
+
+ if (cube != null && !isCreateFlatTable()) {
setAlgorithmLayer();
String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
CubeSegment segment = cube.getSegmentById(segmentID);
@@ -535,4 +536,8 @@ public class SparkExecutable extends AbstractExecutable {
info.put(saveAsNames[i].trim(), counter);
}
}
+
+    // True when this executable is the "Create Intermediate Flat Table With Spark"
+    // step: that step submits SparkCreatingFlatTable, so the cubing-specific
+    // setup guarded by this check above must be skipped for it.
+    private boolean isCreateFlatTable() {
+        return ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK.equals(getName());
+    }
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java
new file mode 100644
index 0000000..1881dc8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import org.apache.spark.sql.SparkSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Execute a batch of spark sql in order, if a sql execution fails, abort and throw an exception,
+ * no longer execute the left sqls.
+ */
+public class SparkSqlBatch extends AbstractApplication implements Serializable {
+ private final Logger logger = LoggerFactory.getLogger(SparkSqlBatch.class);
+ private Options options;
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME)
+ .hasArg()
+ .isRequired(true)
+ .withDescription("Cube Name")
+ .create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_STEP_NAME = OptionBuilder.withArgName(BatchConstants.ARG_BASE64_ENCODED_STEP_NAME)
+ .hasArg()
+ .isRequired(true)
+ .withDescription("Step Name")
+ .create(BatchConstants.ARG_BASE64_ENCODED_STEP_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID)
+ .hasArg()
+ .isRequired(true)
+ .withDescription("Cube Segment Id")
+ .create(BatchConstants.ARG_SEGMENT_ID);
+ public static final Option OPTION_SQL_COUNT = OptionBuilder.withArgName(BatchConstants.ARG_SQL_COUNT)
+ .hasArg()
+ .isRequired(true)
+ .withDescription("Sql count")
+ .create(BatchConstants.ARG_SQL_COUNT);
+
+ public SparkSqlBatch() {
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_STEP_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_SQL_COUNT);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String stepName = base64Decode(optionsHelper.getOptionValue(OPTION_STEP_NAME));
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String sqlCountStr = optionsHelper.getOptionValue(OPTION_SQL_COUNT);
+ logger.info("start execute sql batch job, cubeName: " + cubeName + ", stepName: " +
+ stepName + ", segmentId: " + segmentId + ", sqlCount: " + sqlCountStr);
+
+ int sqlCount = Integer.valueOf(sqlCountStr);
+ if (sqlCount <= 0) {
+ throw new RuntimeException("Count of sqls to execute must be greater than 0, " +
+ "in fact is " + sqlCountStr);
+ }
+
+ SparkSession sparkSession = getSparkSession(stepName + " for cube: " +
+ cubeName + ", segment " + segmentId);
+ for (int i = 0; i < sqlCount; i++) {
+ String argName = BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(i);
+ Option optionSqlText = OptionBuilder.withArgName(argName)
+ .hasArg()
+ .isRequired(true)
+ .create(argName);
+ String encodedSql = optionsHelper.getOptionValue(optionSqlText);
+ String sqlText = base64Decode(encodedSql).trim();
+ if (null != sqlText && sqlText.endsWith(";")) {
+ sqlText = sqlText.substring(0, sqlText.length() - 1);
+ }
+ logger.info("execute spark sql: " + sqlText);
+ if (i == sqlCount - 1) {
+ sparkSession.sql(sqlText).count();
+ } else {
+ sparkSession.sql(sqlText);
+ }
+ }
+ }
+
+ private SparkSession getSparkSession(String appName) {
+ return SparkSession.builder()
+ .appName(appName)
+ .enableHiveSupport()
+ .getOrCreate();
+ }
+
+ private String base64Decode(String str) throws UnsupportedEncodingException {
+ return new String(
+ Base64.getDecoder().decode(str.getBytes(StandardCharsets.UTF_8)),
+ StandardCharsets.UTF_8
+ );
+ }
+}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 469722a..881c8d5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -19,14 +19,16 @@
package org.apache.kylin.source.hive;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
+import java.util.Base64;
import java.util.Objects;
import java.util.Set;
+import java.util.Locale;
+import java.util.Collections;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +43,10 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.IInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
+import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.engine.spark.SparkExecutableFactory;
+import org.apache.kylin.engine.spark.SparkSqlBatch;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -254,8 +260,14 @@ public class HiveInputBase {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- if (kylinConfig.isLivyEnabled() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
- jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+ if (cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
+ if (kylinConfig.isLivyEnabled()) {
+ jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements,
+ jobWorkingDir, cubeName, flatDesc));
+ } else {
+ jobFlow.addTask(createFlatHiveTableBySparkSql(hiveInitStatements,
+ jobWorkingDir, cubeName, flatDesc));
+ }
} else {
jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
}
@@ -342,6 +354,59 @@ public class HiveInputBase {
return step;
}
+ protected static AbstractExecutable createFlatHiveTableBySparkSql(String hiveInitStatements,
+ String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc,
+ jobWorkingDir);
+ String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+
+ KylinConfig config = flatDesc.getSegment().getConfig();
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(config);
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK);
+ sparkExecutable.setClassName(SparkCreatingFlatTable.class.getName());
+
+ sparkExecutable.setParam(SparkSqlBatch.OPTION_CUBE_NAME.getOpt(), cubeName);
+ sparkExecutable.setParam(SparkSqlBatch.OPTION_STEP_NAME.getOpt(),
+ base64EncodeStr(ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK));
+ sparkExecutable.setParam(SparkSqlBatch.OPTION_SEGMENT_ID.getOpt(),
+ flatDesc.getSegment().getName());
+ sparkExecutable.setParam(SparkSqlBatch.OPTION_SQL_COUNT.getOpt(),
+ String.valueOf(SparkCreatingFlatTable.SQL_COUNT));
+
+ sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(0).getOpt(),
+ base64EncodeStr(hiveInitStatements));
+ sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(1).getOpt(),
+ base64EncodeStr(dropTableHql));
+
+ // createTableHql include create table sql and alter table sql
+ String[] sqlArr = createTableHql.trim().split(";");
+ if (2 != sqlArr.length) {
+ throw new RuntimeException("create table hql should combined by a create table sql " +
+ "and a alter sql, but got: " + createTableHql);
+ }
+ sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(2).getOpt(),
+ base64EncodeStr(sqlArr[0]));
+ sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(3).getOpt(),
+ base64EncodeStr(sqlArr[1]));
+
+ sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(4).getOpt(),
+ base64EncodeStr(insertDataHqls));
+
+ StringBuilder jars = new StringBuilder();
+ StringUtil.appendWithSeparator(jars, config.getSparkAdditionalJars());
+ sparkExecutable.setJars(jars.toString());
+
+ return sparkExecutable;
+ }
+
+ private static String base64EncodeStr(String str) {
+ return new String(
+ Base64.getEncoder().encode(str.getBytes(StandardCharsets.UTF_8)),
+ StandardCharsets.UTF_8
+ );
+ }
+
protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
diff --git a/webapp/app/partials/jobs/job_steps.html b/webapp/app/partials/jobs/job_steps.html
index e26c4c3..993b302 100644
--- a/webapp/app/partials/jobs/job_steps.html
+++ b/webapp/app/partials/jobs/job_steps.html
@@ -134,7 +134,7 @@
<a ng-if="step.info.yarn_application_tracking_url"
href="{{step.info.yarn_application_tracking_url}}" target="_blank"
- tooltip="MRJob">
+ tooltip="Job">
<i class="ace-icon fa fa-tasks yellow bigger-110"></i>
</a>
<a ng-if="config.reference_links && config.reference_links['diagnostic'].link" href="{{config.reference_links['diagnostic'].link + step.info.mr_job_id}}" target="_blank"