Posted to commits@kylin.apache.org by ni...@apache.org on 2019/06/24 03:11:13 UTC

[kylin] branch master updated: KYLIN-4035 Calculate column cardinality by using spark engine

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 760aefd  KYLIN-4035 Calculate column cardinality by using spark engine
760aefd is described below

commit 760aefddf80f5ba32eb4448355713e4e22b7412d
Author: majie <ma...@163.com>
AuthorDate: Thu Jun 20 00:08:16 2019 +0800

    KYLIN-4035 Calculate column cardinality by using spark engine
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../kylin/engine/spark/SparkColumnCardinality.java | 159 +++++++++++++++++++++
 .../apache/kylin/engine/spark/SparkExecutable.java |  33 +++--
 .../apache/kylin/rest/service/TableService.java    |  24 +++-
 4 files changed, 204 insertions(+), 16 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 56e7b77..4c351c0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1430,6 +1430,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
     }
 
+    public boolean isSparkCardinalityEnabled(){
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+    }
+
     // ============================================================================
     // ENGINE.LIVY
     // ============================================================================
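
The new flag defaults to "false", so the Spark-based cardinality calculation stays off unless it is enabled explicitly. A minimal kylin.properties override (assuming the standard property-file configuration path) is:

    kylin.engine.spark-cardinality=true
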
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkColumnCardinality.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkColumnCardinality.java
new file mode 100644
index 0000000..a87658f
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkColumnCardinality.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SparkColumnCardinality extends AbstractApplication implements Serializable {
+    protected static final Logger logger = LoggerFactory.getLogger(SparkColumnCardinality.class);
+
+    public static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg()
+            .isRequired(true).withDescription("Table Name").create(BatchConstants.ARG_TABLE_NAME);
+    public static final Option OPTION_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Output").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_PRJ = OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg()
+            .isRequired(true).withDescription("Project name").create(BatchConstants.ARG_PROJECT);
+    public static final Option OPTION_COLUMN_COUNT = OptionBuilder.withArgName(BatchConstants.CFG_OUTPUT_COLUMN).hasArg()
+            .isRequired(true).withDescription("column count").create(BatchConstants.CFG_OUTPUT_COLUMN);
+
+    private Options options;
+
+    public SparkColumnCardinality() {
+        options = new Options();
+        options.addOption(OPTION_TABLE_NAME);
+        options.addOption(OPTION_OUTPUT);
+        options.addOption(OPTION_PRJ);
+        options.addOption(OPTION_COLUMN_COUNT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String tableName = optionsHelper.getOptionValue(OPTION_TABLE_NAME);
+        String output = optionsHelper.getOptionValue(OPTION_OUTPUT);
+        int columnCnt = Integer.valueOf(optionsHelper.getOptionValue(OPTION_COLUMN_COUNT));
+
+        Class[] kryoClassArray = new Class[]{Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey")};
+
+        SparkConf conf = new SparkConf().setAppName("Calculate table:" + tableName);
+        // Set spark.sql.catalogImplementation=hive; otherwise SparkSession cannot read Hive metadata and throws "org.apache.spark.sql.AnalysisException".
+        conf.set("spark.sql.catalogImplementation", "hive");
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(output));
+            // The table is loaded by Spark SQL, so isSequenceFile is set to false.
+            final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(false, sc, null, tableName);
+            JavaPairRDD<Integer, Long> resultRdd = recordRDD.mapPartitionsToPair(new BuildHllCounter())
+                    .reduceByKey((x, y) -> {
+                        x.merge(y);
+                        return x;
+                    })
+                    .mapToPair(record -> {
+                        return new Tuple2<>(record._1, record._2.getCountEstimate());
+                    })
+                    .sortByKey(true, 1)
+                    .cache();
+
+            if (resultRdd.count() == 0) {
+                ArrayList<Tuple2<Integer, Long>> list = new ArrayList<>();
+                for (int i = 0; i < columnCnt; ++i) {
+                    list.add(new Tuple2<>(i, 0L));
+                }
+                JavaPairRDD<Integer, Long> nullRdd = sc.parallelizePairs(list).repartition(1);
+                nullRdd.saveAsNewAPIHadoopFile(output, IntWritable.class, LongWritable.class, TextOutputFormat.class);
+            } else {
+                resultRdd.saveAsNewAPIHadoopFile(output, IntWritable.class, LongWritable.class, TextOutputFormat.class);
+            }
+        }
+    }
+
+    static class BuildHllCounter implements
+            PairFlatMapFunction<Iterator<String[]>, Integer, HLLCounter> {
+
+        public BuildHllCounter() {
+            logger.info("BuildHllCounter init here.");
+        }
+
+        @Override
+        public Iterator<Tuple2<Integer, HLLCounter>> call(Iterator<String[]> iterator) throws Exception {
+            HashMap<Integer, HLLCounter> hllmap = new HashMap<>();
+            while (iterator.hasNext()) {
+                String[] values = iterator.next();
+                for (int m = 0; m < values.length; ++m) {
+                    String fieldValue = values[m];
+                    if (fieldValue == null) {
+                        fieldValue = "NULL";
+                    }
+                    getHllc(hllmap, m).add(Bytes.toBytes(fieldValue));
+                }
+            }
+            // Convert the HashMap entries to Scala Tuple2 pairs.
+            List<Tuple2<Integer, HLLCounter>> result = new ArrayList<>();
+            for (Map.Entry<Integer, HLLCounter> entry : hllmap.entrySet()) {
+                result.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+            }
+            return result.iterator();
+        }
+
+        private HLLCounter getHllc(HashMap<Integer, HLLCounter> hllcMap, Integer key) {
+            if (!hllcMap.containsKey(key)) {
+                hllcMap.put(key, new HLLCounter());
+            }
+            return hllcMap.get(key);
+        }
+    }
+}
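
For illustration, the per-column counting done in BuildHllCounter can be exercised locally without Spark. The following sketch is not part of the patch; it only reuses the HLLCounter and Bytes calls shown above (the class name and sample rows are made up for the example):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.measure.hllc.HLLCounter;

    public class ColumnCardinalitySketch {
        public static void main(String[] args) {
            // Sample rows: two columns, each with two distinct values.
            String[][] rows = {
                    {"a", "1"},
                    {"b", "1"},
                    {"a", "2"},
            };
            // One HLL counter per column index, mirroring the hllmap in BuildHllCounter.
            Map<Integer, HLLCounter> hllByColumn = new HashMap<>();
            for (String[] row : rows) {
                for (int col = 0; col < row.length; col++) {
                    String value = (row[col] == null) ? "NULL" : row[col];
                    hllByColumn.computeIfAbsent(col, k -> new HLLCounter())
                            .add(Bytes.toBytes(value));
                }
            }
            // Print the approximate distinct count per column (expected: 2 and 2).
            for (Map.Entry<Integer, HLLCounter> entry : hllByColumn.entrySet()) {
                System.out.println("column " + entry.getKey() + " -> " + entry.getValue().getCountEstimate());
            }
        }
    }

In the job itself, these per-partition maps are merged across partitions with HLLCounter.merge() via reduceByKey, and the final estimates are written out as (columnIndex, cardinality) pairs.
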
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index a6cbc23..9fd3781 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -56,6 +56,8 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -201,10 +203,21 @@ public class SparkExecutable extends AbstractExecutable {
             return onResumed(sparkJobId, mgr);
         } else {
             String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
-            CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
-            final KylinConfig config = cube.getConfig();
-
-            setAlgorithmLayer();
+            CubeInstance cube;
+            if (cubeName != null) {
+                cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+            } else {  // Cube name is not available when loading a Hive table
+                cube = null;
+            }
+            final KylinConfig config;
+            if (cube != null) {
+                config = cube.getConfig();
+            } else {
+                // When loading a Hive table, the cube name/config is unavailable, so read the config from the project.
+                String projectName = this.getParam(SparkColumnCardinality.OPTION_PRJ.getOpt());
+                ProjectInstance projectInst = ProjectManager.getInstance(context.getConfig()).getProject(projectName);
+                config = projectInst.getConfig();
+            }
 
             if (KylinConfig.getSparkHome() == null) {
                 throw new NullPointerException();
@@ -229,11 +242,13 @@ public class SparkExecutable extends AbstractExecutable {
             if (StringUtils.isEmpty(jars)) {
                 jars = jobJar;
             }
-
-            String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
-            CubeSegment segment = cube.getSegmentById(segmentID);
-            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
-            dumpMetadata(segment, mergingSeg);
+            if (cube != null) {
+                setAlgorithmLayer();
+                String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+                CubeSegment segment = cube.getSegmentById(segmentID);
+                Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+                dumpMetadata(segment, mergingSeg);
+            }
 
             StringBuilder stringBuilder = new StringBuilder();
             if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index f5c6d2d..a4c52dc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -45,6 +45,8 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.spark.SparkColumnCardinality;
+import org.apache.kylin.engine.spark.SparkExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -486,13 +488,21 @@ public class TableService extends BasicService {
         String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
         String param = "-table " + tableName + " -output " + outPath + " -project " + prj;
 
-        MapReduceExecutable step1 = new MapReduceExecutable();
-
-        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
-        step1.setMapReduceParams(param);
-        step1.setParam("segmentId", tableName);
-
-        job.addTask(step1);
+        if (getConfig().isSparkCardinalityEnabled()) { // use the Spark engine to calculate cardinality
+            SparkExecutable step1 = new SparkExecutable();
+            step1.setClassName(SparkColumnCardinality.class.getName());
+            step1.setParam(SparkColumnCardinality.OPTION_OUTPUT.getOpt(), outPath);
+            step1.setParam(SparkColumnCardinality.OPTION_PRJ.getOpt(), prj);
+            step1.setParam(SparkColumnCardinality.OPTION_TABLE_NAME.getOpt(), tableName);
+            step1.setParam(SparkColumnCardinality.OPTION_COLUMN_COUNT.getOpt(), String.valueOf(table.getColumnCount()));
+            job.addTask(step1);
+        } else {
+            MapReduceExecutable step1 = new MapReduceExecutable();
+            step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+            step1.setMapReduceParams(param);
+            step1.setParam("segmentId", tableName);
+            job.addTask(step1);
+        }
 
         HadoopShellExecutable step2 = new HadoopShellExecutable();