You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/25 11:12:38 UTC
[1/2] kylin git commit: KYLIN-1323 give “kylin.hbase.hfile.size.gb” a default value
Repository: kylin
Updated Branches:
refs/heads/1.x-staging 63c59fe9d -> aa8bdde4c
KYLIN-1323 give “kylin.hbase.hfile.size.gb” a default value
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa8bdde4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa8bdde4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa8bdde4
Branch: refs/heads/1.x-staging
Commit: aa8bdde4c540ab321ba6494b2a5f07d8fc186b31
Parents: 07ad877
Author: shaofengshi <sh...@apache.org>
Authored: Thu Feb 25 17:58:10 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Feb 25 18:01:14 2016 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/common/KylinConfig.java | 2 +-
conf/kylin.properties | 4 ++--
.../kylin/job/hadoop/cube/RangeKeyDistributionReducer.java | 9 ++++-----
3 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa8bdde4/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 9c813cd..c58d419 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -730,7 +730,7 @@ public class KylinConfig {
}
public int getHBaseHFileSizeGB() {
- return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "0"));
+ return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "5"));
}
public int getHBaseRegionCountMin() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa8bdde4/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index bbcf948..5da3d96 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -84,8 +84,8 @@ kylin.hbase.region.count.min=1
kylin.hbase.region.count.max=500
# The hfile size of GB, smaller hfile leading to the converting hfile MR has more reducers and be faster
-# set to 0 or comment this config to disable this optimization
-kylin.hbase.hfile.size.gb=0
+# set to 0 or comment this config to disable this optimization;
+kylin.hbase.hfile.size.gb=5
# Enable/disable ACL check for cube query
kylin.query.security.enabled=true
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa8bdde4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index f9c46bb..e51b4b3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -18,10 +18,6 @@
package org.apache.kylin.job.hadoop.cube;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -30,11 +26,14 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
import org.apache.kylin.job.constant.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* @author ysong1
*
[2/2] kylin git commit: KYLIN-1323 Improve performance of converting data to hfile
Posted by sh...@apache.org.
KYLIN-1323 Improve performance of converting data to hfile
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07ad8777
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07ad8777
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07ad8777
Branch: refs/heads/1.x-staging
Commit: 07ad8777eaa2788de48290a020590bab80f4d42c
Parents: 63c59fe
Author: sunyerui <su...@gmail.com>
Authored: Mon Feb 22 16:05:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Feb 25 18:01:14 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 10 ++++
.../common/util/HBaseRegionSizeCalculator.java | 8 ++++
conf/kylin.properties | 4 ++
.../kylin/job/constant/BatchConstants.java | 1 +
.../apache/kylin/job/cube/CubingJobBuilder.java | 3 +-
.../kylin/job/hadoop/AbstractHadoopJob.java | 2 +-
.../kylin/job/hadoop/cube/CubeHFileJob.java | 37 ++++++++++++++-
.../hadoop/cube/RangeKeyDistributionJob.java | 13 ++++++
.../hadoop/cube/RangeKeyDistributionMapper.java | 9 +++-
.../cube/RangeKeyDistributionReducer.java | 49 +++++++++++++++++---
.../kylin/job/BuildCubeWithEngineTest.java | 42 +++++++++++++++++
11 files changed, 166 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 7203065..9c813cd 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -131,6 +131,7 @@ public class KylinConfig {
public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
+ public static final String HBASE_HFILE_SIZE_GB = "kylin.hbase.hfile.size.gb";
public static final String HBASE_REGION_CUT_SMALL = "kylin.hbase.region.cut.small";
public static final String HBASE_REGION_CUT_MEDIUM = "kylin.hbase.region.cut.medium";
public static final String HBASE_REGION_CUT_LARGE = "kylin.hbase.region.cut.large";
@@ -723,6 +724,15 @@ public class KylinConfig {
kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
}
+ // for test
+ public void setHBaseHFileSizeGB(int size) {
+ kylinConfig.setProperty(HBASE_HFILE_SIZE_GB, String.valueOf(size));
+ }
+
+ public int getHBaseHFileSizeGB() {
+ return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "0"));
+ }
+
public int getHBaseRegionCountMin() {
return Integer.parseInt(getOptional(HBASE_REGION_COUNT_MIN, "1"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
index 093ac9e..fccd042 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -48,6 +48,9 @@ public class HBaseRegionSizeCalculator {
**/
private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ private final Map<byte[], Pair<Integer, Integer>> countMap =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
/**
@@ -92,6 +95,7 @@ public class HBaseRegionSizeCalculator {
long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
sizeMap.put(regionId, regionSizeBytes);
+ countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles()));
// logger.info("Region " + regionLoad.getNameAsString()
// + " has size " + regionSizeBytes);
@@ -124,4 +128,8 @@ public class HBaseRegionSizeCalculator {
public Map<byte[], Long> getRegionSizeMap() {
return Collections.unmodifiableMap(sizeMap);
}
+
+ public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() {
+ return Collections.unmodifiableMap(countMap);
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index e0727ed..bbcf948 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -83,6 +83,10 @@ kylin.hbase.region.cut.large=100
kylin.hbase.region.count.min=1
kylin.hbase.region.count.max=500
+# The hfile size of GB, smaller hfile leading to the converting hfile MR has more reducers and be faster
+# set to 0 or comment this config to disable this optimization
+kylin.hbase.hfile.size.gb=0
+
# Enable/disable ACL check for cube query
kylin.query.security.enabled=true
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
index 38f4a87..a3c4c32 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
@@ -47,6 +47,7 @@ public interface BatchConstants {
String REGION_NUMBER_MIN = "region.number.min";
String REGION_NUMBER_MAX = "region.number.max";
String REGION_SPLIT_SIZE = "region.split.size";
+ String HFILE_SIZE_GB = "hfile.size.gb";
String CUBE_CAPACITY = "cube.capacity";
String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 80c030f..14bb649 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -372,7 +372,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
createHtableStep.setJobParams(cmd.toString());
@@ -388,6 +388,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
appendMapReduceParameters(cmd, seg);
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000_hfile");
appendExecCmdParameters(cmd, "input", inputPath);
appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 698a978..a7d107d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
- protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+ protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
protected String name;
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index 3c1e4a5..d5b83ef 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -18,17 +18,24 @@
package org.apache.kylin.job.hadoop.cube;
+import java.io.IOException;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
@@ -51,11 +58,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
parseOptions(options, args);
+ Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
+
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
@@ -82,8 +92,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
HTable htable = new HTable(conf, tableName);
- //Automatic config !
+ // Automatic config !
HFileOutputFormat.configureIncrementalLoad(job, htable);
+ reconfigurePartitions(conf, partitionFilePath);
// set block replication to 3 for hfiles
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -101,6 +112,30 @@ public class CubeHFileJob extends AbstractHadoopJob {
}
}
+ /**
+ * Check if there's partition files for hfile, if yes replace the table splits, to make the job more reducers
+ * @param conf the job configuration
+ * @param path the hfile partition file
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ if (fs.exists(path)) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
+ int partitionCount = 0;
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ partitionCount++;
+ }
+ TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
+ // The reduce tasks should be one more than partition keys
+ job.setNumReduceTasks(partitionCount+1);
+ }
+ }
+ }
+
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new CubeHFileJob(), args);
System.exit(exitCode);
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
index 9c50122..a7832fa 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
@@ -20,7 +20,10 @@ package org.apache.kylin.job.hadoop.cube;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -92,13 +95,23 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
+ int hfileSizeGB = KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB();
DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
+ job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+ job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+ // The partition file for hfile is sequenece file consists of ImmutableBytesWritable and NullWritable
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);
+
+ // Passed the sandbox property to mapper, to simulate large dataset
+ if (System.getProperty("useSandbox") != null && System.getProperty("useSandbox").equals("true")) {
+ job.getConfiguration().setBoolean("useSandbox", true);
+ }
return waitForCompletion(job);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
index 33baf45..41856b4 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
@@ -38,9 +38,14 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
private Text lastKey;
+ private Long scaleFactorForSandbox = 1L;
+
@Override
protected void setup(Context context) throws IOException {
super.publishConfiguration(context.getConfiguration());
+ if (context.getConfiguration().getBoolean("useSandbox", false)) {
+ scaleFactorForSandbox = 1024L;
+ }
}
@Override
@@ -50,8 +55,8 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
int bytesLength = key.getLength() + value.getLength();
bytesRead += bytesLength;
- if (bytesRead >= ONE_MEGA_BYTES) {
- outputValue.set(bytesRead);
+ if ((bytesRead * scaleFactorForSandbox) >= ONE_MEGA_BYTES) {
+ outputValue.set(bytesRead * scaleFactorForSandbox);
context.write(key, outputValue);
// reset bytesRead
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index b3ab4db..f9c46bb 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -22,7 +22,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.mr.KylinReducer;
@@ -46,13 +50,23 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
private int minRegionCount = 1;
private int maxRegionCount = 500;
private int cut = 10;
+ private int hfileSizeGB = 1;
private long bytesRead = 0;
private List<Text> gbPoints = new ArrayList<Text>();
+ private String output = null;
@Override
protected void setup(Context context) throws IOException {
super.publishConfiguration(context.getConfiguration());
+ if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
+ output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+ }
+
+ if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
+ hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+ }
+
if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
}
@@ -65,7 +79,11 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
}
- logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
+ logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount
+ + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB);
+
+ // add empty key at position 0
+ gbPoints.add(new Text());
}
@Override
@@ -89,14 +107,31 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
int gbPerRegion = gbPoints.size() / nRegion;
gbPerRegion = Math.max(1, gbPerRegion);
+ if (hfileSizeGB <= 0) {
+ hfileSizeGB = gbPerRegion;
+ }
+ int hfilePerRegion = gbPerRegion / hfileSizeGB;
+ hfilePerRegion = Math.max(1, hfilePerRegion);
System.out.println(nRegion + " regions");
System.out.println(gbPerRegion + " GB per region");
-
- for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
- Text key = gbPoints.get(i);
- outputValue.set(i);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- context.write(key, outputValue);
+ System.out.println(hfilePerRegion + " hfile per region");
+
+ Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+ try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(
+ hfilePartitionFile.getFileSystem(context.getConfiguration()),
+ context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) {
+ int hfileCountInOneRegion = 0;
+ for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+ hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+ if (++hfileCountInOneRegion >= hfilePerRegion) {
+ Text key = gbPoints.get(i);
+ outputValue.set(i);
+ System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+ context.write(key, outputValue);
+
+ hfileCountInOneRegion = 0;
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index f02aa7a..78e6ab3 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -21,9 +21,11 @@ package org.apache.kylin.job;
import static org.junit.Assert.assertEquals;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.List;
+import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -33,11 +35,16 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HBaseRegionSizeCalculator;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -129,9 +136,11 @@ public class BuildCubeWithEngineTest {
}
private void testInner() throws Exception {
+ KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1);
DeployUtil.prepareTestData("inner", "test_kylin_cube_with_slr_empty");
String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", };
runTestAndAssertSucceed(testCase);
+ KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0);
}
private void testLeft() throws Exception {
@@ -280,6 +289,7 @@ public class BuildCubeWithEngineTest {
CubingJob job = cubingJobBuilder.buildJob(segment);
jobService.addJob(job);
waitForJob(job.getId());
+ checkHFilesInHBase(segment);
return job.getId();
}
@@ -295,4 +305,36 @@ public class BuildCubeWithEngineTest {
return jobId;
}
+
+ private void checkHFilesInHBase(CubeSegment segment) throws IOException {
+ Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
+ String tableName = segment.getStorageLocationIdentifier();
+ HTable table = new HTable(conf, tableName);
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+ long totalSize = 0;
+ for (Long size : sizeMap.values()) {
+ totalSize += size;
+ }
+ if (totalSize == 0) {
+ return;
+ }
+ Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
+ // check if there's region contains more than one hfile, which means the hfile config take effects
+ boolean hasMultiHFileRegions = false;
+ for (Pair<Integer, Integer> count : countMap.values()) {
+ // check if hfile count is greater than store count
+ if (count.getSecond() > count.getFirst()) {
+ hasMultiHFileRegions = true;
+ break;
+ }
+ }
+ if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
+ throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
+ } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
+ throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
+ }
+ }
+
}
+