You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/25 10:50:59 UTC

incubator-kylin git commit: minor change

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-942 eaef279c7 -> 7c4847678


minor change


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7c484767
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7c484767
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7c484767

Branch: refs/heads/KYLIN-942
Commit: 7c4847678ec04cd4912e96558951e99d6792e53c
Parents: eaef279
Author: honma <ho...@ebay.com>
Authored: Fri Sep 25 10:18:24 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Sep 25 16:53:53 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/cube/kv/FuzzyMaskEncoder.java  |  3 +-
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java |  2 +-
 .../kylin/engine/mr/common/CuboidShardUtil.java |  8 +--
 .../mr/steps/MapContextGTRecordWriter.java      |  2 +-
 server/src/main/resources/log4j.properties      |  2 +-
 .../storage/hbase/steps/CreateHTableJob.java    | 52 ++++++++++++--------
 .../kylin/storage/hbase/steps/MergeGCStep.java  | 10 +++-
 7 files changed, 50 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 1c2f4ee..254482c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -37,8 +37,9 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
 
     @Override
     protected int fillHeader(byte[] bytes) {
+        Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.FUZZY_MASK_ONE);
         // always fuzzy match cuboid ID to lock on the selected cuboid
-        Arrays.fill(bytes, 0, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO);
+        Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO);
         return this.headerLength;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 736a2b9..bc4a927 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -112,7 +112,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
         if (this.headerLength != offset) {
             throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
         }
-
+        
         return offset;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
index 4839ab0..507f5c4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -34,11 +34,11 @@ import com.google.common.collect.Maps;
 public class CuboidShardUtil {
     protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class);
 
-//    public static Map<Long, Short> loadCuboidShards(CubeSegment segment) {
-//        return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1);
-//    }
+    //    public static Map<Long, Short> loadCuboidShards(CubeSegment segment) {
+    //        return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1);
+    //    }
 
