You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/31 12:57:25 UTC

[27/53] [abbrv] incubator-kylin git commit: smoother region split

smoother region split


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/46fd452f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/46fd452f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/46fd452f

Branch: refs/heads/master
Commit: 46fd452f95d9717936fc96eb011a9792bdfae1d1
Parents: 5ac6241
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Jan 22 16:38:29 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu Jan 22 16:38:29 2015 +0800

----------------------------------------------------------------------
 .../cube/RangeKeyDistributionReducer.java       | 62 +++++++++++---------
 .../job/hadoop/hbase/CreateHTableJob.java       | 28 +++++----
 2 files changed, 51 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/46fd452f/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
index 5bc8517..e7f22cf 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -17,6 +17,8 @@
 package com.kylinolap.job.hadoop.cube;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -32,62 +34,66 @@ import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
  */
 public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
 
-    public static final long TEN_GIGA_BYTES = 10L * 1024L * 1024L * 1024L;
-    public static final long TWENTY_GIGA_BYTES = 20L * 1024L * 1024L * 1024L;
-    public static final long HUNDRED_GIGA_BYTES = 100L * 1024L * 1024L * 1024L;
+    public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
+    public static final int SMALL_CUT = 10;  //  10 GB per region
+    public static final int MEDIUM_CUT = 20; //  20 GB per region
+    public static final int LARGE_CUT = 100; // 100 GB per region
+    
+    public static final int MAX_REGION = 200;
 
     private LongWritable outputValue = new LongWritable(0);
 
+    private int cut;
     private long bytesRead = 0;
-    private Text lastKey;
-
-    private CubeCapacity cubeCapacity;
-    private long cut;
+    private List<Text> gbPoints = new ArrayList<Text>();
 
     @Override
     protected void setup(Context context) throws IOException {
         super.publishConfiguration(context.getConfiguration());
 
-        cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
+        CubeCapacity cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
         switch (cubeCapacity) {
         case SMALL:
-            cut = TEN_GIGA_BYTES;
+            cut = SMALL_CUT;
             break;
         case MEDIUM:
-            cut = TWENTY_GIGA_BYTES;
+            cut = MEDIUM_CUT;
             break;
         case LARGE:
-            cut = HUNDRED_GIGA_BYTES;
+            cut = LARGE_CUT;
             break;
         }
     }
 
     @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-        lastKey = key;
-        long length = 0;
         for (LongWritable v : values) {
-            length += v.get();
+            bytesRead += v.get();
         }
-
-        bytesRead += length;
-
-        if (bytesRead >= cut) {
-            outputValue.set(bytesRead);
-            context.write(key, outputValue);
-            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-            // reset bytesRead
-            bytesRead = 0;
+        
+        if (bytesRead >= ONE_GIGA_BYTES) {
+            gbPoints.add(new Text(key));
+            bytesRead = 0; // reset bytesRead
         }
-
     }
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
-            System.out.println(StringUtils.byteToHexString(lastKey.getBytes()) + "\t" + outputValue.get());
+        int nRegion = Math.round((float) gbPoints.size() / (float) cut);
+        nRegion = Math.max(1,  nRegion);
+        nRegion = Math.min(MAX_REGION, nRegion);
+        
+        int gbPerRegion = gbPoints.size() / nRegion;
+        gbPerRegion = Math.max(1, gbPerRegion);
+        
+        System.out.println(nRegion + " regions");
+        System.out.println(gbPerRegion + " GB per region");
+        
+        for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
+            Text key = gbPoints.get(i);
+            outputValue.set(i);
+            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+            context.write(key, outputValue);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/46fd452f/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
index eec398b..4f7dd1e 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
         // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
         tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
-        tableDesc.setValue(CubeManager.getHtableMetadataKey(),config.getMetadataUrlPrefix());
+        tableDesc.setValue(CubeManager.getHtableMetadataKey(), config.getMetadataUrlPrefix());
 
         Configuration conf = HBaseConfiguration.create(getConf());
         HBaseAdmin admin = new HBaseAdmin(conf);
@@ -152,17 +153,20 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
     @SuppressWarnings("deprecation")
     public byte[][] getSplits(Configuration conf, Path path) throws Exception {
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path) == false) {
+            System.err.println("Path " + path + " not found, no region split, HTable will be one region");
+            return null;
+        }
+
         List<byte[]> rowkeyList = new ArrayList<byte[]>();
         SequenceFile.Reader reader = null;
         try {
-            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+            reader = new SequenceFile.Reader(fs, path, conf);
             Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
             Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
             while (reader.next(key, value)) {
-                byte[] tmp = ((Text) key).copyBytes();
-                if (rowkeyList.contains(tmp) == false) {
-                    rowkeyList.add(tmp);
-                }
+                rowkeyList.add(((Text) key).copyBytes());
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -170,13 +174,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
         } finally {
             IOUtils.closeStream(reader);
         }
-
-        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-        if (retValue.length == 0) {
-            throw new IllegalStateException("Split number is 0, no records in cube??");
+        
+        System.out.println((rowkeyList.size() + 1) + " regions");
+        System.out.println(rowkeyList.size() + " splits");
+        for (byte[] split : rowkeyList) {
+            System.out.println(StringUtils.byteToHexString(split));
         }
 
-        return retValue;
+        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
+        return retValue.length == 0 ? null : retValue;
     }
 
     public static void main(String[] args) throws Exception {