You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:45 UTC
[33/50] [abbrv] incubator-kylin git commit: KYLIN-750 Merge cube segments from HBase table, refine code
KYLIN-750 Merge cube segments from HBase table, refine code
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1e4b9be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1e4b9be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1e4b9be
Branch: refs/heads/streaming-localdict
Commit: d1e4b9beb4efb298eb977df72e3a234e1e3adfab
Parents: 73c275e
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Tue May 12 14:07:45 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Tue May 12 14:10:35 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/Array.java | 4 ---
.../java/org/apache/kylin/cube/CubeSegment.java | 22 ++++++++----
.../apache/kylin/job/cube/CubingJobBuilder.java | 1 -
.../job/hadoop/cube/FactDistinctColumnsJob.java | 7 ++--
.../hadoop/cube/FactDistinctColumnsReducer.java | 10 +++---
.../cube/FactDistinctHiveColumnsMapper.java | 2 +-
.../job/hadoop/cube/StorageCleanupJob.java | 37 +++++++++++---------
.../hadoop/cubev2/MapContextGTRecordWriter.java | 4 +--
.../hadoop/cubev2/MergeCuboidFromHBaseJob.java | 10 +++---
.../job/hadoop/cubev2/MergeStatisticsStep.java | 29 +++++++++------
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 23 ++++++------
11 files changed, 80 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/common/src/main/java/org/apache/kylin/common/util/Array.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/Array.java b/common/src/main/java/org/apache/kylin/common/util/Array.java
index 7f64a65..c48bb3d 100644
--- a/common/src/main/java/org/apache/kylin/common/util/Array.java
+++ b/common/src/main/java/org/apache/kylin/common/util/Array.java
@@ -18,10 +18,6 @@
package org.apache.kylin.common.util;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import org.apache.commons.collections.PredicateUtils;
-
import java.util.Arrays;
/*
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 382f5c2..fbc014b 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,23 +18,24 @@
package org.apache.kylin.cube;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
@@ -335,5 +336,12 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
public void setSnapshots(ConcurrentHashMap<String, String> snapshots) {
this.snapshots = snapshots;
}
-
+
+ public String getStatisticsResourcePath() {
+ return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+ }
+
+ public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
+ return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 6593ed2..e534441 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -443,7 +443,6 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000");
appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
- appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index bc10032..f8863a5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -19,7 +19,6 @@
package org.apache.kylin.job.hadoop.cube;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -100,7 +99,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
int result = waitForCompletion(job);
- if(Boolean.parseBoolean(statistics_enabled)) {
+ if(result == 0 && Boolean.parseBoolean(statistics_enabled)) {
putStatisticsToResourceStore(statistics_output, newSegment);
}
@@ -145,14 +144,14 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
private void putStatisticsToResourceStore(String statisticsFolder, CubeSegment cubeSegment) throws IOException {
Path statisticsFilePath = new Path(statisticsFolder, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
- FileSystem fs = HadoopUtil.getFileSystem("hdfs:///" + statisticsFilePath);
+ FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
if (!fs.exists(statisticsFilePath))
throw new IOException("File " + statisticsFilePath + " does not exists;");
FSDataInputStream is = fs.open(statisticsFilePath);
try {
// put the statistics to metadata store
- String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment.getUuid() + ".seq";
+ String statisticsFileName = cubeSegment.getStatisticsResourcePath();
ResourceStore rs = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
rs.putResource(statisticsFileName, is, System.currentTimeMillis());
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 5d29e1b..62bbd74 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -52,7 +52,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
private boolean collectStatistics = false;
private String statisticsOutput = null;
private List<Long> baseCuboidRowCountInMappers;
- // private Map<Long, Long> rowCountInCuboids;
protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
protected long baseCuboidId;
protected CubeDesc cubeDesc;
@@ -77,7 +76,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
if (collectStatistics) {
baseCuboidRowCountInMappers = Lists.newArrayList();
-// rowCountInCuboids = Maps.newHashMap();
cuboidHLLMap = Maps.newHashMap();
SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
}
@@ -113,7 +111,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
long cuboidId = 0 - key.get();
for (Text value : values) {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
ByteArray byteArray = new ByteArray(value.getBytes());
hll.readRegisters(byteArray.asBuffer());
@@ -139,7 +137,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
//output the hll info;
if (collectStatistics) {
writeMapperAndCuboidStatistics(context); // for human check
- writeCuboidStatistics(context.getConfiguration(), statisticsOutput, cuboidHLLMap); // for CreateHTableJob
+ writeCuboidStatistics(context.getConfiguration(), statisticsOutput, cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
}
}
@@ -195,7 +193,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
}
- public static void writeCuboidStatistics(Configuration conf, String outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap) throws IOException {
+ public static void writeCuboidStatistics(Configuration conf, String outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class),
@@ -205,6 +203,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);
+ // persist the sample percentage with key 0
+ writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage)));
ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
try {
for (long i : allCuboids) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index 0b75084..20df4c4 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -80,7 +80,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
for (int i = 0; i < cuboidIds.length; i++) {
- allCuboidsHLL[i] = new HyperLogLogPlusCounter(16);
+ allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
}
hf = Hashing.md5();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 09fba01..2f80f0e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,18 +18,6 @@
package org.apache.kylin.job.hadoop.cube;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -43,6 +31,18 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,8 +118,10 @@ public class StorageCleanupJob extends AbstractHadoopJob {
for (CubeInstance cube : cubeMgr.listAllCubes()) {
for (CubeSegment seg : cube.getSegments()) {
String tablename = seg.getStorageLocationIdentifier();
- allTablesNeedToBeDropped.remove(tablename);
- log.info("Remove table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+ if (allTablesNeedToBeDropped.contains(tablename)) {
+ allTablesNeedToBeDropped.remove(tablename);
+ log.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+ }
}
}
@@ -127,8 +129,11 @@ public class StorageCleanupJob extends AbstractHadoopJob {
for (IIInstance ii : iiManager.listAllIIs()) {
for (IISegment seg : ii.getSegments()) {
String tablename = seg.getStorageLocationIdentifier();
- allTablesNeedToBeDropped.remove(tablename);
- log.info("Remove table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+
+ if (allTablesNeedToBeDropped.contains(tablename)) {
+ allTablesNeedToBeDropped.remove(tablename);
+ log.info("Exclude table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 3ba80d1..df3f345 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -23,7 +23,7 @@ import java.util.BitSet;
public class MapContextGTRecordWriter implements IGTRecordWriter {
private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
- protected MapContext mapContext;
+ protected MapContext<?, ?, ImmutableBytesWritable, Text> mapContext;
private Long lastCuboidId;
protected CubeSegment cubeSegment;
protected CubeDesc cubeDesc;
@@ -37,7 +37,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
private Text outputValue = new Text();
long cuboidRowCount = 0;
- public MapContextGTRecordWriter(MapContext mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ public MapContextGTRecordWriter(MapContext<?, ?, ImmutableBytesWritable, Text> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
this.mapContext = mapContext;
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
index 4f50279..b2172bc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -18,6 +18,9 @@
package org.apache.kylin.job.hadoop.cubev2;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -28,21 +31,16 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.cube.CuboidJob;
-import org.apache.kylin.job.hadoop.cube.CuboidReducer;
-
-import java.util.ArrayList;
-import java.util.List;
/**
* @author shaoshi
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
index 5fe3f96..64ea14c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -78,8 +79,9 @@ public class MergeStatisticsStep extends AbstractExecutable {
ResourceStore rs = ResourceStore.getStore(kylinConf);
try {
+ int averageSamplingPercentage = 0;
for (String segmentId : this.getMergingSegmentIds()) {
- String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + segmentId + ".seq";
+ String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
InputStream is = rs.getResource(fileKey);
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -99,14 +101,19 @@ public class MergeStatisticsStep extends AbstractExecutable {
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
-
- if (cuboidHLLMap.get(key.get()) != null) {
- cuboidHLLMap.get(key.get()).merge(hll);
+ if (key.get() == 0l) {
+ // sampling percentage;
+ averageSamplingPercentage += Bytes.toInt(value.getBytes());
} else {
- cuboidHLLMap.put(key.get(), hll);
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+
+ if (cuboidHLLMap.get(key.get()) != null) {
+ cuboidHLLMap.get(key.get()).merge(hll);
+ } else {
+ cuboidHLLMap.put(key.get(), hll);
+ }
}
}
} catch (Exception e) {
@@ -116,14 +123,14 @@ public class MergeStatisticsStep extends AbstractExecutable {
IOUtils.closeStream(reader);
}
}
-
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, getMergedStatisticsPath(), cuboidHLLMap);
+ averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
+ FactDistinctColumnsReducer.writeCuboidStatistics(conf, getMergedStatisticsPath(), cuboidHLLMap, averageSamplingPercentage);
Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
FileSystem fs = statisticsFilePath.getFileSystem(conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
// put the statistics to metadata store
- String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + newSegment.getUuid() + ".seq";
+ String statisticsFileName = newSegment.getStatisticsResourcePath();
rs.putResource(statisticsFileName, is, System.currentTimeMillis());
} finally {
IOUtils.closeStream(is);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index 6e884d5..8f802b7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -81,7 +81,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeDesc cubeDesc = null;
String segmentName = null;
KylinConfig kylinConfig;
- private int SAMPING_PERCENTAGE = 5;
@Override
public int run(String[] args) throws Exception {
@@ -92,16 +91,11 @@ public class CreateHTableJob extends AbstractHadoopJob {
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_STATISTICS_ENABLED);
- options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
parseOptions(options, args);
Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
boolean statistics_enabled = Boolean.parseBoolean(getOptionValue(OPTION_STATISTICS_ENABLED));
- String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
-
- SAMPING_PERCENTAGE = Integer.parseInt(statistics_sampling_percent);
-
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
kylinConfig = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
@@ -228,7 +222,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
long totalSizeInM = 0;
ResourceStore rs = ResourceStore.getStore(kylinConfig);
- String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cube.getName() + "/" + cubeSegment.getUuid() + ".seq";
+ String fileKey = cubeSegment.getStatisticsResourcePath();
InputStream is = rs.getResource(fileKey);
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -241,18 +235,23 @@ public class CreateHTableJob extends AbstractHadoopJob {
IOUtils.closeStream(tempFileStream);
}
- FileSystem fs = HadoopUtil.getFileSystem("file:///" +tempFile.getAbsolutePath());
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ int samplingPercentage = 25;
while (reader.next(key, value)) {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
+ if (key.get() == 0l) {
+ samplingPercentage = Bytes.toInt(value.getBytes());
+ } else {
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
- cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / SAMPING_PERCENTAGE);
+ cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / samplingPercentage);
+ }
}
} catch (Exception e) {