You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/05 08:29:41 UTC
[2/3] incubator-kylin git commit: next step: enable RowKeyEncoder to deal with GTRecord
next step: enable RowKeyEncoder to deal with GTRecord
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6a9c7413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6a9c7413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6a9c7413
Branch: refs/heads/KYLIN-1126
Commit: 6a9c741383d607e6cb7e5d552c8377f7c29c0db7
Parents: 9ac673a
Author: honma <ho...@ebay.com>
Authored: Thu Nov 5 11:12:58 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Nov 5 11:12:58 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeSegment.java | 18 +++---
.../kylin/cube/kv/AbstractRowKeyEncoder.java | 16 ++----
.../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 12 ++--
.../org/apache/kylin/cube/kv/RowConstants.java | 1 +
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 59 +++++++++++---------
.../coprocessor/CoprocessorProjector.java | 5 +-
6 files changed, 60 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 1a44fcf..bd36dfa 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -69,6 +69,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
private String lastBuildJobID;
@JsonProperty("create_time_utc")
private long createTimeUTC;
+ @JsonProperty("enable_sharding")
+ private boolean enableSharding = true;
@JsonProperty("cuboid_shard_nums")
private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
@JsonProperty("total_shards")
@@ -368,6 +370,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
return cubeInstance.getStorageType();
}
+ public boolean isEnableSharding() {
+ return enableSharding;
+ }
+
+ public void setEnableSharding(boolean enableSharding) {
+ this.enableSharding = enableSharding;
+ }
+
/**
* get the number of shards where each cuboid will distribute
* @return
@@ -381,14 +391,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
}
}
- // /**
- // * get the number of shards where each cuboid will distribute
- // * @return
- // */
- // public Map<Long, Short> getCuboidShards() {
- // return this.cuboidShards;
- // }
-
public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
this.cuboidShardNums = newCuboidShards;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index 231f737..b7de983 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -34,30 +34,26 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractRowKeyEncoder {
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL;
- protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
+ protected final Cuboid cuboid;
+ protected final CubeSegment cubeSeg;
+ protected byte blankByte = DEFAULT_BLANK_BYTE;
public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) {
return new RowKeyEncoder(cubeSeg, cuboid);
}
- protected final Cuboid cuboid;
- protected byte blankByte = DEFAULT_BLANK_BYTE;
- protected boolean encodeShard = true;
-
- protected AbstractRowKeyEncoder(Cuboid cuboid) {
+ protected AbstractRowKeyEncoder(CubeSegment cubeSeg,Cuboid cuboid) {
this.cuboid = cuboid;
+ this.cubeSeg = cubeSeg;
}
public void setBlankByte(byte blankByte) {
this.blankByte = blankByte;
}
- public void setEncodeShard(boolean encodeShard) {
- this.encodeShard = encodeShard;
- }
-
abstract public byte[] encode(Map<TblColRef, String> valueMap);
abstract public byte[] encode(byte[][] values);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index bf67538..7748e8c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,11 +36,15 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
}
@Override
- protected int fillHeader(byte[] bytes) {
- Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
+ protected void fillHeader(byte[] bytes) {
+ int offset = 0;
+ if (enableSharding) {
+ Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+ }
// always fuzzy match cuboid ID to lock on the selected cuboid
- Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO);
- return this.headerLength;
+ int headerLength = this.getHeaderLength();
+ Arrays.fill(bytes, offset, headerLength, RowConstants.BYTE_ZERO);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 6a8eeb5..09bccc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -33,6 +33,7 @@ public class RowConstants {
public static final int ROWKEY_SHARDID_LEN = 2;
public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+ public static final int ROWKEY_SHARD_AND_CUBOID_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
public static final byte BYTE_ZERO = 0;
public static final byte BYTE_ONE = 1;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 0676df6..059288a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -32,18 +32,35 @@ import org.apache.kylin.metadata.model.TblColRef;
public class RowKeyEncoder extends AbstractRowKeyEncoder {
- private int bytesLength;
- protected int headerLength;
+ private int bodyLength = 0;
private RowKeyColumnIO colIO;
- CubeSegment cubeSeg;
+ protected boolean enableSharding;
protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
- super(cuboid);
- this.cubeSeg = cubeSeg;
+ super(cubeSeg, cuboid);
+ enableSharding = cubeSeg.isEnableSharding();
colIO = new RowKeyColumnIO(cubeSeg);
- bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid
for (TblColRef column : cuboid.getColumns()) {
- bytesLength += colIO.getColumnLength(column);
+ bodyLength += colIO.getColumnLength(column);
+ }
+ }
+
+ protected int getHeaderLength() {
+ return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN;
+ }
+
+ protected int getBytesLength() {
+ return getHeaderLength() + bodyLength;
+ }
+
+ protected short calculateShard(byte[] key) {
+ if (enableSharding) {
+ int bodyOffset = RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN;
+ short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+ short shardOffset = ShardingHash.getShard(key, bodyOffset, bodyLength, cuboidShardNum);
+ return ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+ } else {
+ throw new RuntimeException("If enableSharding false, you should never caculate shard");
}
}
@@ -71,9 +88,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
@Override
public byte[] encode(byte[][] values) {
- byte[] bytes = new byte[this.bytesLength];
- int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
- int offset = bodyOffset;
+ byte[] bytes = new byte[this.getBytesLength()];
+ int offset = getHeaderLength();
for (int i = 0; i < cuboid.getColumns().size(); i++) {
TblColRef column = cuboid.getColumns().get(i);
@@ -93,27 +109,18 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
return bytes;
}
- protected int fillHeader(byte[] bytes) {
+ protected void fillHeader(byte[] bytes) {
int offset = 0;
- if (encodeShard) {
- short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
- short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
- short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
- BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
- } else {
- BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ if (enableSharding) {
+ short shard = calculateShard(bytes);
+ BytesUtil.writeShort(shard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
}
- offset += RowConstants.ROWKEY_SHARDID_LEN;
System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
- offset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
- if (this.headerLength != offset) {
- throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
- }
-
- return offset;
+ //offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+ //return offset;
}
protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 9b839c3..c37b2f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,9 +41,8 @@ public class CoprocessorProjector {
RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
@Override
- protected int fillHeader(byte[] bytes) {
- Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
- return this.headerLength;
+ protected void fillHeader(byte[] bytes) {
+ Arrays.fill(bytes, 0, this.getHeaderLength(), (byte) 0xff);
}
@Override