You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/05 08:29:41 UTC

[2/3] incubator-kylin git commit: next step: enable rowkeyencoder deal with gtrecord

next step: enable RowKeyEncoder to deal with GTRecord


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6a9c7413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6a9c7413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6a9c7413

Branch: refs/heads/KYLIN-1126
Commit: 6a9c741383d607e6cb7e5d552c8377f7c29c0db7
Parents: 9ac673a
Author: honma <ho...@ebay.com>
Authored: Thu Nov 5 11:12:58 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Nov 5 11:12:58 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeSegment.java | 18 +++---
 .../kylin/cube/kv/AbstractRowKeyEncoder.java    | 16 ++----
 .../apache/kylin/cube/kv/FuzzyMaskEncoder.java  | 12 ++--
 .../org/apache/kylin/cube/kv/RowConstants.java  |  1 +
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 59 +++++++++++---------
 .../coprocessor/CoprocessorProjector.java       |  5 +-
 6 files changed, 60 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 1a44fcf..bd36dfa 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -69,6 +69,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     private String lastBuildJobID;
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
+    @JsonProperty("enable_sharding")
+    private boolean enableSharding = true;
     @JsonProperty("cuboid_shard_nums")
     private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
     @JsonProperty("total_shards")
@@ -368,6 +370,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         return cubeInstance.getStorageType();
     }
 
+    public boolean isEnableSharding() {
+        return enableSharding; // true unless explicitly disabled: the field initializer defaults it to true
+    }
+
+    public void setEnableSharding(boolean enableSharding) {
+        this.enableSharding = enableSharding; // RowKeyEncoder caches isEnableSharding() at construction; existing encoders won't see later changes
+    }
+
     /**
      * get the number of shards where each cuboid will distribute
      * @return
@@ -381,14 +391,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         }
     }
 
-    //    /**
-    //     * get the number of shards where each cuboid will distribute
-    //     * @return
-    //     */
-    //    public Map<Long, Short> getCuboidShards() {
-    //        return this.cuboidShards;
-    //    }
-
     public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
         this.cuboidShardNums = newCuboidShards;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index 231f737..b7de983 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -34,30 +34,26 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractRowKeyEncoder {
 
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
     public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL;
 
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
+    protected final Cuboid cuboid;
+    protected final CubeSegment cubeSeg;
+    protected byte blankByte = DEFAULT_BLANK_BYTE;
 
     public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) {
         return new RowKeyEncoder(cubeSeg, cuboid);
     }
 
-    protected final Cuboid cuboid;
-    protected byte blankByte = DEFAULT_BLANK_BYTE;
-    protected boolean encodeShard = true;
-
-    protected AbstractRowKeyEncoder(Cuboid cuboid) {
+    protected AbstractRowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
         this.cuboid = cuboid;
+        this.cubeSeg = cubeSeg; // kept so subclasses can read segment-level shard settings (e.g. RowKeyEncoder.calculateShard)
     }
 
     public void setBlankByte(byte blankByte) {
         this.blankByte = blankByte;
     }
 
-    public void setEncodeShard(boolean encodeShard) {
-        this.encodeShard = encodeShard;
-    }
-
     abstract public byte[] encode(Map<TblColRef, String> valueMap);
 
     abstract public byte[] encode(byte[][] values);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index bf67538..7748e8c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,11 +36,15 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
     }
 
     @Override
