You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/06/24 03:11:13 UTC
[kylin] branch master updated: KYLIN-4035 Calculate column
cardinality by using spark engine
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 760aefd KYLIN-4035 Calculate column cardinality by using spark engine
760aefd is described below
commit 760aefddf80f5ba32eb4448355713e4e22b7412d
Author: majie <ma...@163.com>
AuthorDate: Thu Jun 20 00:08:16 2019 +0800
KYLIN-4035 Calculate column cardinality by using spark engine
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/engine/spark/SparkColumnCardinality.java | 159 +++++++++++++++++++++
.../apache/kylin/engine/spark/SparkExecutable.java | 33 +++--
.../apache/kylin/rest/service/TableService.java | 24 +++-
4 files changed, 204 insertions(+), 16 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 56e7b77..4c351c0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1430,6 +1430,10 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
}
+    /**
+     * Reports whether the {@code kylin.engine.spark-cardinality} option is switched on.
+     *
+     * @return the option value parsed as a boolean; "false" is passed to
+     *         {@code getOptional} as the fallback argument
+     */
+    public boolean isSparkCardinalityEnabled() {
+        final String flag = getOptional("kylin.engine.spark-cardinality", "false");
+        return Boolean.parseBoolean(flag);
+    }
+
// ============================================================================
// ENGINE.LIVY
// ============================================================================
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkColumnCardinality.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkColumnCardinality.java
new file mode 100644
index 0000000..a87658f
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkColumnCardinality.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SparkColumnCardinality extends AbstractApplication implements Serializable {
+ // Logger shared by this class and its nested BuildHllCounter.
+ protected static final Logger logger = LoggerFactory.getLogger(SparkColumnCardinality.class);
+
+ // Required command-line option: name of the table to read.
+ public static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg()
+ .isRequired(true).withDescription("Table Name").create(BatchConstants.ARG_TABLE_NAME);
+ // Required command-line option: path the result is written to.
+ public static final Option OPTION_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Output").create(BatchConstants.ARG_OUTPUT);
+ // Required command-line option: project name.
+ public static final Option OPTION_PRJ = OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg()
+ .isRequired(true).withDescription("Project name").create(BatchConstants.ARG_PROJECT);
+ // Required command-line option: number of columns; execute() uses it to emit one zero entry per column
+ // when the aggregation produces no result.
+ public static final Option OPTION_COLUMN_COUNT = OptionBuilder.withArgName(BatchConstants.CFG_OUTPUT_COLUMN).hasArg()
+ .isRequired(true).withDescription("column count").create(BatchConstants.CFG_OUTPUT_COLUMN);
+
+ // Option set handed out by getOptions(); filled in by the constructor.
+ private Options options;
+
+ public SparkColumnCardinality() {
+ options = new Options();
+ options.addOption(OPTION_TABLE_NAME);
+ options.addOption(OPTION_OUTPUT);
+ options.addOption(OPTION_PRJ);
+ options.addOption(OPTION_COLUMN_COUNT);
+ }
+
+    /**
+     * Exposes the command-line options registered by the constructor.
+     *
+     * @return the option set held by this instance
+     */
+    @Override
+    protected Options getOptions() {
+        return this.options;
+    }
+
+    /**
+     * Computes an estimated distinct-value count for every column position of the given table and
+     * writes the (column index, estimate) pairs as text to the output path.
+     *
+     * <p>Anything already present at the output path is deleted first. If the aggregation yields
+     * no pairs at all, a zero count is written for each of the {@code columnCnt} columns instead.
+     *
+     * @param optionsHelper parsed command-line options; the table name, output path and column
+     *                      count are read from it
+     * @throws Exception if an option value is invalid, a kryo class cannot be loaded, or the Spark
+     *                   job fails
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String tableName = optionsHelper.getOptionValue(OPTION_TABLE_NAME);
+        final String output = optionsHelper.getOptionValue(OPTION_OUTPUT);
+        // parseInt yields the primitive directly instead of boxing an Integer and unboxing it again
+        final int columnCnt = Integer.parseInt(optionsHelper.getOptionValue(OPTION_COLUMN_COUNT));
+
+        // registered explicitly because spark.kryo.registrationRequired is set to true below
+        final Class<?>[] kryoClassArray = new Class<?>[] {
+                Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
+
+        SparkConf conf = new SparkConf().setAppName("Calculate table:" + tableName);
+        //set spark.sql.catalogImplementation=hive, If it is not set, SparkSession can't read hive metadata, and throw "org.apache.spark.sql.AnalysisException"
+        conf.set("spark.sql.catalogImplementation", "hive");
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(output));
+            // table will be loaded by spark sql, so isSequenceFile set false
+            final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(false, sc, null, tableName);
+            // one counter per column index per partition, merged per index, then reduced to its estimate;
+            // cached because the result is both counted and saved
+            JavaPairRDD<Integer, Long> resultRdd = recordRDD.mapPartitionsToPair(new BuildHllCounter())
+                    .reduceByKey((x, y) -> {
+                        x.merge(y);
+                        return x;
+                    })
+                    .mapToPair(record -> new Tuple2<>(record._1, record._2.getCountEstimate()))
+                    .sortByKey(true, 1)
+                    .cache();
+
+            if (resultRdd.count() == 0) {
+                // nothing was aggregated: still emit one line per column, each with a count of zero
+                List<Tuple2<Integer, Long>> zeroCounts = new ArrayList<>();
+                for (int i = 0; i < columnCnt; ++i) {
+                    zeroCounts.add(new Tuple2<>(i, 0L));
+                }
+                JavaPairRDD<Integer, Long> nullRdd = sc.parallelizePairs(zeroCounts).repartition(1);
+                nullRdd.saveAsNewAPIHadoopFile(output, IntWritable.class, LongWritable.class, TextOutputFormat.class);
+            } else {
+                resultRdd.saveAsNewAPIHadoopFile(output, IntWritable.class, LongWritable.class, TextOutputFormat.class);
+            }
+        }
+    }
+
+    /**
+     * Per-partition function that feeds every field of every record into an {@link HLLCounter}
+     * keyed by the field's position in the record, and emits one (position, counter) pair per
+     * position seen in the partition.
+     */
+    static class BuildHllCounter implements
+            PairFlatMapFunction<Iterator<String[]>, Integer, HLLCounter> {
+
+        public BuildHllCounter() {
+            logger.info("BuildHllCounter init here.");
+        }
+
+        /**
+         * Consumes all records of one partition.
+         *
+         * @param iterator the partition's records, each an array of field values
+         * @return the (column index, counter) pairs accumulated for this partition; empty when
+         *         the partition holds no fields
+         */
+        @Override
+        public Iterator<Tuple2<Integer, HLLCounter>> call(Iterator<String[]> iterator) throws Exception {
+            Map<Integer, HLLCounter> hllmap = new HashMap<>();
+            while (iterator.hasNext()) {
+                String[] values = iterator.next();
+                for (int m = 0; m < values.length; ++m) {
+                    String fieldValue = values[m];
+                    if (fieldValue == null) {
+                        // a null field is counted as the literal string "NULL"
+                        fieldValue = "NULL";
+                    }
+                    getHllc(hllmap, m).add(Bytes.toBytes(fieldValue));
+                }
+            }
+            // convert from hashmap to tuple2(scala).
+            List<Tuple2<Integer, HLLCounter>> result = new ArrayList<>(hllmap.size());
+            for (Map.Entry<Integer, HLLCounter> entry : hllmap.entrySet()) {
+                result.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+            }
+            return result.iterator();
+        }
+
+        /**
+         * Returns the counter stored under {@code key}, creating and storing a new one on first
+         * use. A single {@code computeIfAbsent} replaces the separate containsKey/put/get lookups.
+         */
+        private HLLCounter getHllc(Map<Integer, HLLCounter> hllcMap, Integer key) {
+            return hllcMap.computeIfAbsent(key, k -> new HLLCounter());
+        }
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index a6cbc23..9fd3781 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -56,6 +56,8 @@ import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.LoggerFactory;
/**
@@ -201,10 +203,21 @@ public class SparkExecutable extends AbstractExecutable {
return onResumed(sparkJobId, mgr);
} else {
String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
- CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
- final KylinConfig config = cube.getConfig();
-
- setAlgorithmLayer();
+ CubeInstance cube;
+ if (cubeName != null) {
+ cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+ } else { // No cube name is available when loading a hive table
+ cube = null;
+ }
+ final KylinConfig config;
+ if (cube != null) {
+ config = cube.getConfig();
+ } else {
+ // When loading a hive table there is no cube name/config, so the config is taken from the project instead.
+ String projectName = this.getParam(SparkColumnCardinality.OPTION_PRJ.getOpt());
+ ProjectInstance projectInst = ProjectManager.getInstance(context.getConfig()).getProject(projectName);
+ config = projectInst.getConfig();
+ }
if (KylinConfig.getSparkHome() == null) {
throw new NullPointerException();
@@ -229,11 +242,13 @@ public class SparkExecutable extends AbstractExecutable {
if (StringUtils.isEmpty(jars)) {
jars = jobJar;
}
-
- String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
- CubeSegment segment = cube.getSegmentById(segmentID);
- Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
- dumpMetadata(segment, mergingSeg);
+ if (cube != null) {
+ setAlgorithmLayer();
+ String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+ dumpMetadata(segment, mergingSeg);
+ }
StringBuilder stringBuilder = new StringBuilder();
if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index f5c6d2d..a4c52dc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -45,6 +45,8 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.spark.SparkColumnCardinality;
+import org.apache.kylin.engine.spark.SparkExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
@@ -486,13 +488,21 @@ public class TableService extends BasicService {
String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
String param = "-table " + tableName + " -output " + outPath + " -project " + prj;
- MapReduceExecutable step1 = new MapReduceExecutable();
-
- step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
- step1.setMapReduceParams(param);
- step1.setParam("segmentId", tableName);
-
- job.addTask(step1);
+ if (getConfig().isSparkCardinalityEnabled()) { // use spark engine to calculate cardinality
+ SparkExecutable step1 = new SparkExecutable();
+ step1.setClassName(SparkColumnCardinality.class.getName());
+ step1.setParam(SparkColumnCardinality.OPTION_OUTPUT.getOpt(), outPath);
+ step1.setParam(SparkColumnCardinality.OPTION_PRJ.getOpt(), prj);
+ step1.setParam(SparkColumnCardinality.OPTION_TABLE_NAME.getOpt(), tableName);
+ step1.setParam(SparkColumnCardinality.OPTION_COLUMN_COUNT.getOpt(), String.valueOf(table.getColumnCount()));
+ job.addTask(step1);
+ } else {
+ MapReduceExecutable step1 = new MapReduceExecutable();
+ step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+ step1.setMapReduceParams(param);
+ step1.setParam("segmentId", tableName);
+ job.addTask(step1);
+ }
HadoopShellExecutable step2 = new HadoopShellExecutable();