You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:50 UTC
[13/50] incubator-kylin git commit: Collect cuboid statistics during
fetching distinct columns;
Collect cuboid statistics while fetching distinct columns.
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d4a271df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d4a271df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d4a271df
Branch: refs/heads/streaming-localdict
Commit: d4a271df9d9b055e44d1a6fc1e3cc3055e14c2bd
Parents: 7360f5b
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Thu Mar 26 23:15:57 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Thu Mar 26 23:15:57 2015 +0800
----------------------------------------------------------------------
.../cube/FactDistinctColumnsMapperBase.java | 5 +-
.../hadoop/cube/FactDistinctColumnsReducer.java | 53 ++++++++++++++------
.../cube/FactDistinctHiveColumnsMapper.java | 51 +++++++++++++------
3 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d4a271df/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index c0455ff..9945769 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -5,6 +5,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hive.hcatalog.data.HCatRecord;
@@ -23,7 +24,7 @@ import org.apache.kylin.metadata.model.TblColRef;
/**
* Created by Hongbin Ma(Binmahone) on 3/26/15.
*/
-public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, ShortWritable, Text> {
+public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
protected String cubeName;
protected CubeInstance cube;
@@ -32,7 +33,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected List<TblColRef> columns;
protected ArrayList<Integer> factDictCols;
- protected ShortWritable outputKey = new ShortWritable();
+ protected LongWritable outputKey = new LongWritable();
protected Text outputValue = new Text();
protected int errorRecordCounter =0;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d4a271df/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 2052d08..e1529d3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -19,13 +19,14 @@
package org.apache.kylin.job.hadoop.cube;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
@@ -44,17 +45,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
/**
* @author yangli9
*/
-public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
+public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, NullWritable, Text> {
private List<TblColRef> columnList = new ArrayList<TblColRef>();
private boolean collectStatistics = false;
private String statisticsOutput = null;
private List<Long> rowKeyCountInMappers;
- private HyperLogLogPlusCounter totalHll;
+ private Map<Long, Long> rowKeyCountInCuboids;
+ protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+ protected long baseCuboidId;
@Override
protected void setup(Context context) throws IOException {
@@ -66,23 +70,24 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeDesc cubeDesc = cube.getDescriptor();
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
columnList = baseCuboid.getColumns();
collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
if (collectStatistics) {
- totalHll = new HyperLogLogPlusCounter(16);
rowKeyCountInMappers = Lists.newArrayList();
+ rowKeyCountInCuboids = Maps.newHashMap();
+ cuboidHLLMap = Maps.newHashMap();
}
}
@Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if (key.get() >= 0) {
- TblColRef col = columnList.get(key.get());
+ TblColRef col = columnList.get((int) key.get());
HashSet<ByteArray> set = new HashSet<ByteArray>();
for (Text textValue : values) {
@@ -105,26 +110,38 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
}
} else {
// for hll
+ long cuboidId = 0 - key.get();
+
for (Text value : values) {
HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
ByteArray byteArray = new ByteArray(value.getBytes());
hll.readRegisters(byteArray.asBuffer());
- rowKeyCountInMappers.add(hll.getCountEstimate());
- // merge the hll with total hll
- totalHll.merge(hll);
+ if (cuboidId > baseCuboidId) {
+ // if this is the summary info from a mapper, record the number before merge
+ rowKeyCountInMappers.add(hll.getCountEstimate());
+ }
+
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ hll.merge(cuboidHLLMap.get(cuboidId));
+ }
+ cuboidHLLMap.put(cuboidId, hll);
}
}
}
protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
+
+ for (Long cuboidId : cuboidHLLMap.keySet()) {
+ rowKeyCountInCuboids.put(cuboidId, cuboidHLLMap.get(cuboidId).getCountEstimate());
+ }
+
//output the hll info;
if (collectStatistics) {
Configuration conf = context.getConfiguration();
FileSystem fs = FileSystem.get(conf);
- String outputPath = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
- FSDataOutputStream out = fs.create(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
+ FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
try {
long totalSum = 0;
@@ -141,13 +158,21 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
out.write('\n');
- msg = "The merged cube segment has " + totalHll.getCountEstimate() + " rows.";
+ long grantTotal = rowKeyCountInCuboids.get(baseCuboidId + 1);
+ msg = "The merged cube has " + grantTotal + " rows.";
out.write(msg.getBytes());
out.write('\n');
- msg = "The compaction rate is " + (totalHll.getCountEstimate()) + "/" + totalSum + " = " + (totalHll.getCountEstimate() * 100.0) / totalSum + "%.";
+ msg = "The compaction rate is " + (grantTotal) + "/" + totalSum + " = " + (grantTotal * 100.0) / totalSum + "%.";
out.write(msg.getBytes());
out.write('\n');
+ out.write('\n');
+
+ for (long i = 0; i < baseCuboidId; i++) {
+ msg = "Cuboid " + i + " has " + rowKeyCountInCuboids.get(i) + " rows.";
+ out.write(msg.getBytes());
+ out.write('\n');
+ }
} finally {
out.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d4a271df/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index 64ae353..9e9c096 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -18,11 +18,8 @@
package org.apache.kylin.job.hadoop.cube;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hive.hcatalog.data.HCatRecord;
@@ -30,12 +27,17 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.lookup.HiveTableReader;
import org.apache.kylin.job.constant.BatchConstants;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
/**
* @author yangli9
@@ -48,7 +50,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
protected boolean collectStatistics = false;
protected CuboidScheduler cuboidScheduler = null;
protected List<String> rowKeyValues = null;
- protected HyperLogLogPlusCounter hll;
+ protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+ protected HyperLogLogPlusCounter totalHll = null;
protected int nRowKey;
@Override
@@ -58,11 +61,11 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
schema = HCatInputFormat.getTableSchema(context.getConfiguration());
intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
cuboidScheduler = new CuboidScheduler(cubeDesc);
- hll = new HyperLogLogPlusCounter(16);
+ cuboidHLLMap = Maps.newHashMap();
+ totalHll = new HyperLogLogPlusCounter(16);
rowKeyValues = Lists.newArrayList();
nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
}
@@ -74,7 +77,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
HCatFieldSchema fieldSchema;
for (int i : factDictCols) {
- outputKey.set((short) i);
+ outputKey.set((long) i);
fieldSchema = schema.get(flatTableIndexes[i]);
Object fieldValue = record.get(fieldSchema.getName(), schema);
if (fieldValue == null)
@@ -103,8 +106,13 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
mask = mask >> 1;
}
- String key = StringUtils.join(rowKeyValues, ",");
- hll.add(key);
+ HyperLogLogPlusCounter hll = cuboidHLLMap.get(cuboidId);
+ if (hll == null) {
+ hll = new HyperLogLogPlusCounter(16);
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+
+ hll.add(StringUtils.join(rowKeyValues, ","));
Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
for (Long childId : children) {
@@ -116,11 +124,22 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
if (collectStatistics) {
- // output hll to reducer, key is -1
- // keyBuf = Bytes.toBytes(-1);
- outputKey.set((short) -1);
+
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ for (Long cuboidId : cuboidHLLMap.keySet()) {
+ HyperLogLogPlusCounter hll = cuboidHLLMap.get(cuboidId);
+ totalHll.merge(hll); // merge each cuboid's counter to the total hll
+ outputKey.set(0 - cuboidId);
+ ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array());
+ context.write(outputKey, outputValue);
+ }
+
+ //output the total hll for this mapper;
+ outputKey.set(0 - baseCuboidId - 1);
ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
- hll.writeRegisters(hllBuf);
+ totalHll.writeRegisters(hllBuf);
outputValue.set(hllBuf.array());
context.write(outputKey, outputValue);
}