Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/28 11:42:42 UTC

[kylin] branch master updated: KYLIN-4224 Create flat table with spark sql

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ae6120b  KYLIN-4224 Create flat table with spark sql
ae6120b is described below

commit ae6120b849c499956a11814d817fb9506e6dd2d6
Author: weibin0516 <co...@126.com>
AuthorDate: Sun Nov 17 13:15:10 2019 +0800

    KYLIN-4224 Create flat table with spark sql
---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/common/BatchConstants.java     |   3 +
 .../kylin/engine/spark/SparkCreatingFlatTable.java |  43 +++++++
 .../apache/kylin/engine/spark/SparkExecutable.java |   7 +-
 .../apache/kylin/engine/spark/SparkSqlBatch.java   | 131 +++++++++++++++++++++
 .../apache/kylin/source/hive/HiveInputBase.java    |  73 +++++++++++-
 webapp/app/partials/jobs/job_steps.html            |   2 +-
 7 files changed, 254 insertions(+), 6 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 2d16cdc..73c5ecc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -42,6 +42,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
     public static final String STEP_NAME_BUILD_SPARK_UHC_DICTIONARY = "Build UHC Dictionary with spark";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+    public static final String STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK = "Create Intermediate Flat Table With Spark";
     public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d20de2c..3fffad2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -109,6 +109,9 @@ public interface BatchConstants {
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
     String ARG_COUNTER_OUTPUT = "counterOutput";
+    String ARG_BASE64_ENCODED_STEP_NAME = "base64StepName";
+    String ARG_SQL_COUNT = "sqlCount";
+    String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
 
     /**
      * logger and counter
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java
new file mode 100644
index 0000000..2bddd16
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCreatingFlatTable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+public class SparkCreatingFlatTable extends SparkSqlBatch {
+    public static final int SQL_COUNT = 5;
+
+    public SparkCreatingFlatTable() {
+        super();
+
+        for (int i = 0; i < SQL_COUNT; i++) {
+            getOptions().addOption(getSqlOption(i));
+        }
+    }
+
+    public static Option getSqlOption(int index) {
+        return OptionBuilder.withArgName(BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(index))
+                .hasArg()
+                .isRequired(true)
+                .withDescription("Sql" + String.valueOf(index))
+                .create(BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(index));
+    }
+}
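
For illustration, here is roughly what the filled-in argument list of the Spark
application could look like once SparkCreatingFlatTable's options are populated.
The values below are made up, and the exact flag syntax depends on how
SparkExecutable serializes its params:

    -className org.apache.kylin.engine.spark.SparkCreatingFlatTable
    -cubeName kylin_sales_cube
    -base64StepName Q3JlYXRlIEludGVybWVkaWF0ZSBGbGF0IFRhYmxlIFdpdGggU3Bhcms=
    -segmentId 20191117000000_20191118000000
    -sqlCount 5
    -base64EncodedSql0 <base64 of the hive init statements>
    -base64EncodedSql1 <base64 of the DROP TABLE statement>
    -base64EncodedSql2 <base64 of the CREATE TABLE statement>
    -base64EncodedSql3 <base64 of the ALTER TABLE statement>
    -base64EncodedSql4 <base64 of the INSERT OVERWRITE statement>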
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index c2045c0..1eeab04 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -246,7 +246,8 @@ public class SparkExecutable extends AbstractExecutable {
             if (StringUtils.isEmpty(jars)) {
                 jars = jobJar;
             }
-            if (cube != null) {
+
+            if (cube != null && !isCreateFlatTable()) {
                 setAlgorithmLayer();
                 String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
                 CubeSegment segment = cube.getSegmentById(segmentID);
@@ -535,4 +536,8 @@ public class SparkExecutable extends AbstractExecutable {
             info.put(saveAsNames[i].trim(), counter);
         }
     }
+
+    private boolean isCreateFlatTable() {
+        return ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK.equals(getName());
+    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java
new file mode 100644
index 0000000..1881dc8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlBatch.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import org.apache.spark.sql.SparkSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Execute a batch of Spark SQL statements in order. If one statement fails, abort and
+ * throw an exception without executing the remaining statements.
+ */
+public class SparkSqlBatch extends AbstractApplication implements Serializable {
+    private final Logger logger = LoggerFactory.getLogger(SparkSqlBatch.class);
+    private Options options;
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME)
+            .hasArg()
+            .isRequired(true)
+            .withDescription("Cube Name")
+            .create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_STEP_NAME = OptionBuilder.withArgName(BatchConstants.ARG_BASE64_ENCODED_STEP_NAME)
+            .hasArg()
+            .isRequired(true)
+            .withDescription("Step Name")
+            .create(BatchConstants.ARG_BASE64_ENCODED_STEP_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID)
+            .hasArg()
+            .isRequired(true)
+            .withDescription("Cube Segment Id")
+            .create(BatchConstants.ARG_SEGMENT_ID);
+    public static final Option OPTION_SQL_COUNT = OptionBuilder.withArgName(BatchConstants.ARG_SQL_COUNT)
+            .hasArg()
+            .isRequired(true)
+            .withDescription("Sql count")
+            .create(BatchConstants.ARG_SQL_COUNT);
+
+    public SparkSqlBatch() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_STEP_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_SQL_COUNT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String stepName = base64Decode(optionsHelper.getOptionValue(OPTION_STEP_NAME));
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String sqlCountStr = optionsHelper.getOptionValue(OPTION_SQL_COUNT);
+        logger.info("start execute sql batch job, cubeName: " + cubeName + ", stepName: " +
+                stepName + ", segmentId: " + segmentId + ", sqlCount: " + sqlCountStr);
+
+        int sqlCount = Integer.parseInt(sqlCountStr);
+        if (sqlCount <= 0) {
+            throw new RuntimeException("Count of sqls to execute must be greater than 0, " +
+                    "in fact is " + sqlCountStr);
+        }
+
+        SparkSession sparkSession = getSparkSession(stepName + " for cube: " +
+                cubeName + ", segment " + segmentId);
+        for (int i = 0; i < sqlCount; i++) {
+            String argName = BatchConstants.ARG_BASE64_ENCODED_SQL + String.valueOf(i);
+            Option optionSqlText = OptionBuilder.withArgName(argName)
+                    .hasArg()
+                    .isRequired(true)
+                    .create(argName);
+            String encodedSql = optionsHelper.getOptionValue(optionSqlText);
+            String sqlText = base64Decode(encodedSql).trim();
+            if (sqlText.endsWith(";")) {
+                sqlText = sqlText.substring(0, sqlText.length() - 1);
+            }
+            logger.info("execute spark sql: " + sqlText);
+            if (i == sqlCount - 1) {
+                sparkSession.sql(sqlText).count();
+            } else {
+                sparkSession.sql(sqlText);
+            }
+        }
+    }
+
+    private SparkSession getSparkSession(String appName) {
+        return SparkSession.builder()
+                .appName(appName)
+                .enableHiveSupport()
+                .getOrCreate();
+    }
+
+    private String base64Decode(String str) throws UnsupportedEncodingException {
+        return new String(
+                Base64.getDecoder().decode(str.getBytes(StandardCharsets.UTF_8)),
+                StandardCharsets.UTF_8
+        );
+    }
+}
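
Why base64-encode the SQL at all? Flat-table statements contain quotes, newlines
and semicolons that would otherwise be mangled on their way through spark-submit.
Below is a minimal, self-contained sketch of the round trip between
base64EncodeStr in HiveInputBase (further down in this commit) and base64Decode
here; the statement text is made up:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class Base64RoundTripDemo {
        public static void main(String[] args) {
            String sql = "INSERT OVERWRITE TABLE kylin_intermediate_demo SELECT * FROM src;";
            // Job-server side: encode so the SQL survives CLI quoting.
            String encoded = new String(
                    Base64.getEncoder().encode(sql.getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8);
            // Spark-driver side: decode, then strip the trailing ';' the same way
            // SparkSqlBatch.execute does before calling sparkSession.sql().
            String decoded = new String(
                    Base64.getDecoder().decode(encoded.getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8).trim();
            if (decoded.endsWith(";")) {
                decoded = decoded.substring(0, decoded.length() - 1);
            }
            System.out.println(decoded);
        }
    }

The .count() on the last statement appears intended to force an action, so the
batch does not return before the final statement is fully evaluated.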
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 469722a..881c8d5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -19,14 +19,16 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
+import java.util.Base64;
 import java.util.Objects;
 import java.util.Set;
+import java.util.Locale;
+import java.util.Collections;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +43,10 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
+import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.engine.spark.SparkExecutableFactory;
+import org.apache.kylin.engine.spark.SparkSqlBatch;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -254,8 +260,14 @@ public class HiveInputBase {
             final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
 
-            if (kylinConfig.isLivyEnabled() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
-                jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+            if (cubeInstance.getEngineType() == IEngineAware.ID_SPARK) {
+                if (kylinConfig.isLivyEnabled()) {
+                    jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements,
+                            jobWorkingDir, cubeName, flatDesc));
+                } else {
+                    jobFlow.addTask(createFlatHiveTableBySparkSql(hiveInitStatements,
+                            jobWorkingDir, cubeName, flatDesc));
+                }
             } else {
                 jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
             }
@@ -342,6 +354,59 @@ public class HiveInputBase {
         return step;
     }
 
+    protected static AbstractExecutable createFlatHiveTableBySparkSql(String hiveInitStatements,
+            String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc,
+                jobWorkingDir);
+        String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+
+        KylinConfig config = flatDesc.getSegment().getConfig();
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(config);
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK);
+        sparkExecutable.setClassName(SparkCreatingFlatTable.class.getName());
+
+        sparkExecutable.setParam(SparkSqlBatch.OPTION_CUBE_NAME.getOpt(), cubeName);
+        sparkExecutable.setParam(SparkSqlBatch.OPTION_STEP_NAME.getOpt(),
+                base64EncodeStr(ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK));
+        sparkExecutable.setParam(SparkSqlBatch.OPTION_SEGMENT_ID.getOpt(),
+                flatDesc.getSegment().getName());
+        sparkExecutable.setParam(SparkSqlBatch.OPTION_SQL_COUNT.getOpt(),
+                String.valueOf(SparkCreatingFlatTable.SQL_COUNT));
+
+        sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(0).getOpt(),
+                base64EncodeStr(hiveInitStatements));
+        sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(1).getOpt(),
+                base64EncodeStr(dropTableHql));
+
+        // createTableHql contains both the create table sql and the alter table sql
+        String[] sqlArr = createTableHql.trim().split(";");
+        if (2 != sqlArr.length) {
+            throw new RuntimeException("create table hql should combined by a create table sql " +
+                    "and a alter sql, but got: " + createTableHql);
+        }
+        sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(2).getOpt(),
+                base64EncodeStr(sqlArr[0]));
+        sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(3).getOpt(),
+                base64EncodeStr(sqlArr[1]));
+
+        sparkExecutable.setParam(SparkCreatingFlatTable.getSqlOption(4).getOpt(),
+                base64EncodeStr(insertDataHqls));
+
+        StringBuilder jars = new StringBuilder();
+        StringUtil.appendWithSeparator(jars, config.getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
+
+    private static String base64EncodeStr(String str) {
+        return new String(
+                Base64.getEncoder().encode(str.getBytes(StandardCharsets.UTF_8)),
+                StandardCharsets.UTF_8
+        );
+    }
+
     protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
             IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
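
The split on ';' in createFlatHiveTableBySparkSql above relies on
JoinedFlatTable.generateCreateTableStatement emitting exactly two statements. A
sketch of the shape the code assumes, with a hypothetical table name and location:

    // What createFlatHiveTableBySparkSql expects createTableHql to contain:
    String createTableHql =
            "CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_demo (\n"
          + "  TRANS_ID bigint, PART_DT date\n"
          + ") STORED AS SEQUENCEFILE LOCATION 'hdfs:///kylin/job_dir/flat_table';\n"
          + "ALTER TABLE kylin_intermediate_demo SET LOCATION 'hdfs:///kylin/job_dir/flat_table';";

    String[] sqlArr = createTableHql.trim().split(";");
    // sqlArr.length == 2 here; a ';' embedded in a string literal inside the DDL
    // would break the split and trigger the RuntimeException above.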
diff --git a/webapp/app/partials/jobs/job_steps.html b/webapp/app/partials/jobs/job_steps.html
index e26c4c3..993b302 100644
--- a/webapp/app/partials/jobs/job_steps.html
+++ b/webapp/app/partials/jobs/job_steps.html
@@ -134,7 +134,7 @@
 
                     <a ng-if="step.info.yarn_application_tracking_url"
                        href="{{step.info.yarn_application_tracking_url}}" target="_blank"
-                       tooltip="MRJob">
+                       tooltip="Job">
                         <i class="ace-icon fa fa-tasks yellow bigger-110"></i>
                     </a>
                     <a ng-if="config.reference_links && config.reference_links['diagnostic'].link" href="{{config.reference_links['diagnostic'].link + step.info.mr_job_id}}" target="_blank"