You are viewing a plain-text version of this content. The canonical version is the original post in the mailing-list archive (its link was lost in this plain-text rendering).
Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/23 09:32:00 UTC
[13/50] [abbrv] incubator-kylin git commit: smoother region split
smoother region split
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/46fd452f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/46fd452f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/46fd452f
Branch: refs/heads/inverted-index
Commit: 46fd452f95d9717936fc96eb011a9792bdfae1d1
Parents: 5ac6241
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Jan 22 16:38:29 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu Jan 22 16:38:29 2015 +0800
----------------------------------------------------------------------
.../cube/RangeKeyDistributionReducer.java | 62 +++++++++++---------
.../job/hadoop/hbase/CreateHTableJob.java | 28 +++++----
2 files changed, 51 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/46fd452f/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
index 5bc8517..e7f22cf 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -17,6 +17,8 @@
package com.kylinolap.job.hadoop.cube;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -32,62 +34,66 @@ import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
*/
public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
- public static final long TEN_GIGA_BYTES = 10L * 1024L * 1024L * 1024L;
- public static final long TWENTY_GIGA_BYTES = 20L * 1024L * 1024L * 1024L;
- public static final long HUNDRED_GIGA_BYTES = 100L * 1024L * 1024L * 1024L;
+ public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
+ public static final int SMALL_CUT = 10; // 10 GB per region
+ public static final int MEDIUM_CUT = 20; // 20 GB per region
+ public static final int LARGE_CUT = 100; // 100 GB per region
+
+ public static final int MAX_REGION = 200;
private LongWritable outputValue = new LongWritable(0);
+ private int cut;
private long bytesRead = 0;
- private Text lastKey;
-
- private CubeCapacity cubeCapacity;
- private long cut;
+ private List<Text> gbPoints = new ArrayList<Text>();
@Override
protected void setup(Context context) throws IOException {
super.publishConfiguration(context.getConfiguration());
- cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
+ CubeCapacity cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
switch (cubeCapacity) {
case SMALL:
- cut = TEN_GIGA_BYTES;
+ cut = SMALL_CUT;
break;
case MEDIUM:
- cut = TWENTY_GIGA_BYTES;
+ cut = MEDIUM_CUT;
break;
case LARGE:
- cut = HUNDRED_GIGA_BYTES;
+ cut = LARGE_CUT;
break;
}
}
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- lastKey = key;
- long length = 0;
for (LongWritable v : values) {
- length += v.get();
+ bytesRead += v.get();
}
-
- bytesRead += length;
-
- if (bytesRead >= cut) {
- outputValue.set(bytesRead);
- context.write(key, outputValue);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- // reset bytesRead
- bytesRead = 0;
+
+ if (bytesRead >= ONE_GIGA_BYTES) {
+ gbPoints.add(new Text(key));
+ bytesRead = 0; // reset bytesRead
}
-
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- System.out.println(StringUtils.byteToHexString(lastKey.getBytes()) + "\t" + outputValue.get());
+ int nRegion = Math.round((float) gbPoints.size() / (float) cut);
+ nRegion = Math.max(1, nRegion);
+ nRegion = Math.min(MAX_REGION, nRegion);
+
+ int gbPerRegion = gbPoints.size() / nRegion;
+ gbPerRegion = Math.max(1, gbPerRegion);
+
+ System.out.println(nRegion + " regions");
+ System.out.println(gbPerRegion + " GB per region");
+
+ for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
+ Text key = gbPoints.get(i);
+ outputValue.set(i);
+ System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+ context.write(key, outputValue);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/46fd452f/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
index eec398b..4f7dd1e 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
// https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
- tableDesc.setValue(CubeManager.getHtableMetadataKey(),config.getMetadataUrlPrefix());
+ tableDesc.setValue(CubeManager.getHtableMetadataKey(), config.getMetadataUrlPrefix());
Configuration conf = HBaseConfiguration.create(getConf());
HBaseAdmin admin = new HBaseAdmin(conf);
@@ -152,17 +153,20 @@ public class CreateHTableJob extends AbstractHadoopJob {
@SuppressWarnings("deprecation")
public byte[][] getSplits(Configuration conf, Path path) throws Exception {
+ FileSystem fs = path.getFileSystem(conf);
+ if (fs.exists(path) == false) {
+ System.err.println("Path " + path + " not found, no region split, HTable will be one region");
+ return null;
+ }
+
List<byte[]> rowkeyList = new ArrayList<byte[]>();
SequenceFile.Reader reader = null;
try {
- reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+ reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
- byte[] tmp = ((Text) key).copyBytes();
- if (rowkeyList.contains(tmp) == false) {
- rowkeyList.add(tmp);
- }
+ rowkeyList.add(((Text) key).copyBytes());
}
} catch (Exception e) {
e.printStackTrace();
@@ -170,13 +174,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
} finally {
IOUtils.closeStream(reader);
}
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
- if (retValue.length == 0) {
- throw new IllegalStateException("Split number is 0, no records in cube??");
+
+ System.out.println((rowkeyList.size() + 1) + " regions");
+ System.out.println(rowkeyList.size() + " splits");
+ for (byte[] split : rowkeyList) {
+ System.out.println(StringUtils.byteToHexString(split));
}
- return retValue;
+ byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
+ return retValue.length == 0 ? null : retValue;
}
public static void main(String[] args) throws Exception {