You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:02:43 UTC
[02/43] kylin git commit: KYLIN-1323 Improve performance of
converting data to hfile
KYLIN-1323 Improve performance of converting data to hfile
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab4d8909
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab4d8909
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab4d8909
Branch: refs/heads/helix-rebase
Commit: ab4d8909ac85af87d8b8b443044c49a79f9e3ee4
Parents: 66294d3
Author: sunyerui <su...@gmail.com>
Authored: Sun Feb 28 21:02:25 2016 +0800
Committer: sunyerui <su...@gmail.com>
Committed: Sun Feb 28 21:12:28 2016 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 4 +
.../apache/kylin/common/KylinConfigBase.java | 8 ++
.../engine/mr/common/AbstractHadoopJob.java | 2 +-
.../kylin/engine/mr/common/BatchConstants.java | 1 +
.../mr/steps/RangeKeyDistributionJob.java | 115 ----------------
.../mr/steps/RangeKeyDistributionMapper.java | 71 ----------
.../mr/steps/RangeKeyDistributionReducer.java | 100 --------------
kylin-it/pom.xml | 3 +
.../kylin/provision/BuildCubeWithEngine.java | 41 ++++++
.../kylin/storage/hbase/steps/CubeHFileJob.java | 37 ++++-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 4 +-
.../hbase/steps/RangeKeyDistributionJob.java | 127 +++++++++++++++++
.../hbase/steps/RangeKeyDistributionMapper.java | 76 +++++++++++
.../steps/RangeKeyDistributionReducer.java | 136 +++++++++++++++++++
.../hbase/util/HBaseRegionSizeCalculator.java | 8 ++
.../steps/RangeKeyDistributionJobTest.java | 1 -
.../steps/RangeKeyDistributionMapperTest.java | 1 -
.../steps/RangeKeyDistributionReducerTest.java | 1 -
18 files changed, 443 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 44a282e..b220b2d 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -65,6 +65,10 @@ kylin.hbase.region.cut.small=5
kylin.hbase.region.cut.medium=10
kylin.hbase.region.cut.large=50
+# The hfile size in GB; a smaller hfile size gives the hfile-converting MR job more reducers, making it faster
+# set to 0 or comment this config to disable this optimization
+kylin.hbase.hfile.size.gb=5
+
# Enable/disable ACL check for cube query
kylin.query.security.enabled=true
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7707684..3430e0b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -407,6 +407,14 @@ public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500"));
}
+ public void setHBaseHFileSizeGB(int size) {
+ setProperty("kylin.hbase.hfile.size.gb", String.valueOf(size));
+ }
+
+ public int getHBaseHFileSizeGB() {
+ return Integer.parseInt(getOptional("kylin.hbase.hfile.size.gb", "0"));
+ }
+
public int getScanThreshold() {
return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 7615269..e4eee96 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
- protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+ protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 400a3aa..6943f18 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -42,6 +42,7 @@ public interface BatchConstants {
String REGION_NUMBER_MIN = "region.number.min";
String REGION_NUMBER_MAX = "region.number.max";
String REGION_SPLIT_SIZE = "region.split.size";
+ String HFILE_SIZE_GB = "hfile.size.gb";
String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
deleted file mode 100644
index 5632fc1..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang, ysong1
- *
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
- protected static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
-
- parseOptions(options, args);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- // job.getConfiguration().set("dfs.block.size", "67108864");
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RangeKeyDistributionMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RangeKeyDistributionReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = cubeMgr.getCube(cubeName);
- DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
- int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
- int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
- int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
-
- job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java
deleted file mode 100644
index 47cbc95..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
- private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
- private LongWritable outputValue = new LongWritable(0);
-
- private long bytesRead = 0;
-
- private Text lastKey;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- lastKey = key;
-
- int bytesLength = key.getLength() + value.getLength();
- bytesRead += bytesLength;
-
- if (bytesRead >= ONE_MEGA_BYTES) {
- outputValue.set(bytesRead);
- context.write(key, outputValue);
-
- // reset bytesRead
- bytesRead = 0;
- }
-
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
deleted file mode 100644
index 68be74e..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
- public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
- private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
- private LongWritable outputValue = new LongWritable(0);
-
- private int minRegionCount = 1;
- private int maxRegionCount = 500;
- private int cut = 10;
- private long bytesRead = 0;
- private List<Text> gbPoints = new ArrayList<Text>();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
- cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
- }
-
- if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
- minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
- }
-
- if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
- maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
- }
-
- logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
- }
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- for (LongWritable v : values) {
- bytesRead += v.get();
- }
-
- if (bytesRead >= ONE_GIGA_BYTES) {
- gbPoints.add(new Text(key));
- bytesRead = 0; // reset bytesRead
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- int nRegion = Math.round((float) gbPoints.size() / (float) cut);
- nRegion = Math.max(minRegionCount, nRegion);
- nRegion = Math.min(maxRegionCount, nRegion);
-
- int gbPerRegion = gbPoints.size() / nRegion;
- gbPerRegion = Math.max(1, gbPerRegion);
-
- System.out.println(nRegion + " regions");
- System.out.println(gbPerRegion + " GB per region");
-
- for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
- Text key = gbPoints.get(i);
- outputValue.set(i);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- context.write(key, outputValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 6cb44a5..99b650c 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -301,6 +301,7 @@
<classpathScope>test</classpathScope>
<executable>java</executable>
<arguments>
+ <argument>-DuseSandbox=true</argument>
<argument>-Dhdp.version=${hdp.version}</argument>
<argument>-DfastBuildMode=${fastBuildMode}</argument>
<argument>-classpath</argument>
@@ -321,6 +322,7 @@
<classpathScope>test</classpathScope>
<executable>java</executable>
<arguments>
+ <argument>-DuseSandbox=true</argument>
<argument>-Dhdp.version=${hdp.version}</argument>
<argument>-DfastBuildMode=${fastBuildMode}</argument>
<argument>-classpath</argument>
@@ -341,6 +343,7 @@
<classpathScope>test</classpathScope>
<executable>java</executable>
<arguments>
+ <argument>-DuseSandbox=true</argument>
<argument>-Dhdp.version=${hdp.version}</argument>
<argument>-DfastBuildMode=${fastBuildMode}</argument>
<argument>-classpath</argument>
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 28808df..cfefef3 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.List;
+import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -36,11 +37,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -55,6 +59,8 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -143,8 +149,10 @@ public class BuildCubeWithEngine {
public void build() throws Exception {
DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
+ KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1);
testInner();
testLeft();
+ KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0);
}
protected void waitForJob(String jobId) {
@@ -345,6 +353,9 @@ public class BuildCubeWithEngine {
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
+ if (segment.getCubeDesc().getEngineType() == IEngineAware.ID_MR_V1) {
+ checkHFilesInHBase(segment);
+ }
return job.getId();
}
@@ -355,4 +366,34 @@ public class BuildCubeWithEngine {
return exitCode;
}
+ private void checkHFilesInHBase(CubeSegment segment) throws IOException {
+ Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+ String tableName = segment.getStorageLocationIdentifier();
+ HTable table = new HTable(conf, tableName);
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+ long totalSize = 0;
+ for (Long size : sizeMap.values()) {
+ totalSize += size;
+ }
+ if (totalSize == 0) {
+ return;
+ }
+ Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
+ // check if any region contains more than one hfile, which means the hfile config takes effect
+ boolean hasMultiHFileRegions = false;
+ for (Pair<Integer, Integer> count : countMap.values()) {
+ // check if hfile count is greater than store count
+ if (count.getSecond() > count.getFirst()) {
+ hasMultiHFileRegions = true;
+ break;
+ }
+ }
+ if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
+ throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
+ } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
+ throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
+ }
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 1f0b1a0..a302daf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -18,17 +18,24 @@
package org.apache.kylin.storage.hbase.steps;
+import java.io.IOException;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
@@ -51,11 +58,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
parseOptions(options, args);
+ Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
+
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
@@ -82,8 +92,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
HTable htable = new HTable(conf, tableName);
- //Automatic config !
+ // Automatic config !
HFileOutputFormat.configureIncrementalLoad(job, htable);
+ reconfigurePartitions(conf, partitionFilePath);
// set block replication to 3 for hfiles
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -101,6 +112,30 @@ public class CubeHFileJob extends AbstractHadoopJob {
}
}
+ /**
+ * Check if partition files exist for the hfiles; if so, replace the table splits to give the job more reducers
+ * @param conf the job configuration
+ * @param path the hfile partition file
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ if (fs.exists(path)) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
+ int partitionCount = 0;
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ partitionCount++;
+ }
+ TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
+ // The reduce tasks should be one more than partition keys
+ job.setNumReduceTasks(partitionCount+1);
+ }
+ }
+ }
+
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new CubeHFileJob(), args);
System.exit(exitCode);
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index c3bd7b5..2a21640 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -12,7 +12,6 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.realization.IRealizationSegment;
@@ -72,7 +71,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
createHtableStep.setJobParams(cmd.toString());
@@ -90,6 +89,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
appendExecCmdParameters(cmd, "input", inputPath);
appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
new file mode 100644
index 0000000..2ff7356
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xjiang, ysong1
+ *
+ */
+
+public class RangeKeyDistributionJob extends AbstractHadoopJob {
+ protected static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+
+ parseOptions(options, args);
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ job = Job.getInstance(getConf(), jobName);
+
+ setJobClasspath(job);
+
+ addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ FileOutputFormat.setOutputPath(job, output);
+ // job.getConfiguration().set("dfs.block.size", "67108864");
+
+ // Mapper
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(RangeKeyDistributionMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+
+ // Reducer - only one
+ job.setReducerClass(RangeKeyDistributionReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setNumReduceTasks(1);
+
+ this.deletePath(job.getConfiguration(), output);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ int hfileSizeGB = KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB();
+ DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
+ int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
+ int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
+ int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
+ job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+ job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
+ job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
+ job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
+ job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+ // The partition file for hfile is sequenece file consists of ImmutableBytesWritable and NullWritable
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);
+
+ // Passed the sandbox property to mapper, to simulate large dataset
+ if (System.getProperty("useSandbox") != null && System.getProperty("useSandbox").equals("true")) {
+ job.getConfiguration().setBoolean("useSandbox", true);
+ }
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ printUsage(options);
+ throw e;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
new file mode 100644
index 0000000..6f2d2bc
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
+
+ private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
+
+ private LongWritable outputValue = new LongWritable(0);
+
+ private long bytesRead = 0;
+
+ private Text lastKey;
+
+ private Long scaleFactorForSandbox = 1L;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ if (context.getConfiguration().getBoolean("useSandbox", false)) {
+ scaleFactorForSandbox = 1024L;
+ }
+ }
+
+ @Override
+ public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ lastKey = key;
+
+ int bytesLength = key.getLength() + value.getLength();
+ bytesRead += bytesLength;
+
+ if ((bytesRead * scaleFactorForSandbox) >= ONE_MEGA_BYTES) {
+ outputValue.set(bytesRead * scaleFactorForSandbox);
+ context.write(key, outputValue);
+
+ // reset bytesRead
+ bytesRead = 0;
+ }
+
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ if (lastKey != null) {
+ outputValue.set(bytesRead);
+ context.write(lastKey, outputValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
new file mode 100644
index 0000000..acdab62
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
+
+ public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
+ private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
+
+ private LongWritable outputValue = new LongWritable(0);
+
+ private int minRegionCount = 1;
+ private int maxRegionCount = 500;
+ private int cut = 10;
+ private int hfileSizeGB = 1;
+ private long bytesRead = 0;
+ private List<Text> gbPoints = new ArrayList<Text>();
+ private String output = null;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+
+ if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
+ output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+ }
+
+ if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
+ hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+ }
+
+ if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
+ cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
+ }
+
+ if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
+ minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
+ }
+
+ if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
+ maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
+ }
+
+ logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount
+ + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB);
+
+ // add empty key at position 0
+ gbPoints.add(new Text());
+ }
+
+ @Override
+ public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ for (LongWritable v : values) {
+ bytesRead += v.get();
+ }
+
+ if (bytesRead >= ONE_GIGA_BYTES) {
+ gbPoints.add(new Text(key));
+ bytesRead = 0; // reset bytesRead
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ int nRegion = Math.round((float) gbPoints.size() / (float) cut);
+ nRegion = Math.max(minRegionCount, nRegion);
+ nRegion = Math.min(maxRegionCount, nRegion);
+
+ int gbPerRegion = gbPoints.size() / nRegion;
+ gbPerRegion = Math.max(1, gbPerRegion);
+
+ if (hfileSizeGB <= 0) {
+ hfileSizeGB = gbPerRegion;
+ }
+ int hfilePerRegion = gbPerRegion / hfileSizeGB;
+ hfilePerRegion = Math.max(1, hfilePerRegion);
+
+ System.out.println(nRegion + " regions");
+ System.out.println(gbPerRegion + " GB per region");
+ System.out.println(hfilePerRegion + " hfile per region");
+
+ Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+ try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(
+ hfilePartitionFile.getFileSystem(context.getConfiguration()),
+ context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) {
+ int hfileCountInOneRegion = 0;
+ for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+ hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+ if (++hfileCountInOneRegion >= hfilePerRegion) {
+ Text key = gbPoints.get(i);
+ outputValue.set(i);
+ System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+ context.write(key, outputValue);
+
+ hfileCountInOneRegion = 0;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
index ba0da00..346c3a2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +50,8 @@ public class HBaseRegionSizeCalculator {
**/
private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ private final Map<byte[], Pair<Integer, Integer>> countMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
/**
@@ -93,6 +96,7 @@ public class HBaseRegionSizeCalculator {
long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
sizeMap.put(regionId, regionSizeBytes);
+ countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles()));
// logger.info("Region " + regionLoad.getNameAsString()
// + " has size " + regionSizeBytes);
@@ -125,4 +129,8 @@ public class HBaseRegionSizeCalculator {
public Map<byte[], Long> getRegionSizeMap() {
return Collections.unmodifiableMap(sizeMap);
}
+
+ public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() {
+ return Collections.unmodifiableMap(countMap);
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
index 7f5b24b..70e1ac7 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
index ca716c3..03a3cba 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionMapper;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
index cbf0657..c027c40 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionReducer;
import org.junit.Before;
import org.junit.Test;