-    public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards,int totalShards) throws IOException {
+    public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards, int totalShards) throws IOException {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         Map<Long, Short> filered = Maps.filterEntries(cuboidShards, new Predicate<Map.Entry<Long, Short>>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 402bec0..7510c40 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -71,7 +71,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
         //fill shard
         short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
         short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum);
-        Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
+        short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
         short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
         BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/server/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/server/src/main/resources/log4j.properties b/server/src/main/resources/log4j.properties
index ef4bff4..b04538a 100644
--- a/server/src/main/resources/log4j.properties
+++ b/server/src/main/resources/log4j.properties
@@ -30,7 +30,7 @@ log4j.logger.org.springframework=WARN
 log4j.logger.org.apache.kylin.rest.controller.QueryController=DEBUG, query
 log4j.logger.org.apache.kylin.rest.service.QueryService=DEBUG, query
 log4j.logger.org.apache.kylin.query=DEBUG, query
-log4j.logger.org.apache.kylin.storage=DEBUG, query
+#log4j.logger.org.apache.kylin.storage=DEBUG, query //too much stuff in storage package now
 
 #job config
 log4j.logger.org.apache.kylin.rest.controller.JobController=DEBUG, job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 45a5f96..2444c6a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -250,19 +250,19 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for HTable is " + cut + "GB");
 
-        long totalSizeInM = 0;
+        double totalSizeInM = 0;
 
         List<Long> allCuboids = Lists.newArrayList();
         allCuboids.addAll(cubeRowCountMap.keySet());
         Collections.sort(allCuboids);
 
-        Map<Long, Long> cubeSizeMap = Maps.transformEntries(cubeRowCountMap, new Maps.EntryTransformer<Long, Long, Long>() {
+        Map<Long, Double> cubeSizeMap = Maps.transformEntries(cubeRowCountMap, new Maps.EntryTransformer<Long, Long, Double>() {
             @Override
-            public Long transformEntry(@Nullable Long key, @Nullable Long value) {
+            public Double transformEntry(@Nullable Long key, @Nullable Long value) {
                 return estimateCuboidStorageSize(cubeDesc, key, value, baseCuboidId, rowkeyColumnSize);
             }
         });
-        for (Long cuboidSize : cubeSizeMap.values()) {
+        for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
         }
 
@@ -270,14 +270,22 @@ public class CreateHTableJob extends AbstractHadoopJob {
         nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
         nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
 
-        //TODO: remove nRegion > 1 so that even small cubes will have at least two regions
-        if (ENABLE_CUBOID_SHARDING && (nRegion > 1)) {
+        if (ENABLE_CUBOID_SHARDING) {//&& (nRegion > 1)) {
             //use prime nRegions to help random sharding
+            int original = nRegion;
             nRegion = Primes.nextPrime(nRegion);//return 2 for input 1
-            logger.info("Region count is adjusted to " + nRegion + " to help random sharding");
+
+            if (nRegion > Short.MAX_VALUE) {
+                logger.info("Too many regions! reduce to " + Short.MAX_VALUE);
+                nRegion = Short.MAX_VALUE;
+            }
+
+            if (nRegion != original) {
+                logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+            }
         }
 
-        int mbPerRegion = (int) (totalSizeInM / (nRegion));
+        int mbPerRegion = (int) (totalSizeInM / nRegion);
         mbPerRegion = Math.max(1, mbPerRegion);
 
         logger.info("Total size " + totalSizeInM + "M (estimated)");
@@ -287,17 +295,20 @@ public class CreateHTableJob extends AbstractHadoopJob {
         if (ENABLE_CUBOID_SHARDING) {
             //each cuboid will be split into different number of shards
             HashMap<Long, Short> cuboidShards = Maps.newHashMap();
-            long[] regionSizes = new long[nRegion];
+            double[] regionSizes = new double[nRegion];
             for (long cuboidId : allCuboids) {
-                long estimatedSize = cubeSizeMap.get(cuboidId);
-                double magic = Math.PI;
-                int shard = (int) (1.0 * estimatedSize / mbPerRegion * magic);
-                if (shard == 0) {
+                double estimatedSize = cubeSizeMap.get(cuboidId);
+                double magic = 10;
+                int shard = (int) (1.0 * estimatedSize * magic / mbPerRegion);
+                if (shard < 1) {
                     shard = 1;
                 }
-                if (shard > Short.MAX_VALUE) {
-                    logger.info(String.format("Cuboid %d 's estimated size %d MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shard, Short.MAX_VALUE));
-                    shard = Short.MAX_VALUE;
+
+                if (shard > nRegion) {
+                    logger.info(String.format("Cuboid %d 's estimated size %0.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shard, nRegion));
+                    shard = nRegion;
+                } else {
+                    logger.info(String.format("Cuboid %d 's estimated size %0.2f MB will generate %d regions", cuboidId, estimatedSize, shard));
                 }
 
                 cuboidShards.put(cuboidId, (short) shard);
@@ -309,7 +320,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             for (int i = 0; i < nRegion; ++i) {
-                logger.info(String.format("Region %d's estimated size is %d MB, accounting for %0.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+                logger.info(String.format("Region %d's estimated size is %0.2f MB, accounting for %0.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
             }
 
             CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
@@ -353,7 +364,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
      * @param rowCount
      * @return the cuboid size in M bytes
      */
-    private static long estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
+    private static double estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
 
         int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
 
@@ -380,8 +391,9 @@ public class CreateHTableJob extends AbstractHadoopJob {
         bytesLength += space;
 
         logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes.");
-        logger.info("Cuboid " + cuboidId + " total size is " + (bytesLength * rowCount / (1024L * 1024L)) + "M.");
-        return bytesLength * rowCount / (1024L * 1024L);
+        double ret = 1.0 * (bytesLength * rowCount / (1024L * 1024L));
+        logger.info("Cuboid " + cuboidId + " total size is " + ret + "M.");
+        return ret;
     }
 
     public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c484767/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index df42560..a4a8a35 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -56,8 +56,16 @@ public class MergeGCStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
 
+        try {
+            logger.info("Sleep one minute before deleting the Htables");
+            Thread.sleep(60000);
+        } catch (InterruptedException e) {
+            logger.warn("Thread interrupted");
+        }
+        
+        logger.info("Start doing merge gc work");
+        
         StringBuffer output = new StringBuffer();
-
         List<String> oldTables = getOldHTables();
         if (oldTables != null && oldTables.size() > 0) {
             String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();