You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/09 04:14:04 UTC

incubator-kylin git commit: KYLIN-965 make RangeKeyDistributionReducer also read from kylin.properties

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging fdca970bf -> 70c700dc1


KYLIN-965 make RangeKeyDistributionReducer also read from kylin.properties

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/70c700dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/70c700dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/70c700dc

Branch: refs/heads/2.x-staging
Commit: 70c700dc12bbd6d06c5d918e855e07252eace8cc
Parents: fdca970
Author: shaofengshi <sh...@apache.org>
Authored: Wed Sep 9 10:14:01 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Sep 9 10:14:01 2015 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/common/BatchConstants.java  |  6 ++--
 .../mr/steps/RangeKeyDistributionJob.java       | 14 +++++---
 .../mr/steps/RangeKeyDistributionReducer.java   | 38 +++++++++-----------
 3 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/70c700dc/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 09397d6..b305741 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -38,8 +38,10 @@ public interface BatchConstants {
 
     String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
 
-    String CUBE_CAPACITY = "cube.capacity";
-
+    String REGION_NUMBER_MIN = "region.number.min";
+    String REGION_NUMBER_MAX = "region.number.max";
+    String REGION_SPLIT_SIZE = "region.split.size";
+    
     String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
     String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";
     

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/70c700dc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
index 757c70d..5632fc1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
@@ -32,7 +32,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,9 +91,15 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
-            RealizationCapacity realizationCapacity = cube.getDescriptor().getModel().getCapacity();
-            job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
-
+            DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
+            int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
+            int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
+            int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
+            
+            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
+            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
+            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+            
             return waitForCompletion(job);
         } catch (Exception e) {
             printUsage(options);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/70c700dc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
index 551a643..68be74e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,17 +37,13 @@ import org.slf4j.LoggerFactory;
 public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
 
     public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
-    public static final int SMALL_CUT = 5; //  5 GB per region
-    public static final int MEDIUM_CUT = 10; //  10 GB per region
-    public static final int LARGE_CUT = 50; // 50 GB per region
-
-    public static final int MAX_REGION = 1000;
-
     private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
 
     private LongWritable outputValue = new LongWritable(0);
 
-    private int cut;
+    private int minRegionCount = 1;
+    private int maxRegionCount = 500;
+    private int cut = 10;
     private long bytesRead = 0;
     private List<Text> gbPoints = new ArrayList<Text>();
 
@@ -56,20 +51,19 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        DataModelDesc.RealizationCapacity cubeCapacity = DataModelDesc.RealizationCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
-        switch (cubeCapacity) {
-        case SMALL:
-            cut = SMALL_CUT;
-            break;
-        case MEDIUM:
-            cut = MEDIUM_CUT;
-            break;
-        case LARGE:
-            cut = LARGE_CUT;
-            break;
+        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
+            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
+        }
+
+        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
+            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
+        }
+
+        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
+            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
         }
 
-        logger.info("Chosen cut for htable is " + cut);
+        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
     }
 
     @Override
@@ -87,8 +81,8 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         int nRegion = Math.round((float) gbPoints.size() / (float) cut);
-        nRegion = Math.max(1, nRegion);
-        nRegion = Math.min(MAX_REGION, nRegion);
+        nRegion = Math.max(minRegionCount, nRegion);
+        nRegion = Math.min(maxRegionCount, nRegion);
 
         int gbPerRegion = gbPoints.size() / nRegion;
         gbPerRegion = Math.max(1, gbPerRegion);