You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/01/23 03:57:15 UTC
[05/14] kylin git commit: KYLIN-2242 write multiple files in
FactDistinctColumnsReducer with MultipleOutputs
KYLIN-2242 write multiple files in FactDistinctColumnsReducer with MultipleOutputs
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7de8aa12
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7de8aa12
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7de8aa12
Branch: refs/heads/master-hbase1.x
Commit: 7de8aa1203a72bad105ed692f7100535939b03af
Parents: c2229c9
Author: kangkaisen <ka...@live.com>
Authored: Sat Dec 17 14:12:48 2016 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Sat Jan 21 23:19:50 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/HadoopUtil.java | 16 ++
.../kylin/engine/mr/JobBuilderSupport.java | 2 +-
.../kylin/engine/mr/common/BatchConstants.java | 9 +-
.../engine/mr/steps/CreateDictionaryJob.java | 43 ++--
.../engine/mr/steps/FactDistinctColumnsJob.java | 32 ++-
.../mr/steps/FactDistinctColumnsReducer.java | 240 +++++++------------
.../engine/mr/steps/SaveStatisticsStep.java | 10 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 10 +-
8 files changed, 175 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index bdc4c3e..b9ffe38 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -26,8 +26,10 @@ import java.net.URISyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.kylin.common.KylinConfig;
import org.slf4j.Logger;
@@ -140,4 +142,18 @@ public class HadoopUtil {
}
}
+ public static Path getFilterOnlyPath(FileSystem fs, Path baseDir, final String filter) throws IOException {
+ FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(filter);
+ }
+ });
+
+ if (fileStatus.length == 1) {
+ return fileStatus[0].getPath();
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 696b22a..c34a904 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -171,7 +171,7 @@ public class JobBuilderSupport {
}
public String getStatisticsPath(String jobId) {
- return getRealizationRootPath(jobId) + "/statistics";
+ return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 0281539..602b4bb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -53,9 +53,16 @@ public interface BatchConstants {
String CFG_STATISTICS_ENABLED = "statistics.enabled";
String CFG_STATISTICS_OUTPUT = "statistics.ouput";//spell error, for compatibility issue better not change it
String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
- String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt";
String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq";
+ String CFG_MAPRED_OUTPUT_COMPRESS = "mapred.output.compress";
+
+ String CFG_OUTPUT_COLUMN = "column";
+ String CFG_OUTPUT_DICT = "dict";
+ String CFG_OUTPUT_STATISTICS = "statistics";
+ String CFG_OUTPUT_PARTITION = "partition";
+
+
/**
* command line ARGuments
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 95d8cb1..e5d053b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -18,15 +18,20 @@
package org.apache.kylin.engine.mr.steps;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ByteBufferBackedInputStream;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
@@ -63,21 +68,27 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
@Override
public Dictionary<String> getDictionary(TblColRef col) throws IOException {
- Path dictFile = new Path(factColumnsInputPath, col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- if (fs.exists(dictFile) == false)
+ Path colDir = new Path(factColumnsInputPath, col.getName());
+ FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
+
+ Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+ if (dictFile == null) {
return null;
-
- FSDataInputStream is = null;
- try {
- is = fs.open(dictFile);
- String dictClassName = is.readUTF();
- Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
- dict.readFields(is);
- logger.info("DictionaryProvider read dict from file: " + dictFile);
- return dict;
- } finally {
- IOUtils.closeQuietly(is);
+ }
+
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) {
+ NullWritable key = NullWritable.get();
+ BytesWritable value = new BytesWritable();
+ reader.next(key, value);
+
+ ByteBuffer buffer = new ByteArray(value.getBytes()).asBuffer();
+ try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(buffer))) {
+ String dictClassName = is.readUTF();
+ Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
+ dict.readFields(is);
+ logger.info("DictionaryProvider read dict from file: " + dictFile);
+ return dict;
+ }
}
}
});
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index ce01eb6..aded600 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -23,11 +23,16 @@ import java.util.List;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
@@ -82,8 +87,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
int uhcReducerCount = cube.getConfig().getUHCReducerCount();
int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
- for(int index : uhcIndex) {
- if(index == 1) {
+ for (int index : uhcIndex) {
+ if (index == 1) {
reducerCount += uhcReducerCount - 1;
}
}
@@ -92,7 +97,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
}
-
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
@@ -117,6 +121,12 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
attachCubeMetadata(cube, job.getConfiguration());
+ /**
+ * Don't compress the reducer output, so that {@link CreateDictionaryJob} and {@link UpdateCubeInfoAfterBuildStep}
+ * can read the reducer output files directly
+ */
+ job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
+
return waitForCompletion(job);
} finally {
@@ -138,18 +148,22 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
private void setupReducer(Path output, int numberOfReducers) throws IOException {
job.setReducerClass(FactDistinctColumnsReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
job.setPartitionerClass(FactDistinctColumnPartitioner.class);
job.setNumReduceTasks(numberOfReducers);
- // important, reducer writes HDFS directly at the moment
- job.setReduceSpeculativeExecution(false);
-
+ //make each reducer write its output to its own dir
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
+
+
FileOutputFormat.setOutputPath(job, output);
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+ //prevent creating a zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
deletePath(job.getConfiguration(), output);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 711d991..5d2fb72 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -18,27 +18,25 @@
package org.apache.kylin.engine.mr.steps;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
@@ -47,7 +45,7 @@ import org.apache.kylin.dict.IDictionaryBuilder;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -63,14 +61,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
private List<TblColRef> columnList;
- private String statisticsOutput = null;
private List<Long> baseCuboidRowCountInMappers;
protected Map<Long, HLLCounter> cuboidHLLMap = null;
protected long baseCuboidId;
protected CubeDesc cubeDesc;
private long totalRowsBeforeMerge = 0;
private int samplingPercentage;
- private List<ByteArray> colValues;
private TblColRef col = null;
private boolean isStatistics = false;
private KylinConfig cubeConfig;
@@ -88,10 +84,14 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
public static final String DICT_FILE_POSTFIX = ".rldict";
public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
+ private MultipleOutputs mos;
+
@Override
protected void setup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
Configuration conf = context.getConfiguration();
+ mos = new MultipleOutputs(context);
+
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
@@ -109,26 +109,20 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
if (collectStatistics && (taskId == numberOfTasks - 1)) {
// hll
isStatistics = true;
- statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
baseCuboidRowCountInMappers = Lists.newArrayList();
cuboidHLLMap = Maps.newHashMap();
samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
logger.info("Reducer " + taskId + " handling stats");
} else if (collectStatistics && (taskId == numberOfTasks - 2)) {
// partition col
- isStatistics = false;
isPartitionCol = true;
col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
if (col == null) {
logger.info("Do not have partition col. This reducer will keep empty");
}
- colValues = Lists.newLinkedList();
- logger.info("Reducer " + taskId + " handling partition column " + col);
} else {
// normal col
- isStatistics = false;
col = columnList.get(reducerIdToColumnIndex.get(taskId));
- colValues = Lists.newLinkedList();
// local build dict
isReducerLocalBuildDict = config.isReducerLocalBuildDict();
@@ -194,15 +188,13 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
logAFewRows(value);
builder.addValue(value);
} else {
- colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
- if (colValues.size() == 1000000) { //spill every 1 million
- logger.info("spill values to disk...");
- outputDistinctValues(col, colValues, context);
- colValues.clear();
- }
+ byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+ // output written to baseDir/colName/-r-00000 (etc)
+ String fileName = col.getName() + "/";
+ mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
}
}
-
+
rowCount++;
}
@@ -212,162 +204,104 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
}
- private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
- final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
- final Path colDir = new Path(outputPath, col.getIdentity());
- final String fileName = col.getIdentity() + "-" + taskId % uhcReducerCount;
- final Path outputFile = new Path(colDir, fileName);
-
- FSDataOutputStream out = null;
- try {
- if (!fs.exists(colDir)) {
- fs.mkdirs(colDir);
- }
-
- if (fs.exists(outputFile)) {
- out = fs.append(outputFile);
- logger.info("append file " + outputFile);
- } else {
- out = fs.create(outputFile);
- logger.info("create file " + outputFile);
- }
-
- for (ByteArray value : values) {
- out.write(value.array(), value.offset(), value.length());
- out.write('\n');
- }
- } finally {
- IOUtils.closeQuietly(out);
- }
- }
-
- private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException {
- final String fileName = col.getIdentity() + DICT_FILE_POSTFIX;
- FSDataOutputStream out = getOutputStream(context, fileName);
- try {
- String dictClassName = dict.getClass().getName();
- out.writeUTF(dictClassName);
- dict.write(out);
- logger.info("reducer id is:+" + taskId + " colName:" + col.getName() + " writing dict at file : " + fileName + " dict class:" + dictClassName);
- } finally {
- IOUtils.closeQuietly(out);
- }
- }
-
- private void outputPartitionInfo(Context context) throws IOException {
- final String fileName = col.getIdentity() + PARTITION_COL_INFO_FILE_POSTFIX;
- FSDataOutputStream out = getOutputStream(context, fileName);
- try {
- out.writeLong(timeMinValue);
- out.writeLong(timeMaxValue);
- logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
- } finally {
- IOUtils.closeQuietly(out);
- }
- }
-
- private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
- final Path outputPath = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH));
- final Path outputFile = new Path(outputPath, outputFileName);
- if (!fs.exists(outputPath)) {
- fs.mkdirs(outputPath);
- }
- FSDataOutputStream out = fs.create(outputFile);
- return out;
- }
-
@Override
protected void doCleanup(Context context) throws IOException, InterruptedException {
if (isStatistics) {
- // output the hll info
- long grandTotal = 0;
- for (HLLCounter hll : cuboidHLLMap.values()) {
- grandTotal += hll.getCountEstimate();
- }
- double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-
- int mapperNumber = baseCuboidRowCountInMappers.size();
+ //output the hll info;
+ List<Long> allCuboids = Lists.newArrayList();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
- writeMapperAndCuboidStatistics(context); // for human check
- CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
- cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+ logMapperAndCuboidStatistics(allCuboids); // for human check
+ outputStatistics(allCuboids);
} else if (isPartitionCol) {
// partition col
- if (col != null) {
- outputPartitionInfo(context);
- }
+ outputPartitionInfo();
} else {
// normal col
if (isReducerLocalBuildDict) {
Dictionary<String> dict = builder.build();
- outputDict(col, dict, context);
- } else {
- if (colValues.size() > 0) {
- outputDistinctValues(col, colValues, context);
- colValues.clear();
- }
+ outputDict(col, dict);
}
}
+
+ mos.close();
}
- private void writeMapperAndCuboidStatistics(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
- Path path = new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME);
- FSDataOutputStream out = fs.create(path);
+ private void outputPartitionInfo() throws IOException, InterruptedException {
+ if (col != null) {
+ // output written to baseDir/colName/colName.pci-r-00000 (etc)
+ String partitionFileName = col.getName() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
- try {
- String msg;
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
+ logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+ }
+ }
- List<Long> allCuboids = Lists.newArrayList();
- allCuboids.addAll(cuboidHLLMap.keySet());
- Collections.sort(allCuboids);
+ private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
+ // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+ String dictFileName = col.getName() + "/" + col.getName() + DICT_FILE_POSTFIX;
- msg = "Total cuboid number: \t" + allCuboids.size();
- writeLine(out, msg);
- msg = "Samping percentage: \t" + samplingPercentage;
- writeLine(out, msg);
-
- writeLine(out, "The following statistics are collected based on sampling data.");
- writeLine(out, "Number of Mappers: " + baseCuboidRowCountInMappers.size());
- for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
- if (baseCuboidRowCountInMappers.get(i) > 0) {
- msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i);
- writeLine(out, msg);
- }
- }
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) {
+ outputStream.writeUTF(dict.getClass().getName());
+ dict.write(outputStream);
- long grantTotal = 0;
- for (long i : allCuboids) {
- grantTotal += cuboidHLLMap.get(i).getCountEstimate();
- msg = "Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate();
- writeLine(out, msg);
- }
+ mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+ }
+ }
- msg = "Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge;
- writeLine(out, msg);
+ private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
+ // output written to baseDir/statistics/statistics-r-00000 (etc)
+ String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
- msg = "After merge, the cube has row count: \t " + grantTotal;
- writeLine(out, msg);
+ ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- if (grantTotal > 0) {
- msg = "The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal;
- writeLine(out, msg);
- }
+ // mapper overlap ratio at key -1
+ long grandTotal = 0;
+ for (HLLCounter hll : cuboidHLLMap.values()) {
+ grandTotal += hll.getCountEstimate();
+ }
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+
+ // mapper number at key -2
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+
+ // sampling percentage at key 0
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
- } finally {
- IOUtils.closeQuietly(out);
+ for (long i : allCuboids) {
+ valueBuf.clear();
+ cuboidHLLMap.get(i).writeRegisters(valueBuf);
+ valueBuf.flip();
+ mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
}
}
- private void writeLine(FSDataOutputStream out, String msg) throws IOException {
- out.write(msg.getBytes());
- out.write('\n');
+ private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+ logger.info("Total cuboid number: \t" + allCuboids.size());
+ logger.info("Samping percentage: \t" + samplingPercentage);
+ logger.info("The following statistics are collected based on sampling data.");
+ logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+ for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+ if (baseCuboidRowCountInMappers.get(i) > 0) {
+ logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+ }
+ }
+
+ long grantTotal = 0;
+ for (long i : allCuboids) {
+ grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+ logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+ }
+
+ logger.info("Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge);
+ logger.info("After merge, the cube has row count: \t " + grantTotal);
+ if (grantTotal > 0) {
+ logger.info("The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 2671042..28f99fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -60,9 +60,11 @@ public class SaveStatisticsStep extends AbstractExecutable {
ResourceStore rs = ResourceStore.getStore(kylinConf);
try {
FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
- if (!fs.exists(statisticsFilePath))
- throw new IOException("File " + statisticsFilePath + " does not exists");
+ Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
+ Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
+ if (statisticsFilePath == null) {
+ throw new IOException("fail to find the statistics file in base dir: " + statisticsDir);
+ }
FSDataInputStream is = fs.open(statisticsFilePath);
try {
@@ -110,7 +112,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold();
logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
-
+
// in-mem cubing is good when
// 1) the cluster has enough mapper slots to run in parallel
// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
http://git-wip-us.apache.org/repos/asf/kylin/blob/7de8aa12/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index dc80399..81d5c42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -80,8 +80,13 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
private void updateTimeRange(CubeSegment segment) throws IOException {
final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- Path outputFile = new Path(factColumnsInputPath, partitionCol.getIdentity() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
- FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString());
+ Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
+ FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
+ Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+ if (outputFile == null) {
+ throw new IOException("fail to find the partition file in base dir: " + colDir);
+ }
+
FSDataInputStream is = null;
long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
try {
@@ -97,5 +102,4 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
segment.setDateRangeStart(minValue);
segment.setDateRangeEnd(maxValue);
}
-
}