-    protected int fillHeader(byte[] bytes) {
-        Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
+    protected void fillHeader(byte[] bytes) {
+        int offset = 0;
+        if (enableSharding) {
+            Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE); // shard id bytes exist only with sharding; ONE presumably means "any value" here (opposite of the cuboid bytes below) - confirm against the fuzzy filter's mask convention
+            offset += RowConstants.ROWKEY_SHARDID_LEN;
+        }
         // always fuzzy match cuboid ID to lock on the selected cuboid
-        Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO);
-        return this.headerLength;
+        int headerLength = this.getHeaderLength(); // shard id (if enabled) + cuboid id
+        Arrays.fill(bytes, offset, headerLength, RowConstants.BYTE_ZERO);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 6a8eeb5..09bccc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -33,6 +33,7 @@ public class RowConstants {
     public static final int ROWKEY_SHARDID_LEN = 2;
 
     public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+    public static final int ROWKEY_SHARD_AND_CUBOID_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN; // same value as ROWKEY_HEADER_LEN; named explicitly because the actual header is only the cuboid id when sharding is disabled
     
     public static final byte BYTE_ZERO = 0;
     public static final byte BYTE_ONE = 1;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 0676df6..059288a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -32,18 +32,35 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class RowKeyEncoder extends AbstractRowKeyEncoder {
 
-    private int bytesLength;
-    protected int headerLength;
+    private int bodyLength = 0;
     private RowKeyColumnIO colIO;
-    CubeSegment cubeSeg;
+    protected boolean enableSharding;
 
     protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
-        super(cuboid);
-        this.cubeSeg = cubeSeg;
+        super(cubeSeg, cuboid);
+        enableSharding = cubeSeg.isEnableSharding();
         colIO = new RowKeyColumnIO(cubeSeg);
-        bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid 
         for (TblColRef column : cuboid.getColumns()) {
-            bytesLength += colIO.getColumnLength(column);
+            bodyLength += colIO.getColumnLength(column);
+        }
+    }
+
+    protected int getHeaderLength() {
+        return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN; // shard id prefix is present only when sharding is enabled
+    }
+
+    protected int getBytesLength() {
+        return getHeaderLength() + bodyLength; // header + sum of the column lengths computed in the constructor
+    }
+
+    protected short calculateShard(byte[] key) {
+        if (enableSharding) {
+            int bodyOffset = RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN; // with sharding on, the body follows shard id + cuboid id
+            short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+            short shardOffset = ShardingHash.getShard(key, bodyOffset, bodyLength, cuboidShardNum);
+            return ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+        } else {
+            throw new IllegalStateException("If enableSharding is false, you should never calculate shard");
         }
     }
 
@@ -71,9 +88,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
 
     @Override
     public byte[] encode(byte[][] values) {
-        byte[] bytes = new byte[this.bytesLength];
-        int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
-        int offset = bodyOffset;
+        byte[] bytes = new byte[this.getBytesLength()];
+        int offset = getHeaderLength();
 
         for (int i = 0; i < cuboid.getColumns().size(); i++) {
             TblColRef column = cuboid.getColumns().get(i);
@@ -93,27 +109,18 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
         return bytes;
     }
 
-    protected int fillHeader(byte[] bytes) {
+    /** Writes the shard id (only if sharding is enabled) and then the cuboid id at the head of bytes.
+     *  The shard is hashed from the row key body, so the column values must be filled in first. */
+    protected void fillHeader(byte[] bytes) {
         int offset = 0;
 
-        if (encodeShard) {
-            short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
-            short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
-            short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
-            BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
-        } else {
-            BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+        if (enableSharding) {
+            short shard = calculateShard(bytes);
+            BytesUtil.writeShort(shard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+            offset += RowConstants.ROWKEY_SHARDID_LEN;
         }
-        offset += RowConstants.ROWKEY_SHARDID_LEN;
 
         System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
-        offset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        if (this.headerLength != offset) {
-            throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
-        }
-        
-        return offset;
     }
 
     protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 9b839c3..c37b2f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,9 +41,8 @@ public class CoprocessorProjector {
 
         RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
             @Override
-            protected int fillHeader(byte[] bytes) {
-                Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
-                return this.headerLength;
+            protected void fillHeader(byte[] bytes) {
+                Arrays.fill(bytes, 0, this.getHeaderLength(), (byte) 0xff); // whole header: shard id (if sharding is enabled) + cuboid id
             }
 
             @Override