Posted to commits@kylin.apache.org by sh...@apache.org on 2020/04/28 07:07:18 UTC
[kylin] branch master updated: KYLIN-3847 Flink cubing step : fact distinct
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 3b4db4a KYLIN-3847 Flink cubing step : fact distinct
3b4db4a is described below
commit 3b4db4aacf425fe6e706efbd240116823435348a
Author: harveyyue <yw...@126.com>
AuthorDate: Tue Mar 10 13:47:56 2020 +0800
KYLIN-3847 Flink cubing step : fact distinct
---
.../org/apache/kylin/common/KylinConfigBase.java | 16 +-
.../org/apache/kylin/common/util/StringUtil.java | 3 +
.../engine/flink/FlinkBatchCubingJobBuilder2.java | 34 +-
.../engine/flink/FlinkFactDistinctColumns.java | 357 +++++++++++++
.../engine/flink/HadoopMultipleOutputFormat.java | 114 +++++
.../engine/mr/steps/FactDistinctColumnsBase.java | 552 +++++++++++++++++++++
6 files changed, 1069 insertions(+), 7 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 773254d..8eefc02 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1560,15 +1560,15 @@ public abstract class KylinConfigBase implements Serializable {
}
public boolean isSparkFactDistinctEnable() {
- return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "true"));
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", FALSE));
}
public boolean isSparkUHCDictionaryEnable() {
- return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", FALSE));
}
public boolean isSparkCardinalityEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", FALSE));
}
public int getSparkOutputMaxSize() {
@@ -1576,15 +1576,19 @@ public abstract class KylinConfigBase implements Serializable {
}
public boolean isSparkDimensionDictionaryEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", FALSE));
+ }
+
+ public boolean isSparCreateHiveTableViaSparkEnable() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", FALSE));
}
public boolean isFlinkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
}
- public boolean isSparCreateHiveTableViaSparkEnable() {
- return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
+ public boolean isFlinkFactDistinctEnable() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.flink-fact-distinct", FALSE));
}
// ============================================================================
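For reference, the new Flink-based fact distinct step stays off by default; a deployment would presumably opt in through kylin.properties using the key read by the getter above:

    kylin.engine.flink-fact-distinct=true

Note that kylin.engine.spark-fact-distinct keeps its Boolean.parseBoolean handling but now also defaults to false.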
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
index 5dde9cf..80545dc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
@@ -220,4 +220,7 @@ public class StringUtil {
return a == null ? b == null : a.equals(b);
}
+ public static boolean isEmpty(String str) {
+ return str == null || str.length() == 0;
+ }
}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
index 925334c..b2db499 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -61,7 +61,12 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStep(jobId));
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ if (config.isFlinkFactDistinctEnable()) {
+ result.addTask(createFactDistinctColumnsFlinkStep(jobId));
+ } else {
+ result.addTask(createFactDistinctColumnsStep(jobId));
+ }
if (isEnableUHCDictStep()) {
result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +92,33 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ public FlinkExecutable createFactDistinctColumnsFlinkStep(String jobId) {
+ final FlinkExecutable flinkExecutable = new FlinkExecutable();
+ final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+ flinkExecutable.setClassName(FlinkFactDistinctColumns.class.getName());
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_INPUT_PATH.getOpt(), tablePath);
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+ flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+ flinkExecutable.setJobId(jobId);
+ flinkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+ flinkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOutputPath(jobId));
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+
+ flinkExecutable.setJars(jars.toString());
+
+ return flinkExecutable;
+ }
+
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final FlinkExecutable flinkExecutable = new FlinkExecutable();
flinkExecutable.setClassName(FlinkCubingByLayer.class.getName());
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java
new file mode 100644
index 0000000..b1787d8
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsBase;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class FlinkFactDistinctColumns extends AbstractApplication {
+ protected static final Logger logger = LoggerFactory.getLogger(FlinkFactDistinctColumns.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT)
+ .hasArg().isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
+ public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+ .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+ .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+ public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+ .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
+
+ private Options options;
+
+ public FlinkFactDistinctColumns() {
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_INPUT_TABLE);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+ options.addOption(OPTION_COUNTER_PATH);
+ options.addOption(OPTION_ENABLE_OBJECT_REUSE);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
+ int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+ String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+ Job job = Job.getInstance();
+ FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
+ HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+ final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+ final int totalReducer = reducerMapping.getTotalReducerNum();
+
+ logger.info("getTotalReducerNum: {}", totalReducer);
+ logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+ logger.info("counter path {}", counterPath);
+
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+ // accumulators for source record count and byte size
+ final String bytesWrittenName = "byte-writer-counter";
+ final String recordCounterName = "record-counter";
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (!StringUtil.isEmpty(enableObjectReuseOptValue) &&
+ enableObjectReuseOptValue.equalsIgnoreCase("true")) {
+ env.getConfig().enableObjectReuse();
+ }
+
+ DataSet<String[]> recordDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
+
+ // read record from flat table
+ // output:
+ // 1, statistic
+ // 2, field value of dict col
+ // 3, min/max field value of non-dict col
+ DataSet<Tuple2<SelfDefineSortableKey, Text>> flatOutputDataSet = recordDataSet.mapPartition(
+ new FlatOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent,
+ bytesWrittenName, recordCounterName));
+
+ // repartition data, make each reducer handle only one col data or the statistic data
+ DataSet<Tuple2<SelfDefineSortableKey, Text>> partitionDataSet = flatOutputDataSet
+ .partitionCustom(new FactDistinctColumnPartitioner(cubeName, metaUrl, sConf), 0)
+ .setParallelism(totalReducer);
+
+ // multiple output result
+ // 1, CFG_OUTPUT_COLUMN: field values of dict col, which will not be built in reducer, like globalDictCol
+ // 2, CFG_OUTPUT_DICT: dictionary object built in reducer
+ // 3, CFG_OUTPUT_STATISTICS: cube statistic: hll of cuboids ...
+ // 4, CFG_OUTPUT_PARTITION: dimension value range(min,max)
+ DataSet<Tuple2<String, Tuple3<Writable, Writable, String>>> outputDataSet = partitionDataSet
+ .mapPartition(new MultiOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent))
+ .setParallelism(totalReducer);
+
+ // make each reducer output to respective dir
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+ NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+ NullWritable.class, ArrayPrimitiveWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+ LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+ NullWritable.class, LongWritable.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ FileOutputFormat.setCompressOutput(job, false);
+
+ // prevent creating a zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+ outputDataSet.output(new HadoopMultipleOutputFormat(new LazyOutputFormat(), job));
+
+ JobExecutionResult jobExecutionResult =
+ env.execute("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+ Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+ Long recordCount = (Long) accumulatorResults.get(recordCounterName);
+ Long bytesWritten = (Long) accumulatorResults.get(bytesWrittenName);
+ logger.info("Map input records={}", recordCount);
+ logger.info("HDFS Read: {} HDFS Write", bytesWritten);
+ logger.info("HDFS: Number of bytes written=" + FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));
+
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten));
+
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterPath, counterMap);
+ }
+
+
+ static class FlatOutputMapPartitionFunction extends
+ RichMapPartitionFunction<String[], Tuple2<SelfDefineSortableKey, Text>> {
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int samplingPercentage;
+ private String bytesWrittenName;
+ private String recordCounterName;
+ private LongCounter bytesWrittenCounter;
+ private LongCounter recordCounter;
+
+ private FactDistinctColumnsBase base;
+
+ public FlatOutputMapPartitionFunction(SerializableConfiguration conf, String cubeName, String segmentId,
+ String metaUrl, int samplingPercentage, String bytesWrittenName,
+ String recordCounterName) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ this.samplingPercentage = samplingPercentage;
+ this.bytesWrittenName = bytesWrittenName;
+ this.recordCounterName = recordCounterName;
+ this.bytesWrittenCounter = new LongCounter();
+ this.recordCounter = new LongCounter();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ getRuntimeContext().addAccumulator(bytesWrittenName, bytesWrittenCounter);
+ getRuntimeContext().addAccumulator(recordCounterName, recordCounter);
+ base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, conf, samplingPercentage);
+ base.setupMap();
+ }
+
+ @Override
+ public void mapPartition(Iterable<String[]> values, Collector<Tuple2<SelfDefineSortableKey, Text>> out) throws Exception {
+ FactDistinctColumnsBase.Visitor visitor = new FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text>() {
+ @Override
+ public void collect(String namedOutput, SelfDefineSortableKey key, Text value, String outputPath) {
+ out.collect(new Tuple2<>(key, value));
+ }
+ };
+
+ for (String[] row : values) {
+ bytesWrittenCounter.add(base.countSizeInBytes(row));
+ recordCounter.add(1L);
+ base.map(row, visitor);
+ }
+
+ base.postMap(visitor);
+ }
+ }
+
+ static class FactDistinctColumnPartitioner implements Partitioner<SelfDefineSortableKey> {
+ private transient volatile boolean initialized = false;
+ private String cubeName;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private transient FactDistinctColumnsReducerMapping reducerMapping;
+
+ public FactDistinctColumnPartitioner(String cubeName, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ private void init() {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+ initialized = true;
+ }
+ }
+
+ @Override
+ public int partition(SelfDefineSortableKey key, int numPartitions) {
+ if (initialized == false) {
+ synchronized (FlinkFactDistinctColumns.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ Text keyText = key.getText();
+ if (keyText.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+ Long cuboidId = Bytes.toLong(keyText.getBytes(), 1, Bytes.SIZEOF_LONG);
+ return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+ } else {
+ return BytesUtil.readUnsigned(keyText.getBytes(), 0, 1);
+ }
+ }
+ }
+
+ static class MultiOutputMapPartitionFunction extends
+ RichMapPartitionFunction<Tuple2<SelfDefineSortableKey, Text>, Tuple2<String, Tuple3<Writable, Writable, String>>> {
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private int samplingPercentage;
+
+ private FactDistinctColumnsBase base;
+
+ public MultiOutputMapPartitionFunction(SerializableConfiguration conf, String cubeName, String segmentId,
+ String metaUrl, int samplingPercentage) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ this.samplingPercentage = samplingPercentage;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ int taskId = getRuntimeContext().getIndexOfThisSubtask();
+ base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, conf, samplingPercentage);
+ base.setupReduce(taskId);
+ }
+
+ @Override
+ public void mapPartition(Iterable<Tuple2<SelfDefineSortableKey, Text>> values,
+ Collector<Tuple2<String, Tuple3<Writable, Writable, String>>> out) throws Exception {
+ FactDistinctColumnsBase.Visitor visitor = new FactDistinctColumnsBase.Visitor<Writable, Writable>() {
+ @Override
+ public void collect(String namedOutput, Writable key, Writable value, String outputPath) {
+ out.collect(new Tuple2<>(namedOutput, new Tuple3<>(key, value, outputPath)));
+ }
+ };
+
+ for (Tuple2<SelfDefineSortableKey, Text> value : values) {
+ base.reduce(new Pair<>(value.f0, value.f1), visitor);
+ }
+
+ base.postReduce(visitor);
+ }
+ }
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java
new file mode 100644
index 0000000..0d25028
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.flink;
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter;
+import org.apache.hadoop.util.Progress;
+
+import java.io.IOException;
+
+public class HadoopMultipleOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<String, Tuple3<K, V, String>>> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final Object OPEN_MULTIPLE_MUTEX = new Object();
+ protected static final Object CLOSE_MULTIPLE_MUTEX = new Object();
+
+ protected MultipleOutputs writer;
+
+ public HadoopMultipleOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
+ super(mapreduceOutputFormat, job);
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ super.open(taskNumber, numTasks);
+
+ synchronized (OPEN_MULTIPLE_MUTEX) {
+ try {
+ TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
+ context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
+ recordWriter, outputCommitter, new DummyReporter(), null,
+ BytesWritable.class, BytesWritable.class);
+ this.writer = new MultipleOutputs(taskInputOutputContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not create MultipleOutputs.", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ synchronized (CLOSE_MULTIPLE_MUTEX) {
+ try {
+ this.writer.close();
+ } catch (InterruptedException e) {
+ throw new IOException("Could not close MultipleOutputs.", e);
+ }
+ }
+ }
+
+ @Override
+ public void writeRecord(Tuple2<String, Tuple3<K, V, String>> record) throws IOException {
+ try {
+ this.writer.write(record.f0, record.f1.f0, record.f1.f1, record.f1.f2);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static class InputIterator implements RawKeyValueIterator {
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return null;
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Progress getProgress() {
+ return null;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+}
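For orientation, each record handed to writeRecord above is a Tuple2 whose f0 names the output (registered beforehand with MultipleOutputs.addNamedOutput) and whose f1 bundles key, value and base output path. A minimal sketch of assembling one such record — the column identity below is illustrative, not taken from this commit:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.kylin.engine.mr.common.BatchConstants;

public class MultipleOutputRecordSketch {
    // Builds one record in the shape HadoopMultipleOutputFormat.writeRecord expects.
    public static Tuple2<String, Tuple3<Writable, Writable, String>> sampleRecord() {
        // Illustrative column identity; the real value comes from TblColRef.getIdentity()
        // in FactDistinctColumnsBase, so files land under baseDir/<colIdentity>/-r-00000.
        String baseOutputPath = "DEFAULT.KYLIN_SALES.PART_DT" + "/";
        return new Tuple2<>(
                BatchConstants.CFG_OUTPUT_COLUMN,      // named output registered via addNamedOutput
                new Tuple3<>(NullWritable.get(),       // key of the SequenceFile output
                        new Text("2012-01-01"),        // a distinct column value
                        baseOutputPath));
    }
}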
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java
new file mode 100644
index 0000000..26f0da3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.CuboidStatCalculator;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FactDistinctColumnsBase {
+ private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsBase.class);
+
+ private String cubeName;
+ private String segmentId;
+ private int samplingPercentage;
+ private KylinConfig envConfig;
+
+ // map
+ protected CubeInstance cube;
+ protected CubeSegment cubeSeg;
+ protected CubeDesc cubeDesc;
+ protected long baseCuboidId;
+ protected List<TblColRef> allCols;
+ protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+ protected int[] columnIndex;
+ protected FactDistinctColumnsReducerMapping reducerMapping;
+ protected int nRowKey;
+ private Integer[][] allCuboidsBitSet = null;
+ private HLLCounter[] allCuboidsHLL = null;
+ private Long[] cuboidIds;
+ private int rowCount = 0;
+ private DictColDeduper dictColDeduper;
+ private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+ private CuboidStatCalculator[] cuboidStatCalculators;
+ private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+ private ByteBuffer tmpbuf;
+ private Text outputKey;
+ private Text outputValue;
+ private Text emptyText;
+
+ // reduce
+ public static final String DICT_FILE_POSTFIX = ".rldict";
+ public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+ private int taskId;
+ private boolean isStatistics = false;
+ private List<Long> baseCuboidRowCountInMappers;
+ protected Map<Long, HLLCounter> cuboidHLLMap = null;
+ private TblColRef col = null;
+ private long totalRowsBeforeMerge = 0;
+ // local build dict
+ private boolean buildDictInReducer;
+ private IDictionaryBuilder builder;
+ private String maxValue = null;
+ private String minValue = null;
+
+
+ public FactDistinctColumnsBase(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf,
+ int samplingPercentage) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.samplingPercentage = samplingPercentage;
+ this.envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ }
+
+ public void setupMap() {
+ outputKey = new Text();
+ outputValue = new Text();
+ emptyText = new Text();
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(envConfig)) {
+ cube = CubeManager.getInstance(envConfig).getCube(cubeName);
+ cubeSeg = cube.getSegmentById(segmentId);
+ cubeDesc = cube.getDescriptor();
+ baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+ allCols = reducerMapping.getAllDimDictCols();
+
+ intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+ columnIndex = new int[allCols.size()];
+ for (int i = 0; i < allCols.size(); i++) {
+ TblColRef colRef = allCols.get(i);
+ int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+ columnIndex[i] = columnIndexOnFlatTbl;
+ }
+
+ tmpbuf = ByteBuffer.allocate(4096);
+ nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+ Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+ if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) {
+ // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
+ // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
+ cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids());
+ }
+ cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+ allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+
+ allCuboidsHLL = new HLLCounter[cuboidIds.length];
+ for (int i = 0; i < cuboidIds.length; i++) {
+ allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
+ }
+
+ //for KYLIN-2518 backward compatibility
+ boolean isUsePutRowKeyToHllNewAlgorithm;
+ if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+ isUsePutRowKeyToHllNewAlgorithm = false;
+ logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+ } else {
+ isUsePutRowKeyToHllNewAlgorithm = true;
+ logger.info(
+ "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+ cubeDesc.getVersion());
+ }
+
+ int calculatorNum = getStatsThreadNum(cuboidIds.length);
+ cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+ int splitSize = cuboidIds.length / calculatorNum;
+ if (splitSize <= 0) {
+ splitSize = 1;
+ }
+ for (int i = 0; i < calculatorNum; i++) {
+ HLLCounter[] cuboidsHLLSplit;
+ Integer[][] cuboidsBitSetSplit;
+ Long[] cuboidIdSplit;
+ int start = i * splitSize;
+ if (start >= cuboidIds.length) {
+ break;
+ }
+ int end = (i + 1) * splitSize;
+ if (i == calculatorNum - 1) {// last split
+ end = cuboidIds.length;
+ }
+
+ cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+ cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+ cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+ CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+ intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+ isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+ cuboidStatCalculators[i] = calculator;
+ calculator.start();
+ }
+
+ // setup dict col deduper
+ dictColDeduper = new DictColDeduper();
+ Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+ for (int i = 0; i < allCols.size(); i++) {
+ if (dictCols.contains(allCols.get(i)))
+ dictColDeduper.setIsDictCol(i);
+ }
+ }
+ }
+
+ private int getStatsThreadNum(int cuboidNum) {
+ int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator();
+ if (unitNum <= 0) {
+ logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " is should larger than 0");
+ logger.info("Will use single thread for cuboid statistics calculation");
+ return 1;
+ }
+
+ int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber();
+ int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+ if (calculatorNum > maxCalculatorNum) {
+ calculatorNum = maxCalculatorNum;
+ }
+ return calculatorNum;
+ }
+
+ private int countNewSize(int oldSize, int dataSize) {
+ int newSize = oldSize * 2;
+ while (newSize < dataSize) {
+ newSize = newSize * 2;
+ }
+ return newSize;
+ }
+
+ private void writeFieldValue(DataType type, Integer colIndex, String value, Visitor visitor) {
+ int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+ tmpbuf.clear();
+ byte[] valueBytes = Bytes.toBytes(value);
+ int size = valueBytes.length + 1;
+ if (size >= tmpbuf.capacity()) {
+ tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+ }
+ tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+ tmpbuf.put(valueBytes);
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ sortableKey.init(outputKey, type);
+ visitor.collect(null, sortableKey, emptyText, null);
+ // log a few rows for troubleshooting
+ if (rowCount < 10) {
+ logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+ }
+ }
+
+ private void putRowKeyToHLL(String[] row) {
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ cuboidStatCalculator.putRow(row);
+ }
+ }
+
+ public long countSizeInBytes(String[] row) {
+ int size = 0;
+ for (String s : row) {
+ size += s == null ? 1 : StringUtil.utf8Length(s);
+ size++; // delimiter
+ }
+ return size;
+ }
+
+ public void map(String[] row, Visitor visitor) {
+ for (int i = 0; i < allCols.size(); i++) {
+ int colIndex = columnIndex[i];
+ int rowSize = row.length;
+ String fieldValue = " ";
+ if (colIndex <= rowSize - 1) {
+ fieldValue = row[colIndex];
+ } else {
+ logger.debug("colIndex:" + colIndex + " is more than rowSize: " + rowSize + " -1, so set empty value.");
+ }
+ if (fieldValue == null)
+ continue;
+
+ final DataType type = allCols.get(i).getType();
+
+ // for dict columns, de-dup before writing the value; for non-dict dimension columns, hold until postMap()
+ if (dictColDeduper.isDictCol(i)) {
+ if (dictColDeduper.add(i, fieldValue)) {
+ writeFieldValue(type, i, fieldValue, visitor);
+ }
+ } else {
+ DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+ if (old == null) {
+ old = new DimensionRangeInfo(fieldValue, fieldValue);
+ dimensionRangeInfoMap.put(i, old);
+ } else {
+ old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+ old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+ }
+ }
+
+ if (rowCount % 100 < samplingPercentage) {
+ putRowKeyToHLL(row);
+ }
+
+ if (rowCount % 100 == 0) {
+ dictColDeduper.resetIfShortOfMem();
+ }
+
+ rowCount++;
+ }
+ }
+
+ public void postMap(Visitor visitor) throws IOException {
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ cuboidStatCalculator.waitForCompletion();
+ }
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+ HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+ HLLCounter hll;
+
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = cuboidsHLL[i];
+ tmpbuf.clear();
+ tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+ tmpbuf.putLong(cuboidIds[i]);
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+ sortableKey.init(outputKey, (byte) 0);
+ visitor.collect(null, sortableKey, outputValue, null);
+ }
+ }
+ for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+ DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+ DataType dataType = allCols.get(colIndex).getType();
+ writeFieldValue(dataType, colIndex, rangeInfo.getMin(), visitor);
+ writeFieldValue(dataType, colIndex, rangeInfo.getMax(), visitor);
+ }
+ }
+
+
+ public void setupReduce(int taskId) throws IOException {
+ this.taskId = taskId;
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(envConfig)) {
+ cube = CubeManager.getInstance(envConfig).getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+ logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
+
+ if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+ // hll
+ isStatistics = true;
+ baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+ baseCuboidRowCountInMappers = Lists.newArrayList();
+ cuboidHLLMap = Maps.newHashMap();
+ logger.info("Reducer " + taskId + " handling stats");
+ } else {
+ // normal col
+ col = reducerMapping.getColForReducer(taskId);
+ Preconditions.checkNotNull(col);
+
+ // local build dict
+ buildDictInReducer = envConfig.isBuildDictInReducerEnabled();
+ if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+ buildDictInReducer = false;
+ }
+ if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+ buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+ }
+ if (buildDictInReducer) {
+ builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+ builder.init(null, 0, null);
+ }
+ logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
+ }
+ }
+ }
+
+ public void reduce(Pair<SelfDefineSortableKey, Text> kv, Visitor visitor) throws IOException {
+ if (isStatistics) {
+ // for hll
+ long cuboidId = Bytes.toLong(kv.getFirst().getText().getBytes(), 1, Bytes.SIZEOF_LONG);
+ HLLCounter hll = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+ ByteBuffer bf = ByteBuffer.wrap(kv.getSecond().getBytes(), 0, kv.getSecond().getLength());
+ hll.readRegisters(bf);
+
+ totalRowsBeforeMerge += hll.getCountEstimate();
+
+ if (cuboidId == baseCuboidId) {
+ baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+ }
+
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+ } else {
+ String value = Bytes.toString(kv.getFirst().getText().getBytes(), 1, kv.getFirst().getText().getLength() - 1);
+ logAFewRows(value);
+ // if dimension col, compute max/min value
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) &&
+ col.getType().needCompare()) {
+ if (minValue == null || col.getType().compare(minValue, value) > 0) {
+ minValue = value;
+ }
+ if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+ maxValue = value;
+ }
+ }
+
+ // if dict column
+ if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+ if (buildDictInReducer) {
+ builder.addValue(value);
+ } else {
+ byte[] keyBytes = Bytes.copy(kv.getFirst().getText().getBytes(), 1, kv.getFirst().getText().getLength() - 1);
+ // output written to baseDir/colName/-r-00000 (etc)
+ String fileName = col.getIdentity() + "/";
+ visitor.collect(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+ }
+ }
+ rowCount++;
+ }
+ }
+
+ public void postReduce(Visitor visitor) throws IOException {
+ if (isStatistics) {
+ //output the hll info;
+ List<Long> allCuboids = Lists.newArrayList();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
+
+ logMapperAndCuboidStatistics(allCuboids); // for human check
+ outputStatistics(allCuboids, visitor);
+ } else {
+ // dimension col
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ outputDimRangeInfo(visitor);
+ }
+ // dic col
+ if (buildDictInReducer) {
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(envConfig)) {
+ Dictionary<String> dict = builder.build();
+ outputDict(col, dict, visitor);
+ }
+ }
+ }
+ }
+
+ private void logAFewRows(String value) {
+ if (rowCount < 10) {
+ logger.info("Received value: " + value);
+ }
+ }
+
+ private void outputDimRangeInfo(Visitor visitor) throws IOException {
+ if (col != null && minValue != null) {
+ // output written to baseDir/colName/colName.dci-r-00000 (etc)
+ String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+ visitor.collect(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
+ new Text(minValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
+ visitor.collect(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
+ new Text(maxValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
+ logger.info("write dimension range info for col : " + col.getName() + " minValue:" + minValue
+ + " maxValue:" + maxValue);
+ }
+ }
+
+ private void outputDict(TblColRef col, Dictionary<String> dict, Visitor visitor) throws IOException {
+ // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+ String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(baos)) {
+ outputStream.writeUTF(dict.getClass().getName());
+ dict.write(outputStream);
+
+ visitor.collect(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
+ new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
+ }
+ }
+
+ private void outputStatistics(List<Long> allCuboids, Visitor visitor) throws IOException {
+ // output written to baseDir/statistics/statistics-r-00000 (etc)
+ String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+ ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+ // mapper overlap ratio at key -1
+ long grandTotal = 0;
+ for (HLLCounter hll : cuboidHLLMap.values()) {
+ grandTotal += hll.getCountEstimate();
+ }
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+ visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1),
+ new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+
+ // mapper number at key -2
+ visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2),
+ new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+
+ // sampling percentage at key 0
+ visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L),
+ new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
+
+ for (long i : allCuboids) {
+ valueBuf.clear();
+ cuboidHLLMap.get(i).writeRegisters(valueBuf);
+ valueBuf.flip();
+ visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i),
+ new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
+ }
+ }
+
+ private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+ logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
+ logger.info("Samping percentage: \t" + samplingPercentage);
+ logger.info("The following statistics are collected based on sampling data.");
+ logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+
+ for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+ if (baseCuboidRowCountInMappers.get(i) > 0) {
+ logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+ }
+ }
+
+ long grantTotal = 0;
+ for (long i : allCuboids) {
+ grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+ logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+ }
+
+ logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+ logger.info("After merge, the row count: \t " + grantTotal);
+ }
+
+ public abstract static class Visitor<K, V> {
+ public abstract void collect(String namedOutput, K key, V value, String outputPath);
+ }
+}
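Taken together, the new base class factors the mapper/reducer logic of the MR FactDistinctColumns job into engine-agnostic setupMap/map/postMap and setupReduce/reduce/postReduce phases driven through the Visitor callback. A minimal, illustrative driver for the map side — the cube name, segment id, metadata URL and sample rows are placeholders, and a live Kylin metadata store is assumed — mirroring what FlatOutputMapPartitionFunction does inside the Flink job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsBase;
import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;

public class FactDistinctMapSideSketch {
    public static void main(String[] args) throws Exception {
        SerializableConfiguration conf = new SerializableConfiguration(new Configuration());
        // Placeholder arguments; in the Flink job they arrive through the CLI options above.
        FactDistinctColumnsBase base = new FactDistinctColumnsBase(
                "sample_cube", "segment-uuid", "kylin_metadata@hdfs,path=/kylin/meta", conf, 100);
        base.setupMap();

        // The visitor is where each engine plugs in its own emit; the Flink function
        // forwards (key, value) to out.collect(new Tuple2<>(key, value)).
        FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text> visitor =
                new FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text>() {
                    @Override
                    public void collect(String namedOutput, SelfDefineSortableKey key, Text value,
                                        String outputPath) {
                        System.out.println(key + " -> " + value.getLength() + " bytes");
                    }
                };

        String[][] rows = { { "2012-01-01", "FP-GTC", "0" }, { "2012-01-02", "ABIN", "15" } };
        long sourceBytes = 0;
        for (String[] row : rows) {
            sourceBytes += base.countSizeInBytes(row); // feeds the byte-writer accumulator in the real job
            base.map(row, visitor);                    // emits dict values, tracks dim ranges, samples HLL
        }
        base.postMap(visitor);                         // emits per-cuboid HLL stats and dimension ranges
        System.out.println("source bytes: " + sourceBytes);
    }
}

The reduce side follows the same shape — setupReduce(taskId), then reduce(pair, visitor) per record, then postReduce(visitor) — as MultiOutputMapPartitionFunction shows above.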