You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/24 09:48:18 UTC
[kylin] branch master updated: KYLIN-3442 Fact distinct columns in
Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a334ee4 KYLIN-3442 Fact distinct columns in Spark
a334ee4 is described below
commit a334ee4ff1f6609ab15bd7d4744206750e77ec21
Author: chao long <wa...@qq.com>
AuthorDate: Fri Aug 24 15:07:17 2018 +0800
KYLIN-3442 Fact distinct columns in Spark
---
engine-spark/pom.xml | 41 +
.../kylin/engine/spark/MultipleOutputsRDD.scala | 114 +++
.../engine/spark/SparkBatchCubingJobBuilder2.java | 34 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 39 +-
.../apache/kylin/engine/spark/SparkExecutable.java | 6 +-
.../kylin/engine/spark/SparkFactDistinct.java | 866 +++++++++++++++++++++
.../org/apache/kylin/engine/spark/SparkUtil.java | 45 +-
kylin-it/pom.xml | 3 -
pom.xml | 21 +
9 files changed, 1123 insertions(+), 46 deletions(-)
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 700aeb5..ceb9337 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -100,6 +100,47 @@
<version>0.9.10</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
new file mode 100644
index 0000000..cb5458d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{DataInputBuffer, Writable}
+import org.apache.hadoop.mapred.RawKeyValueIterator
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.counters.GenericCounter
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
+import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, TaskAttemptID, TaskType}
+import org.apache.hadoop.util.Progress
+import org.apache.spark._
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class MultipleOutputsRDD[K, V](self: RDD[(String, (K, V, String))])
+ (implicit kt: ClassTag[K], vt: ClassTag[V]) extends Serializable {
+
+ def saveAsNewAPIHadoopDatasetWithMultipleOutputs(conf: Configuration) {
+ val hadoopConf = conf
+ val job = NewAPIHadoopJob.getInstance(hadoopConf)
+ val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ val jobtrackerID = formatter.format(new Date())
+ val stageId = self.id
+ val jobConfiguration = job.getConfiguration
+ val wrappedConf = new SerializableWritable(jobConfiguration)
+ val outfmt = job.getOutputFormatClass
+ val jobFormat = outfmt.newInstance
+
+ if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+ jobFormat.checkOutputSpecs(job)
+ }
+
+ val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V, String))]) => {
+ val config = wrappedConf.value
+
+ val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
+ context.attemptNumber)
+ val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
+ val format = outfmt.newInstance
+
+ format match {
+ case c: Configurable => c.setConf(wrappedConf.value)
+ case _ => ()
+ }
+
+ val committer = format.getOutputCommitter(hadoopContext)
+ committer.setupTask(hadoopContext)
+
+ val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]
+
+ val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new InputIterator(itr), new GenericCounter, new GenericCounter,
+ recordWriter, committer, new DummyReporter, null, kt.runtimeClass, vt.runtimeClass)
+
+ // use hadoop MultipleOutputs
+ val writer = new MultipleOutputs(taskInputOutputContext)
+
+ try {
+ while (itr.hasNext) {
+ val pair = itr.next()
+ writer.write(pair._1, pair._2._1, pair._2._2, pair._2._3)
+ }
+ } finally {
+ writer.close()
+ }
+ committer.commitTask(hadoopContext)
+ 1
+ }: Int
+
+ val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+ val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
+ val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+ jobCommitter.setupJob(jobTaskContext)
+ self.context.runJob(self, writeShard)
+ jobCommitter.commitJob(jobTaskContext)
+ }
+
+ class InputIterator(itr: Iterator[_]) extends RawKeyValueIterator {
+ def getKey: DataInputBuffer = null
+ def getValue: DataInputBuffer = null
+ def getProgress: Progress = null
+ def next = itr.hasNext
+ def close() { }
+ }
+}
+
+object MultipleOutputsRDD {
+ def rddToMultipleOutputsRDD[K, V](rdd: JavaPairRDD[String, (Writable, Writable, String)]) = {
+ new MultipleOutputsRDD(rdd)
+ }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e545166..5fd7213 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,6 +18,9 @@
package org.apache.kylin.engine.spark;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.StringUtil;
@@ -32,9 +35,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
/**
*/
public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -61,7 +61,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStep(jobId));
+ result.addTask(createFactDistinctColumnsSparkStep(jobId));
if (isEnableUHCDictStep()) {
result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +87,32 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
+ final SparkExecutable sparkExecutable = new SparkExecutable();
+ final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+ sparkExecutable.setClassName(SparkFactDistinct.class.getName());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_PATH.getOpt(), tablePath);
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkFactDistinct.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+ sparkExecutable.setJobId(jobId);
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+ sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+
+ sparkExecutable.setJars(jars.toString());
+
+ return sparkExecutable;
+ }
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final SparkExecutable sparkExecutable = new SparkExecutable();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cb3af31..9f4ae34 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -27,13 +27,10 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
@@ -68,9 +65,6 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,37 +163,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
-
- if (isSequenceFile) {
- encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
- .map(new Function<Text, String[]>() {
- @Override
- public String[] call(Text text) throws Exception {
- String s = Bytes.toString(text.getBytes(), 0, text.getLength());
- return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
- }
- }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
- } else {
- SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
- final Dataset intermediateTable = sparkSession.table(hiveTable);
- encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
- @Override
- public String[] call(Row row) throws Exception {
- String[] result = new String[row.size()];
- for (int i = 0; i < row.size(); i++) {
- final Object o = row.get(i);
- if (o != null) {
- result[i] = o.toString();
- } else {
- result[i] = null;
- }
- }
- return result;
- }
- }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
-
- }
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+ .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 637382c..6122397 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -375,7 +375,11 @@ public class SparkExecutable extends AbstractExecutable {
Set<String> dumpList = new LinkedHashSet<>();
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
dumpList.addAll(segment.getDictionaryPaths());
- dumpList.add(segment.getStatisticsResourcePath());
+ ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+ if (rs.exists(segment.getStatisticsResourcePath())) {
+ // cube statistics is not available for new segment
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(), this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
new file mode 100644
index 0000000..61e2e53
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -0,0 +1,866 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkFactDistinct extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkFactDistinct.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+ .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+ .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+ private Options options;
+
+ public SparkFactDistinct() {
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_INPUT_TABLE);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
+
+ SparkConf conf = new SparkConf().setAppName("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ sc.sc().addSparkListener(jobListener);
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+ final Job job = Job.getInstance(sConf.get());
+
+ final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+ logger.info("RDD Output path: {}", outputPath);
+ logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+ logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+ // calculate source record bytes size
+ final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+
+ final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+
+ JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+
+ JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+
+ JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+
+ // make each reducer output to respective dir
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+ // prevent to create zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+ MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+
+ multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+
+ // only work for client mode, not work when spark.submit.deployMode=cluster
+ logger.info("Map input records={}", recordRDD.count());
+ logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+
+ HadoopUtil.deleteHDFSMeta(metaUrl);
+ }
+
+ static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int samplingPercent;
+ private CuboidStatCalculator cuboidStatCalculator;
+ private FactDistinctColumnsReducerMapping reducerMapping;
+ private List<TblColRef> allCols;
+ private int[] columnIndex;
+ private DictColDeduper dictColDeduper;
+ private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap;
+ private ByteBuffer tmpbuf;
+ private LongAccumulator bytesWritten;
+
+ public FlatOutputFucntion(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, int samplingPercent, LongAccumulator bytesWritten) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaurl;
+ this.conf = conf;
+ this.samplingPercent = samplingPercent;
+ this.dimensionRangeInfoMap = Maps.newHashMap();
+ this.bytesWritten = bytesWritten;
+ }
+
+ private void init() {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+ reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+ tmpbuf = ByteBuffer.allocate(4096);
+
+ int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+
+ Long[] cuboidIds = getCuboidIds(cubeSegment);
+
+ Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);
+
+ boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+
+ HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length, cubeDesc.getConfig().getCubeStatsHLLPrecision());
+
+ cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet, isNewAlgorithm, cuboidsHLL);
+ allCols = reducerMapping.getAllDimDictCols();
+
+ initDictColDeduper(cubeDesc);
+ initColumnIndex(intermediateTableDesc);
+
+ initialized = true;
+ }
+
+ @Override
+ public Iterator<Tuple2<SelfDefineSortableKey, Text>> call(Iterator<String[]> rowIterator) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkFactDistinct.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ List<String[]> rows = Lists.newArrayList(rowIterator);
+ List<Tuple2<SelfDefineSortableKey, Text>> result = Lists.newArrayList();
+
+ int rowCount = 0;
+
+ for (String[] row : rows) {
+ bytesWritten.add(countSizeInBytes(row));
+
+ for (int i = 0; i < allCols.size(); i++) {
+ String fieldValue = row[columnIndex[i]];
+ if (fieldValue == null)
+ continue;
+
+ final DataType type = allCols.get(i).getType();
+
+ //for dic column, de dup before write value; for dim not dic column, hold util doCleanup()
+ if (dictColDeduper.isDictCol(i)) {
+ if (dictColDeduper.add(i, fieldValue)) {
+ addFieldValue(type, i, fieldValue, result);
+ }
+ } else {
+ DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+ if (old == null) {
+ old = new DimensionRangeInfo(fieldValue, fieldValue);
+ dimensionRangeInfoMap.put(i, old);
+ } else {
+ old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+ old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+ }
+ }
+ }
+
+ if (rowCount % 100 < samplingPercent) {
+ cuboidStatCalculator.putRow(row);
+ }
+
+ if (rowCount % 100 == 0) {
+ dictColDeduper.resetIfShortOfMem();
+ }
+
+ rowCount++;
+ }
+
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+ HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+ HLLCounter hll;
+
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = cuboidsHLL[i];
+ tmpbuf.clear();
+ tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+ tmpbuf.putLong(cuboidIds[i]);
+ Text outputKey = new Text();
+ Text outputValue = new Text();
+ SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+
+ sortableKey.init(outputKey, (byte) 0);
+
+ result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, outputValue));
+ }
+
+ for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+ DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+ DataType dataType = allCols.get(colIndex).getType();
+ addFieldValue(dataType, colIndex, rangeInfo.getMin(), result);
+ addFieldValue(dataType, colIndex, rangeInfo.getMax(), result);
+ }
+
+ return result.iterator();
+ }
+
+ private boolean isUsePutRowKeyToHllNewAlgorithm(CubeDesc cubeDesc) {
+ boolean isUsePutRowKeyToHllNewAlgorithm;
+ if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+ isUsePutRowKeyToHllNewAlgorithm = false;
+ logger.info("Found KylinVersion: {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+ } else {
+ isUsePutRowKeyToHllNewAlgorithm = true;
+ logger.info(
+ "Found KylinVersion: {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+ cubeDesc.getVersion());
+ }
+ return isUsePutRowKeyToHllNewAlgorithm;
+ }
+
+ private Long[] getCuboidIds(CubeSegment cubeSegment) {
+ Set<Long> cuboidIdSet = Sets.newHashSet(cubeSegment.getCuboidScheduler().getAllCuboidIds());
+ if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSegment)) {
+ // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
+ // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
+ cuboidIdSet.addAll(cubeSegment.getCubeDesc().getMandatoryCuboids());
+ }
+
+ return cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+ }
+
+ private HLLCounter[] getInitCuboidsHLL(int cuboidSize, int hllPrecision) {
+ HLLCounter[] cuboidsHLL = new HLLCounter[cuboidSize];
+ for (int i = 0; i < cuboidSize; i++) {
+ cuboidsHLL[i] = new HLLCounter(hllPrecision, RegisterType.DENSE);
+ }
+ return cuboidsHLL;
+ }
+
+ private void initDictColDeduper(CubeDesc cubeDesc) {
+ // setup dict col deduper
+ dictColDeduper = new DictColDeduper();
+ Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+ for (int i = 0; i < allCols.size(); i++) {
+ if (dictCols.contains(allCols.get(i)))
+ dictColDeduper.setIsDictCol(i);
+ }
+ }
+
+ private void initColumnIndex(CubeJoinedFlatTableEnrich intermediateTableDesc) {
+ columnIndex = new int[allCols.size()];
+ for (int i = 0; i < allCols.size(); i++) {
+ TblColRef colRef = allCols.get(i);
+ int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+ columnIndex[i] = columnIndexOnFlatTbl;
+ }
+ }
+
+ private void addFieldValue(DataType type, Integer colIndex, String value,
+ List<Tuple2<SelfDefineSortableKey, Text>> result) {
+ int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+ tmpbuf.clear();
+ byte[] valueBytes = Bytes.toBytes(value);
+ int size = valueBytes.length + 1;
+ if (size >= tmpbuf.capacity()) {
+ tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+ }
+ tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+ tmpbuf.put(valueBytes);
+
+ Text outputKey = new Text();
+ SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ sortableKey.init(outputKey, type);
+
+ result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, new Text()));
+
+ // log a few rows for troubleshooting
+ if (result.size() < 10) {
+ logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+ }
+ }
+
+ private int countNewSize(int oldSize, int dataSize) {
+ int newSize = oldSize * 2;
+ while (newSize < dataSize) {
+ newSize = newSize * 2;
+ }
+ return newSize;
+ }
+
+ private int countSizeInBytes(String[] row) {
+ int size = 0;
+ for (String s : row) {
+ size += s == null ? 1 : StringUtil.utf8Length(s);
+ size++; // delimiter
+ }
+ return size;
+ }
+ }
+
+ static class CuboidStatCalculator {
+ private final int nRowKey;
+ private final int[] rowkeyColIndex;
+ private final Long[] cuboidIds;
+ private final Integer[][] cuboidsBitSet;
+ private volatile HLLCounter[] cuboidsHLL;
+
+ //about details of the new algorithm, please see KYLIN-2518
+ private final boolean isNewAlgorithm;
+ private final HashFunction hf;
+ private long[] rowHashCodesLong;
+
+ public CuboidStatCalculator(int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+ boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+ this.nRowKey = rowkeyColIndex.length;
+ this.rowkeyColIndex = rowkeyColIndex;
+ this.cuboidIds = cuboidIds;
+ this.cuboidsBitSet = cuboidsBitSet;
+ this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+ if (!isNewAlgorithm) {
+ this.hf = Hashing.murmur3_32();
+ } else {
+ rowHashCodesLong = new long[nRowKey];
+ this.hf = Hashing.murmur3_128();
+ }
+ this.cuboidsHLL = cuboidsHLL;
+ }
+
+ public void putRow(final String[] row) {
+ String[] copyRow = Arrays.copyOf(row, row.length);
+
+ if (isNewAlgorithm) {
+ putRowKeyToHLLNew(copyRow);
+ } else {
+ putRowKeyToHLLOld(copyRow);
+ }
+ }
+
+ private void putRowKeyToHLLOld(String[] row) {
+ //generate hash for each row key column
+ byte[][] rowHashCodes = new byte[nRowKey][];
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue != null) {
+ rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+ } else {
+ rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ }
+ }
+
+ // user the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+ hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+ }
+
+ cuboidsHLL[i].add(hc.hash().asBytes());
+ }
+ }
+
+ private void putRowKeyToHLLNew(String[] row) {
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue == null)
+ colValue = "0";
+ byte[] bytes = hc.putString(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ }
+
+ // user the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+ value += rowHashCodesLong[cuboidsBitSet[i][position]];
+ }
+ cuboidsHLL[i].addHashDirectly(value);
+ }
+ }
+
+ public HLLCounter[] getHLLCounters() {
+ return cuboidsHLL;
+ }
+
+ public Long[] getCuboidIds() {
+ return cuboidIds;
+ }
+ }
+
+ static class FactDistinctPartitioner extends Partitioner {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int totalReducerNum;
+ private FactDistinctColumnsReducerMapping reducerMapping;
+
+ public FactDistinctPartitioner(String cubeName, String metaUrl, SerializableConfiguration conf, int totalReducerNum) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ this.totalReducerNum = totalReducerNum;
+ }
+
+ private void init() {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+ initialized = true;
+ }
+
+ @Override
+ public int numPartitions() {
+ return totalReducerNum;
+ }
+
+ @Override
+ public int getPartition(Object o) {
+ if (initialized == false) {
+ synchronized (SparkFactDistinct.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ SelfDefineSortableKey skey = (SelfDefineSortableKey) o;
+ Text key = skey.getText();
+ if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+ Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+ return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+ } else {
+ return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
+ }
+ }
+ }
+
+ static class MultiOutputFunction implements
+ PairFlatMapFunction<Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>>, String, Tuple3<Writable, Writable, String>> {
+ private volatile transient boolean initialized = false;
+ private String DICT_FILE_POSTFIX = ".rldict";
+ private String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+ private String cubeName;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int samplingPercent;
+ private FactDistinctColumnsReducerMapping reducerMapping;
+ private int taskId;
+ private boolean isStatistics = false;
+ private long baseCuboidId;
+ private List<Long> baseCuboidRowCountInMappers;
+ private Map<Long, HLLCounter> cuboidHLLMap;
+ private TblColRef col;
+ private boolean buildDictInReducer;
+ private IDictionaryBuilder builder;
+ private int rowCount = 0;
+ private long totalRowsBeforeMerge = 0;
+ private KylinConfig cubeConfig;
+ private CubeDesc cubeDesc;
+ private String maxValue = null;
+ private String minValue = null;
+ private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;
+
+ public MultiOutputFunction(String cubeName, String metaurl, SerializableConfiguration conf, int samplingPercent) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaurl;
+ this.conf = conf;
+ this.samplingPercent = samplingPercent;
+ }
+
+ private void init() throws IOException {
+ taskId = TaskContext.getPartitionId();
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ cubeDesc = cubeInstance.getDescriptor();
+ cubeConfig = cubeInstance.getConfig();
+ reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+ result = Lists.newArrayList();
+
+ if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+ // hll
+ isStatistics = true;
+ baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
+ baseCuboidRowCountInMappers = Lists.newArrayList();
+ cuboidHLLMap = Maps.newHashMap();
+
+ logger.info("Partition " + taskId + " handling stats");
+ } else {
+ // normal col
+ col = reducerMapping.getColForReducer(taskId);
+ Preconditions.checkNotNull(col);
+
+ // local build dict
+ buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+ if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+ buildDictInReducer = false;
+ }
+
+ if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+ buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+ }
+
+ if (buildDictInReducer) {
+ builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+ builder.init(null, 0, null);
+ }
+ logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
+ }
+
+ initialized = true;
+ }
+
+ private void logAFewRows(String value) {
+ if (rowCount < 10) {
+ logger.info("Received value: " + value);
+ }
+ }
+
+ @Override
+ public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> call(
+ Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuple2Iterator) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkFactDistinct.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ List<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuples = Lists.newArrayList(tuple2Iterator);
+
+ for (Tuple2<SelfDefineSortableKey, Iterable<Text>> tuple : tuples) {
+ Text key = tuple._1.getText();
+
+ if (isStatistics) {
+ // for hll
+ long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+
+ for (Text value : tuple._2) {
+ HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+ ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
+ hll.readRegisters(bf);
+
+ totalRowsBeforeMerge += hll.getCountEstimate();
+
+ if (cuboidId == baseCuboidId) {
+ baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+ }
+
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+ }
+
+ } else {
+ String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+ logAFewRows(value);
+ // if dimension col, compute max/min value
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ if (minValue == null || col.getType().compare(minValue, value) > 0) {
+ minValue = value;
+ }
+ if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+ maxValue = value;
+ }
+ }
+
+ //if dict column
+ if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+ if (buildDictInReducer) {
+ builder.addValue(value);
+ } else {
+ byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+ // output written to baseDir/colName/-r-00000 (etc)
+ String fileName = col.getIdentity() + "/";
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
+ BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
+ NullWritable.get(), new Text(keyBytes), fileName)));
+ }
+ }
+ }
+
+ rowCount++;
+ }
+
+ if (isStatistics) {
+ //output the hll info;
+ List<Long> allCuboids = Lists.newArrayList();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
+
+ logMapperAndCuboidStatistics(allCuboids); // for human check
+ outputStatistics(allCuboids, result);
+ } else {
+ //dimension col
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ outputDimRangeInfo(result);
+ }
+ // dic col
+ if (buildDictInReducer) {
+ Dictionary<String> dict = builder.build();
+ outputDict(col, dict, result);
+ }
+ }
+
+ return result.iterator();
+ }
+
+ private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+ logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
+ logger.info("Samping percentage: \t" + samplingPercent);
+ logger.info("The following statistics are collected based on sampling data. ");
+ logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+
+ for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+ if (baseCuboidRowCountInMappers.get(i) > 0) {
+ logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+ }
+ }
+
+ long grantTotal = 0;
+ for (long i : allCuboids) {
+ grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+ logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+ }
+
+ logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+ logger.info("After merge, the row count: \t " + grantTotal);
+ }
+
+ private void outputDimRangeInfo(List<Tuple2<String, Tuple3<Writable, Writable, String>>> result) {
+ if (col != null && minValue != null) {
+ // output written to baseDir/colName/colName.dci-r-00000 (etc)
+ String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+ new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(minValue.getBytes()),
+ dimRangeFileName)));
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+ new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(maxValue.getBytes()),
+ dimRangeFileName)));
+ logger.info("write dimension range info for col : " + col.getName() + " minValue:" + minValue
+ + " maxValue:" + maxValue);
+ }
+ }
+
+ private void outputDict(TblColRef col, Dictionary<String> dict, List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+ throws IOException, InterruptedException {
+ // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+ String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos)) {
+ outputStream.writeUTF(dict.getClass().getName());
+ dict.write(outputStream);
+
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_DICT,
+ new Tuple3<Writable, Writable, String>(NullWritable.get(),
+ new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName)));
+ }
+ }
+
+ private void outputStatistics(List<Long> allCuboids, List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+ throws IOException, InterruptedException {
+ // output written to baseDir/statistics/statistics-r-00000 (etc)
+ String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+ // mapper overlap ratio at key -1
+ long grandTotal = 0;
+
+ for (HLLCounter hll : cuboidHLLMap.values()) {
+ grandTotal += hll.getCountEstimate();
+ }
+
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new LongWritable(-1),
+ new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName)));
+
+ // mapper number at key -2
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new LongWritable(-2),
+ new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName)));
+
+ // sampling percentage at key 0
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new LongWritable(0L),
+ new BytesWritable(Bytes.toBytes(samplingPercent)), statisticsFileName)));
+
+ ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+ for (long i : allCuboids) {
+ valueBuf.clear();
+ cuboidHLLMap.get(i).writeRegisters(valueBuf);
+ valueBuf.flip();
+
+ byte[] valueCopy = new byte[valueBuf.limit()];
+ System.arraycopy(valueBuf.array(), 0, valueCopy, 0, valueBuf.limit());
+
+ result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+ new Tuple3<Writable, Writable, String>(new LongWritable(i),
+ new BytesWritable(valueCopy, valueCopy.length), statisticsFileName)));
+ }
+ }
+ }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 31eebc8..82a1a9b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,23 +24,31 @@ import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
public class SparkUtil {
@@ -130,6 +138,41 @@ public class SparkUtil {
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
- }
+ }
+
+ public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+ JavaRDD<String[]> recordRDD;
+
+ if (isSequenceFile) {
+ recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+ .map(new Function<Text, String[]>() {
+ @Override
+ public String[] call(Text text) throws Exception {
+ String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+ return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+ }
+ });
+ } else {
+ SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+ final Dataset intermediateTable = sparkSession.table(hiveTable);
+ recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+ @Override
+ public String[] call(Row row) throws Exception {
+ String[] result = new String[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ final Object o = row.get(i);
+ if (o != null) {
+ result[i] = o.toString();
+ } else {
+ result[i] = null;
+ }
+ }
+ return result;
+ }
+ });
+ }
+
+ return recordRDD;
+ }
}
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 16bedb5..a3e7e68 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -307,17 +307,14 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
- <version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
- <version>2.11.0</version>
</dependency>
</dependencies>
diff --git a/pom.xml b/pom.xml
index d9b9efe..cd18659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
<spark.version>2.1.2</spark.version>
<kryo.version>4.0.0</kryo.version>
+ <!-- Scala versions -->
+ <scala.version>2.11.0</scala.version>
+
<!-- <reflections.version>0.9.10</reflections.version> -->
<!-- Calcite Version -->
@@ -895,6 +898,24 @@
<version>${tomcat.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>