You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:43:46 UTC
[17/47] kylin git commit: Revert "KYLIN-2242 write multiple files in
FactDistinctColumnsReducer with MultipleOutputs"
Revert "KYLIN-2242 write multiple files in FactDistinctColumnsReducer with MultipleOutputs"
This reverts commit 7de8aa1203a72bad105ed692f7100535939b03af.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7576a09f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7576a09f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7576a09f
Branch: refs/heads/KYLIN-2361
Commit: 7576a09f29b75f2eb80a1b5ca9c65fe5c81b1144
Parents: 7de8aa1
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jan 22 11:38:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Jan 22 11:38:17 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/HadoopUtil.java | 16 --
.../kylin/engine/mr/JobBuilderSupport.java | 2 +-
.../kylin/engine/mr/common/BatchConstants.java | 9 +-
.../engine/mr/steps/CreateDictionaryJob.java | 43 ++--
.../engine/mr/steps/FactDistinctColumnsJob.java | 32 +--
.../mr/steps/FactDistinctColumnsReducer.java | 240 ++++++++++++-------
.../engine/mr/steps/SaveStatisticsStep.java | 10 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 10 +-
8 files changed, 187 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index b9ffe38..bdc4c3e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -26,10 +26,8 @@ import java.net.URISyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.kylin.common.KylinConfig;
import org.slf4j.Logger;
@@ -142,18 +140,4 @@ public class HadoopUtil {
}
}
- public static Path getFilterOnlyPath(FileSystem fs, Path baseDir, final String filter) throws IOException {
- FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith(filter);
- }
- });
-
- if (fileStatus.length == 1) {
- return fileStatus[0].getPath();
- } else {
- return null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c34a904..696b22a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -171,7 +171,7 @@ public class JobBuilderSupport {
}
public String getStatisticsPath(String jobId) {
- return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+ return getRealizationRootPath(jobId) + "/statistics";
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 602b4bb..0281539 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -53,16 +53,9 @@ public interface BatchConstants {
String CFG_STATISTICS_ENABLED = "statistics.enabled";
String CFG_STATISTICS_OUTPUT = "statistics.ouput";//spell error, for compatibility issue better not change it
String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
+ String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt";
String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq";
- String CFG_MAPRED_OUTPUT_COMPRESS = "mapred.output.compress";
-
- String CFG_OUTPUT_COLUMN = "column";
- String CFG_OUTPUT_DICT = "dict";
- String CFG_OUTPUT_STATISTICS = "statistics";
- String CFG_OUTPUT_PARTITION = "partition";
-
-
/**
* command line ARGuments
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index e5d053b..95d8cb1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -18,20 +18,15 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ByteBufferBackedInputStream;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
@@ -68,27 +63,21 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
@Override
public Dictionary<String> getDictionary(TblColRef col) throws IOException {
- Path colDir = new Path(factColumnsInputPath, col.getName());
- FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
-
- Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
- if (dictFile == null) {
+ Path dictFile = new Path(factColumnsInputPath, col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ if (fs.exists(dictFile) == false)
return null;
- }
-
- try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) {
- NullWritable key = NullWritable.get();
- BytesWritable value = new BytesWritable();
- reader.next(key, value);
-
- ByteBuffer buffer = new ByteArray(value.getBytes()).asBuffer();
- try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(buffer))) {
- String dictClassName = is.readUTF();
- Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
- dict.readFields(is);
- logger.info("DictionaryProvider read dict from file: " + dictFile);
- return dict;
- }
+
+ FSDataInputStream is = null;
+ try {
+ is = fs.open(dictFile);
+ String dictClassName = is.readUTF();
+ Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
+ dict.readFields(is);
+ logger.info("DictionaryProvider read dict from file: " + dictFile);
+ return dict;
+ } finally {
+ IOUtils.closeQuietly(is);
}
}
});
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index aded600..ce01eb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -23,16 +23,11 @@ import java.util.List;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
@@ -87,8 +82,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
int uhcReducerCount = cube.getConfig().getUHCReducerCount();
int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
- for (int index : uhcIndex) {
- if (index == 1) {
+ for(int index : uhcIndex) {
+ if(index == 1) {
reducerCount += uhcReducerCount - 1;
}
}
@@ -97,6 +92,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
}
+
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
@@ -121,12 +117,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
attachCubeMetadata(cube, job.getConfiguration());
- /**
- * don't compress the reducer output so that {@link CreateDictionaryJob} and {@link UpdateCubeInfoAfterBuildStep}
- * could read the reducer file directly
- */
- job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
-
return waitForCompletion(job);
} finally {
@@ -148,22 +138,18 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
private void setupReducer(Path output, int numberOfReducers) throws IOException {
job.setReducerClass(FactDistinctColumnsReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
job.setPartitionerClass(FactDistinctColumnPartitioner.class);
job.setNumReduceTasks(numberOfReducers);
- //make each reducer output to respective dir
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
-
-
+ // important, reducer writes HDFS directly at the moment
+ job.setReduceSpeculativeExecution(false);
+
FileOutputFormat.setOutputPath(job, output);
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
- //prevent to create zero-sized default output
- LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
-
deletePath(job.getConfiguration(), output);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 5d2fb72..711d991 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -18,25 +18,27 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
@@ -45,7 +47,7 @@ import org.apache.kylin.dict.IDictionaryBuilder;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -61,12 +63,14 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
private List<TblColRef> columnList;
+ private String statisticsOutput = null;
private List<Long> baseCuboidRowCountInMappers;
protected Map<Long, HLLCounter> cuboidHLLMap = null;
protected long baseCuboidId;
protected CubeDesc cubeDesc;
private long totalRowsBeforeMerge = 0;
private int samplingPercentage;
+ private List<ByteArray> colValues;
private TblColRef col = null;
private boolean isStatistics = false;
private KylinConfig cubeConfig;
@@ -84,14 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
public static final String DICT_FILE_POSTFIX = ".rldict";
public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
- private MultipleOutputs mos;
-
@Override
protected void setup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
Configuration conf = context.getConfiguration();
- mos = new MultipleOutputs(context);
-
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
@@ -109,20 +109,26 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
if (collectStatistics && (taskId == numberOfTasks - 1)) {
// hll
isStatistics = true;
+ statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
baseCuboidRowCountInMappers = Lists.newArrayList();
cuboidHLLMap = Maps.newHashMap();
samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
logger.info("Reducer " + taskId + " handling stats");
} else if (collectStatistics && (taskId == numberOfTasks - 2)) {
// partition col
+ isStatistics = false;
isPartitionCol = true;
col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
if (col == null) {
logger.info("Do not have partition col. This reducer will keep empty");
}
+ colValues = Lists.newLinkedList();
+ logger.info("Reducer " + taskId + " handling partition column " + col);
} else {
// normal col
+ isStatistics = false;
col = columnList.get(reducerIdToColumnIndex.get(taskId));
+ colValues = Lists.newLinkedList();
// local build dict
isReducerLocalBuildDict = config.isReducerLocalBuildDict();
@@ -188,13 +194,15 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
logAFewRows(value);
builder.addValue(value);
} else {
- byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
- // output written to baseDir/colName/-r-00000 (etc)
- String fileName = col.getName() + "/";
- mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+ colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+ if (colValues.size() == 1000000) { //spill every 1 million
+ logger.info("spill values to disk...");
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
+ }
}
}
-
+
rowCount++;
}
@@ -204,104 +212,162 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
}
- @Override
- protected void doCleanup(Context context) throws IOException, InterruptedException {
- if (isStatistics) {
- //output the hll info;
- List<Long> allCuboids = Lists.newArrayList();
- allCuboids.addAll(cuboidHLLMap.keySet());
- Collections.sort(allCuboids);
+ private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+ final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
+ final Path colDir = new Path(outputPath, col.getIdentity());
+ final String fileName = col.getIdentity() + "-" + taskId % uhcReducerCount;
+ final Path outputFile = new Path(colDir, fileName);
+
+ FSDataOutputStream out = null;
+ try {
+ if (!fs.exists(colDir)) {
+ fs.mkdirs(colDir);
+ }
- logMapperAndCuboidStatistics(allCuboids); // for human check
- outputStatistics(allCuboids);
- } else if (isPartitionCol) {
- // partition col
- outputPartitionInfo();
- } else {
- // normal col
- if (isReducerLocalBuildDict) {
- Dictionary<String> dict = builder.build();
- outputDict(col, dict);
+ if (fs.exists(outputFile)) {
+ out = fs.append(outputFile);
+ logger.info("append file " + outputFile);
+ } else {
+ out = fs.create(outputFile);
+ logger.info("create file " + outputFile);
}
- }
- mos.close();
+ for (ByteArray value : values) {
+ out.write(value.array(), value.offset(), value.length());
+ out.write('\n');
+ }
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
}
- private void outputPartitionInfo() throws IOException, InterruptedException {
- if (col != null) {
- // output written to baseDir/colName/colName.pci-r-00000 (etc)
- String partitionFileName = col.getName() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
+ private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException {
+ final String fileName = col.getIdentity() + DICT_FILE_POSTFIX;
+ FSDataOutputStream out = getOutputStream(context, fileName);
+ try {
+ String dictClassName = dict.getClass().getName();
+ out.writeUTF(dictClassName);
+ dict.write(out);
+ logger.info("reducer id is:+" + taskId + " colName:" + col.getName() + " writing dict at file : " + fileName + " dict class:" + dictClassName);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
+ private void outputPartitionInfo(Context context) throws IOException {
+ final String fileName = col.getIdentity() + PARTITION_COL_INFO_FILE_POSTFIX;
+ FSDataOutputStream out = getOutputStream(context, fileName);
+ try {
+ out.writeLong(timeMinValue);
+ out.writeLong(timeMaxValue);
logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+ final Path outputPath = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH));
+ final Path outputFile = new Path(outputPath, outputFileName);
+ if (!fs.exists(outputPath)) {
+ fs.mkdirs(outputPath);
}
+ FSDataOutputStream out = fs.create(outputFile);
+ return out;
}
- private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
- // output written to baseDir/colName/colName.rldict-r-00000 (etc)
- String dictFileName = col.getName() + "/" + col.getName() + DICT_FILE_POSTFIX;
+ @Override
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (isStatistics) {
+ // output the hll info
+ long grandTotal = 0;
+ for (HLLCounter hll : cuboidHLLMap.values()) {
+ grandTotal += hll.getCountEstimate();
+ }
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) {
- outputStream.writeUTF(dict.getClass().getName());
- dict.write(outputStream);
+ int mapperNumber = baseCuboidRowCountInMappers.size();
- mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+ writeMapperAndCuboidStatistics(context); // for human check
+ CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+ cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+ } else if (isPartitionCol) {
+ // partition col
+ if (col != null) {
+ outputPartitionInfo(context);
+ }
+ } else {
+ // normal col
+ if (isReducerLocalBuildDict) {
+ Dictionary<String> dict = builder.build();
+ outputDict(col, dict, context);
+ } else {
+ if (colValues.size() > 0) {
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
+ }
+ }
}
}
- private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
- // output written to baseDir/statistics/statistics-r-00000 (etc)
- String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+ private void writeMapperAndCuboidStatistics(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+ Path path = new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME);
+ FSDataOutputStream out = fs.create(path);
- ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ try {
+ String msg;
- // mapper overlap ratio at key -1
- long grandTotal = 0;
- for (HLLCounter hll : cuboidHLLMap.values()) {
- grandTotal += hll.getCountEstimate();
- }
- double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+ List<Long> allCuboids = Lists.newArrayList();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
- // mapper number at key -2
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+ msg = "Total cuboid number: \t" + allCuboids.size();
+ writeLine(out, msg);
+ msg = "Samping percentage: \t" + samplingPercentage;
+ writeLine(out, msg);
+
+ writeLine(out, "The following statistics are collected based on sampling data.");
+ writeLine(out, "Number of Mappers: " + baseCuboidRowCountInMappers.size());
+ for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+ if (baseCuboidRowCountInMappers.get(i) > 0) {
+ msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i);
+ writeLine(out, msg);
+ }
+ }
- // sampling percentage at key 0
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
+ long grantTotal = 0;
+ for (long i : allCuboids) {
+ grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+ msg = "Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate();
+ writeLine(out, msg);
+ }
- for (long i : allCuboids) {
- valueBuf.clear();
- cuboidHLLMap.get(i).writeRegisters(valueBuf);
- valueBuf.flip();
- mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
- }
- }
+ msg = "Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge;
+ writeLine(out, msg);
- private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
- logger.info("Total cuboid number: \t" + allCuboids.size());
- logger.info("Samping percentage: \t" + samplingPercentage);
- logger.info("The following statistics are collected based on sampling data.");
- logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+ msg = "After merge, the cube has row count: \t " + grantTotal;
+ writeLine(out, msg);
- for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
- if (baseCuboidRowCountInMappers.get(i) > 0) {
- logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+ if (grantTotal > 0) {
+ msg = "The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal;
+ writeLine(out, msg);
}
- }
- long grantTotal = 0;
- for (long i : allCuboids) {
- grantTotal += cuboidHLLMap.get(i).getCountEstimate();
- logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+ } finally {
+ IOUtils.closeQuietly(out);
}
+ }
+
+ private void writeLine(FSDataOutputStream out, String msg) throws IOException {
+ out.write(msg.getBytes());
+ out.write('\n');
- logger.info("Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge);
- logger.info("After merge, the cube has row count: \t " + grantTotal);
- if (grantTotal > 0) {
- logger.info("The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal);
- }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 28f99fb..2671042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -60,11 +60,9 @@ public class SaveStatisticsStep extends AbstractExecutable {
ResourceStore rs = ResourceStore.getStore(kylinConf);
try {
FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
- Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
- if (statisticsFilePath == null) {
- throw new IOException("fail to find the statistics file in base dir: " + statisticsDir);
- }
+ Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ if (!fs.exists(statisticsFilePath))
+ throw new IOException("File " + statisticsFilePath + " does not exists");
FSDataInputStream is = fs.open(statisticsFilePath);
try {
@@ -112,7 +110,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold();
logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
-
+
// in-mem cubing is good when
// 1) the cluster has enough mapper slots to run in parallel
// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 81d5c42..dc80399 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -80,13 +80,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
private void updateTimeRange(CubeSegment segment) throws IOException {
final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
- FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
- Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
- if (outputFile == null) {
- throw new IOException("fail to find the partition file in base dir: " + colDir);
- }
-
+ Path outputFile = new Path(factColumnsInputPath, partitionCol.getIdentity() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+ FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString());
FSDataInputStream is = null;
long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
try {
@@ -102,4 +97,5 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
segment.setDateRangeStart(minValue);
segment.setDateRangeEnd(maxValue);
}
+
}