Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:42 UTC
[30/50] [abbrv] incubator-kylin git commit: KYLIN-750 Merge cube segments from HBase table
KYLIN-750 Merge cube segments from HBase table
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e008e2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e008e2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e008e2cc
Branch: refs/heads/streaming-localdict
Commit: e008e2ccdba67a0b4d0ef065d4d0a368ff09c1e7
Parents: 876ac60
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Tue May 12 10:26:04 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Tue May 12 10:26:43 2015 +0800
----------------------------------------------------------------------
.../kylin/common/persistence/ResourceStore.java | 2 +
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../apache/kylin/job/cube/CubingJobBuilder.java | 68 +++++--
.../cardinality/ColumnCardinalityMapper.java | 2 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 45 ++++-
.../hadoop/cube/FactDistinctColumnsReducer.java | 67 ++++---
.../hadoop/cubev2/MergeCuboidFromHBaseJob.java | 22 ++-
.../cubev2/MergeCuboidFromHBaseMapper.java | 5 +-
.../job/hadoop/cubev2/MergeStatisticsStep.java | 181 +++++++++++++++++++
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 54 ++++--
10 files changed, 368 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 96153fd..a99ca5d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -53,6 +53,8 @@ abstract public class ResourceStore {
public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
public static final String STREAMING_RESOURCE_ROOT = "/streaming";
public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
+ public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
+
private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
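Note: the new CUBE_STATISTICS_ROOT constant anchors where per-segment cuboid statistics live in the metadata store. The steps later in this commit compose resource paths from it as follows (cube name and segment UUID below are illustrative placeholders, not values from this commit):

    // Sketch of the path convention used by FactDistinctColumnsJob,
    // MergeStatisticsStep and CreateHTableJob below; values are examples only.
    String cubeName = "sample_cube";                              // hypothetical
    String segmentUuid = "a1b2c3d4-0000-0000-0000-000000000000";  // hypothetical
    String resPath = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + segmentUuid + ".seq";
    // -> /cube_statistics/sample_cube/a1b2c3d4-0000-0000-0000-000000000000.seq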
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 50e9c3a..2e5d97a 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -54,6 +54,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
+ public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index a01f462..6593ed2 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.hadoop.cube.*;
import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
@@ -112,18 +113,21 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
mergingCuboidPaths.add(getPathToMerge(merging));
}
- if(this.inMemoryCubing()) {
+ AbstractExecutable convertCuboidToHfileStep;
+
+ if (this.inMemoryCubing()) {
// merge from HTable
- addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
+ convertCuboidToHfileStep = addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
+
+ // bulk load step
+ result.addTask(createBulkLoadStep(mergeSegment, result.getId()));
} else {
// merge cuboid
addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
+ // convert htable
+ convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
}
-
- // convert htable
- AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
-
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
@@ -148,15 +152,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
mergingHTables.add(merging.getStorageLocationIdentifier());
}
- if(this.inMemoryCubing()) {
+
+ AbstractExecutable convertCuboidToHfileStep;
+ if (this.inMemoryCubing()) {
// merge from HTable
- addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
+ convertCuboidToHfileStep = addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
+
+ // bulk load step
+ result.addTask(createBulkLoadStep(seg, result.getId()));
} else {
// merge cuboid
addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
+ // convert htable
+ convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
}
- // convert htable
- AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
@@ -173,12 +182,22 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
- void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
+ AbstractExecutable addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
+ String mergedStatisticsFolder = getStatisticsPath(seg, result.getId());
+ result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
+
+ // create htable step
+ result.addTask(createCreateHTableStep(seg));
+
String formattedTables = StringUtils.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, mergedCuboidPath));
+ String hFilePath = getHFilePath(seg, result.getId());
+ MapReduceExecutable writeHFileStep = createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath);
+ result.addTask(writeHFileStep);
+
+ return writeHFileStep;
}
Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
@@ -209,7 +228,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
} else {
// create htable step
- result.addTask(createCreateHTableStep(seg, jobId));
+ result.addTask(createCreateHTableStep(seg));
baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
result.addTask(baseCuboidStep);
// bulk load step
@@ -225,7 +244,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath));
// create htable step
- result.addTask(createCreateHTableStep(seg, jobId));
+ result.addTask(createCreateHTableStep(seg));
// generate hfiles step
final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
result.addTask(convertCuboidToHfileStep);
@@ -415,7 +434,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
return rowkeyDistributionStep;
}
- private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String jobId) {
+ private HadoopShellExecutable createCreateHTableStep(CubeSegment seg) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
@@ -424,7 +443,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000");
appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(seg, jobId));
+ appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
@@ -487,6 +506,18 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
return result;
}
+
+ private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+ MergeStatisticsStep result = new MergeStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ result.setMergedStatisticsPath(mergedStatisticsFolder);
+ return result;
+ }
+
+
private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
@@ -505,7 +536,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
- private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String outputPath) {
+ private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String hFilePath) {
MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
StringBuilder cmd = new StringBuilder();
@@ -514,7 +545,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", outputPath);
+ appendExecCmdParameters(cmd, "output", hFilePath);
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
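Note: the builder now branches on inMemoryCubing() when assembling a merge job: the HBase-backed path writes HFiles directly and bulk-loads them into the new segment's table, while the classic path still merges cuboid files and then runs the usual HTable steps. A condensed sketch of the resulting flow (method names as in the diff, arguments simplified):

    AbstractExecutable convertStep;
    if (inMemoryCubing()) {
        // scan the merging segments' HTables and emit HFiles
        convertStep = addMergeFromHBaseSteps(seg, segmentIds, htables, mergedPath, result);
        result.addTask(createBulkLoadStep(seg, result.getId()));
    } else {
        // merge cuboid files, then convert the merged cuboids to an HTable
        addMergeSteps(seg, segmentIds, cuboidPaths, mergedPath, result);
        convertStep = addHTableSteps(seg, mergedPath, result);
    }
    result.addTask(createUpdateCubeInfoAfterMergeStep(seg, segmentIds, convertStep.getId(), jobId));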
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index 948f25e..b32baf9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -89,10 +89,10 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWr
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Iterator<Integer> it = hllcMap.keySet().iterator();
+ ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
while (it.hasNext()) {
int key = it.next();
HyperLogLogPlusCounter hllc = hllcMap.get(key);
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
buf.clear();
hllc.writeRegisters(buf);
buf.flip();
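Note: hoisting the ByteBuffer allocation out of the loop reuses a single buffer for every HLL counter instead of allocating one per key; clear() resets it for writing and flip() bounds the readable region before the bytes are emitted. The reuse pattern, in isolation:

    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
    for (HyperLogLogPlusCounter hllc : counters) {  // 'counters' is illustrative
        buf.clear();               // reset position/limit for the next counter
        hllc.writeRegisters(buf);  // serialize the HLL registers
        buf.flip();                // expose exactly the bytes just written
        // emit buf.array() from 0 to buf.limit()
    }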
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 9e4b363..bc10032 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -18,10 +18,12 @@
package org.apache.kylin.job.hadoop.cube;
-import java.io.IOException;
-
import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -31,14 +33,19 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
* @author yangli9
*/
@@ -63,7 +70,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String intermediateTable = getOptionValue(OPTION_TABLE_NAME);;
+ String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
+ ;
String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
@@ -73,6 +81,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+ CubeSegment newSegment = cubeInstance.getSegment(segmentName, SegmentStatusEnum.NEW);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -82,14 +91,21 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
log.info("Starting: " + job.getJobName());
setJobClasspath(job);
-
+
setupMapper(intermediateTable);
setupReducer(output);
// CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
- return waitForCompletion(job);
+ int result = waitForCompletion(job);
+
+ if(Boolean.parseBoolean(statistics_enabled)) {
+ putStatisticsToResourceStore(statistics_output, newSegment);
+ }
+
+ return result;
+
} catch (Exception e) {
logger.error("error in FactDistinctColumnsJob", e);
@@ -105,7 +121,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
HCatInputFormat.setInput(job, dbTableNames[0],
dbTableNames[1]);
-
+
job.setInputFormatClass(HCatInputFormat.class);
job.setMapperClass(FactDistinctHiveColumnsMapper.class);
job.setCombinerClass(FactDistinctColumnsCombiner.class);
@@ -127,6 +143,23 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
deletePath(job.getConfiguration(), output);
}
+ private void putStatisticsToResourceStore(String statisticsFolder, CubeSegment cubeSegment) throws IOException {
+ Path statisticsFilePath = new Path(statisticsFolder, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ FileSystem fs = HadoopUtil.getFileSystem("hdfs:///" + statisticsFilePath);
+ if (!fs.exists(statisticsFilePath))
+ throw new IOException("File " + statisticsFilePath + " does not exists;");
+
+ FSDataInputStream is = fs.open(statisticsFilePath);
+ try {
+ // put the statistics to metadata store
+ String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment.getUuid() + ".seq";
+ ResourceStore rs = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+ rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+ } finally {
+ IOUtils.closeStream(is);
+ }
+ }
+
public static void main(String[] args) throws Exception {
FactDistinctColumnsJob job = new FactDistinctColumnsJob();
int exitCode = ToolRunner.run(job, args);
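Note: once the MR job succeeds (and statistics are enabled), putStatisticsToResourceStore publishes the cuboid-estimation SequenceFile from HDFS into the metadata store, keyed by cube name and segment UUID. The mirror read that MergeStatisticsStep and CreateHTableJob (below) perform looks roughly like this sketch, assuming the path convention established above:

    ResourceStore rs = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
    String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + segmentUuid + ".seq";
    InputStream is = rs.getResource(fileKey);  // SequenceFile of per-cuboid HLL counters
    try {
        // copy to a local file and open with SequenceFile.Reader, as MergeStatisticsStep does below
    } finally {
        IOUtils.closeStream(is);
    }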
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index c671da9..5d29e1b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -24,24 +24,23 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.*;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.mr.KylinReducer;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.*;
/**
@@ -53,7 +52,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
private boolean collectStatistics = false;
private String statisticsOutput = null;
private List<Long> baseCuboidRowCountInMappers;
- private Map<Long, Long> rowCountInCuboids;
+ // private Map<Long, Long> rowCountInCuboids;
protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
protected long baseCuboidId;
protected CubeDesc cubeDesc;
@@ -78,7 +77,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
if (collectStatistics) {
baseCuboidRowCountInMappers = Lists.newArrayList();
- rowCountInCuboids = Maps.newHashMap();
+// rowCountInCuboids = Maps.newHashMap();
cuboidHLLMap = Maps.newHashMap();
SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
}
@@ -113,23 +112,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
// for hll
long cuboidId = 0 - key.get();
- for (Text value : values) {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
+ for (Text value : values) {
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
- totalRowsBeforeMerge += hll.getCountEstimate();
+ totalRowsBeforeMerge += hll.getCountEstimate();
- if (cuboidId == baseCuboidId) {
- baseCuboidRowCountInMappers.add(hll.getCountEstimate());
- }
+ if (cuboidId == baseCuboidId) {
+ baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+ }
- if (cuboidHLLMap.get(cuboidId) != null) {
- cuboidHLLMap.get(cuboidId).merge(hll);
- } else {
- cuboidHLLMap.put(cuboidId, hll);
- }
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
}
+ }
}
}
@@ -139,12 +138,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
//output the hll info;
if (collectStatistics) {
- for (Long cuboidId : cuboidHLLMap.keySet()) {
- rowCountInCuboids.put(cuboidId, cuboidHLLMap.get(cuboidId).getCountEstimate());
- }
-
writeMapperAndCuboidStatistics(context); // for human check
- writeCuboidStatistics(context); // for CreateHTableJob
+ writeCuboidStatistics(context.getConfiguration(), statisticsOutput, cuboidHLLMap); // for CreateHTableJob
}
}
@@ -157,7 +152,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
String msg;
List<Long> allCuboids = new ArrayList<Long>();
- allCuboids.addAll(rowCountInCuboids.keySet());
+ allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);
msg = "Total cuboid number: \t" + allCuboids.size();
@@ -175,8 +170,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
long grantTotal = 0;
for (long i : allCuboids) {
- grantTotal += rowCountInCuboids.get(i);
- msg = "Cuboid " + i + " row count is: \t " + rowCountInCuboids.get(i);
+ grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+ msg = "Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate();
writeLine(out, msg);
}
@@ -200,19 +195,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
}
- private void writeCuboidStatistics(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- Path seqFilePath = new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ public static void writeCuboidStatistics(Configuration conf, String outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap) throws IOException {
+ Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class),
- SequenceFile.Writer.valueClass(LongWritable.class));
+ SequenceFile.Writer.valueClass(BytesWritable.class));
List<Long> allCuboids = new ArrayList<Long>();
- allCuboids.addAll(rowCountInCuboids.keySet());
+ allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);
+
+ ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
try {
for (long i : allCuboids) {
- writer.append(new LongWritable(i), new LongWritable((long) (rowCountInCuboids.get(i) *100 / SAMPING_PERCENTAGE)));
+ valueBuf.clear();
+ cuboidHLLMap.get(i).writeRegisters(valueBuf);
+ valueBuf.flip();
+ writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
}
} finally {
writer.close();
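Note: writeCuboidStatistics changes the SequenceFile value type from LongWritable (a row count already scaled by the sampling percentage) to BytesWritable holding the raw HLL registers. Keeping the registers is what makes segment merges possible: counters from different segments can be unioned later, and the sampling correction is applied only at the end by CreateHTableJob. A sketch of reading one record back into a counter (precision 16, as used throughout this commit):

    LongWritable cuboidId = new LongWritable();
    BytesWritable value = new BytesWritable();
    while (reader.next(cuboidId, value)) {  // 'reader' is a SequenceFile.Reader over the stats file
        HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
        hll.readRegisters(new ByteArray(value.getBytes()).asBuffer());
        long estimate = hll.getCountEstimate();  // distinct-row estimate for this cuboid
    }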
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
index b49af1a..4f50279 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -22,7 +22,11 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.Text;
@@ -55,10 +59,12 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
options.addOption(OPTION_SEGMENT_NAME);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_HTABLE_NAME);
parseOptions(options, args);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+ String htableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
KylinConfig config = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(config);
CubeInstance cube = cubeMgr.getCube(cubeName);
@@ -87,15 +93,18 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
scans.add(scan);
}
- TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, Text.class,
+ TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, ImmutableBytesWritable.class,
Text.class, job);
+ /*
// Reducer - only one
job.setReducerClass(CuboidReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
+*/
+
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -103,7 +112,16 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
- setReduceTaskNum(job, config, cubeName, 0);
+
+ HTable htable = new HTable(conf, htableName);
+ HFileOutputFormat.configureIncrementalLoad(job,
+ htable);
+
+
+ // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
+ job.setReducerClass(InMemCuboidReducer.class);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(KeyValue.class);
this.deletePath(job.getConfiguration(), output);
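Note: the ordering here matters. HFileOutputFormat.configureIncrementalLoad(job, htable) installs the total-order partitioner, reducer count and output format that match the target table's regions, and it also picks a default reducer; the job then sets InMemCuboidReducer (emitting ImmutableBytesWritable/KeyValue pairs) afterwards so that default is replaced while the region-aligned partitioning is kept. Distilled:

    HTable htable = new HTable(conf, htableName);
    HFileOutputFormat.configureIncrementalLoad(job, htable); // partitioner, #reducers, output format
    job.setReducerClass(InMemCuboidReducer.class);           // must come after, to override the default
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);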
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
index 7c673da..59fa03d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
@@ -59,7 +59,7 @@ import java.util.List;
/**
* @author shaoshi
*/
-public class MergeCuboidFromHBaseMapper extends TableMapper<Text, Text> {
+public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
private KylinConfig config;
private String cubeName;
@@ -71,7 +71,8 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<Text, Text> {
private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
// life cycle
- private Text outputKey = new Text();
+// private Text outputKey = new Text();
+ private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
private byte[] newKeyBuf;
private RowKeySplitter rowKeySplitter;
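Note: the mapper's output key type moves from Text to ImmutableBytesWritable so that map output matches what configureIncrementalLoad expects when routing rows to region-aligned reducers. The map body is not shown in this hunk; presumably the rewritten row key is set on the reusable key object along these lines (a guess, not the committed code):

    outputKey.set(newKeyBuf, 0, keyLength);  // hypothetical offsets/length
    context.write(outputKey, outputValue);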
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
new file mode 100644
index 0000000..5fe3f96
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MergeStatisticsStep extends AbstractExecutable {
+
+ private static final String CUBE_NAME = "cubeName";
+ private static final String SEGMENT_ID = "segmentId";
+ private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
+ private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
+ protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
+
+ public MergeStatisticsStep() {
+ super();
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig kylinConf = context.getConfig();
+ final CubeManager mgr = CubeManager.getInstance(kylinConf);
+ final CubeInstance cube = mgr.getCube(getCubeName());
+ final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+
+ Configuration conf = new Configuration();
+ ResourceStore rs = ResourceStore.getStore(kylinConf);
+ try {
+
+ for (String segmentId : this.getMergingSegmentIds()) {
+ String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + segmentId + ".seq";
+ InputStream is = rs.getResource(fileKey);
+ File tempFile = null;
+ FileOutputStream tempFileStream = null;
+ try {
+ tempFile = File.createTempFile(segmentId, ".seq");
+ tempFileStream = new FileOutputStream(tempFile);
+ org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+ } finally {
+ IOUtils.closeStream(is);
+ IOUtils.closeStream(tempFileStream);
+ }
+
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+ LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+
+ if (cuboidHLLMap.get(key.get()) != null) {
+ cuboidHLLMap.get(key.get()).merge(hll);
+ } else {
+ cuboidHLLMap.put(key.get(), hll);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ FactDistinctColumnsReducer.writeCuboidStatistics(conf, getMergedStatisticsPath(), cuboidHLLMap);
+ Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ FileSystem fs = statisticsFilePath.getFileSystem(conf);
+ FSDataInputStream is = fs.open(statisticsFilePath);
+ try {
+ // put the statistics to metadata store
+ String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + newSegment.getUuid() + ".seq";
+ rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+ } finally {
+ IOUtils.closeStream(is);
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to merge cuboid statistics", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+
+ public void setCubeName(String cubeName) {
+ this.setParam(CUBE_NAME, cubeName);
+ }
+
+ private String getCubeName() {
+ return getParam(CUBE_NAME);
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.setParam(SEGMENT_ID, segmentId);
+ }
+
+ private String getSegmentId() {
+ return getParam(SEGMENT_ID);
+ }
+
+ public void setMergingSegmentIds(List<String> ids) {
+ setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
+ }
+
+ private List<String> getMergingSegmentIds() {
+ final String ids = getParam(MERGING_SEGMENT_IS);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ public void setMergedStatisticsPath(String path) {
+ setParam(MERGED_STATISTICS_PATH, path);
+ }
+
+ private String getMergedStatisticsPath() {
+ return getParam(MERGED_STATISTICS_PATH);
+ }
+}
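Note: the merge itself is a per-cuboid HLL union. For each merging segment the step pulls that segment's .seq statistics out of the metadata store into a local temp file, replays it through a SequenceFile.Reader, and folds every counter into cuboidHLLMap; the combined map is then written back out via FactDistinctColumnsReducer.writeCuboidStatistics and registered for the merged segment. The core accumulation, distilled from the loop above:

    HyperLogLogPlusCounter existing = cuboidHLLMap.get(cuboidId);
    if (existing != null) {
        existing.merge(hll);              // register-wise union of the two counters
    } else {
        cuboidHLLMap.put(cuboidId, hll);  // first time this cuboid is seen
    }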
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index cbe8c10..6e884d5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -33,12 +33,16 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -46,7 +50,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.tools.DeployCoprocessorCLI;
import org.apache.kylin.job.tools.LZOSupportnessChecker;
@@ -55,7 +58,10 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -75,6 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeDesc cubeDesc = null;
String segmentName = null;
KylinConfig kylinConfig;
+ private int SAMPING_PERCENTAGE = 5;
@Override
public int run(String[] args) throws Exception {
@@ -85,12 +92,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_STATISTICS_ENABLED);
- options.addOption(OPTION_STATISTICS_OUTPUT);
+ options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
parseOptions(options, args);
Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
boolean statistics_enabled = Boolean.parseBoolean(getOptionValue(OPTION_STATISTICS_ENABLED));
- Path statisticsFilePath = new Path(getOptionValue(OPTION_STATISTICS_OUTPUT), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+
+ String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+ SAMPING_PERCENTAGE = Integer.parseInt(statistics_sampling_percent);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
kylinConfig = KylinConfig.getInstanceFromEnv();
@@ -133,7 +143,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
byte[][] splitKeys;
if (statistics_enabled) {
- splitKeys = getSplitsFromCuboidStatistics(conf, statisticsFilePath);
+ splitKeys = getSplitsFromCuboidStatistics(conf);
} else {
splitKeys = getSplits(conf, partitionFilePath);
}
@@ -196,7 +206,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
@SuppressWarnings("deprecation")
- protected byte[][] getSplitsFromCuboidStatistics(Configuration conf, Path statisticsFilePath) throws IOException {
+ protected byte[][] getSplitsFromCuboidStatistics(Configuration conf) throws IOException {
List<Integer> rowkeyColumnSize = Lists.newArrayList();
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
@@ -217,20 +227,33 @@ public class CreateHTableJob extends AbstractHadoopJob {
Map<Long, Long> cuboidSizeMap = Maps.newHashMap();
long totalSizeInM = 0;
- SequenceFile.Reader reader = null;
-
- FileSystem fs = statisticsFilePath.getFileSystem(conf);
- if (fs.exists(statisticsFilePath) == false) {
- System.err.println("Path " + statisticsFilePath + " not found, no region split, HTable will be one region");
- return null;
+ ResourceStore rs = ResourceStore.getStore(kylinConfig);
+ String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cube.getName() + "/" + cubeSegment.getUuid() + ".seq";
+ InputStream is = rs.getResource(fileKey);
+ File tempFile = null;
+ FileOutputStream tempFileStream = null;
+ try {
+ tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
+ tempFileStream = new FileOutputStream(tempFile);
+ org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+ } finally {
+ IOUtils.closeStream(is);
+ IOUtils.closeStream(tempFileStream);
}
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" +tempFile.getAbsolutePath());
+ SequenceFile.Reader reader = null;
try {
- reader = new SequenceFile.Reader(fs, statisticsFilePath, conf);
+ reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- LongWritable value = (LongWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
- cuboidSizeMap.put(key.get(), value.get());
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+
+ cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / SAMPING_PERCENTAGE);
+
}
} catch (Exception e) {
e.printStackTrace();
@@ -263,7 +286,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
List<Long> regionSplit = Lists.newArrayList();
-
long size = 0;
int regionIndex = 0;
int cuboidCount = 0;
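Note: region splits are now derived from the stored HLL registers rather than a precomputed count. Each counter's estimate covers only the sampled fraction of the input, so it is scaled back up before sizing regions. A worked example under the default 5% sampling (values illustrative):

    // hll.getCountEstimate() estimates distinct rows over the *sampled* input.
    long sampledEstimate = 40000L;              // hypothetical per-cuboid estimate
    long fullRows = sampledEstimate * 100 / 5;  // = 800,000 rows at SAMPING_PERCENTAGE = 5
    cuboidSizeMap.put(cuboidId, fullRows);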