You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/23 08:17:17 UTC
[2/3] incubator-kylin git commit: add shard as header
add shard as header
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1d351433
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1d351433
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1d351433
Branch: refs/heads/KYLIN-942
Commit: 1d35143323efefdb681fc50c50e6f67f764b9d09
Parents: ec307f6
Author: honma <ho...@ebay.com>
Authored: Tue Sep 22 18:44:26 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Sep 23 14:19:45 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BytesUtil.java | 20 ++++++-
.../apache/kylin/common/util/ShardingHash.java | 23 +++++++-
.../org/apache/kylin/common/util/BasicTest.java | 20 ++-----
.../java/org/apache/kylin/cube/CubeSegment.java | 45 +++++++++++++---
.../kylin/cube/common/RowKeySplitter.java | 31 +++++++----
.../org/apache/kylin/cube/cuboid/Cuboid.java | 4 +-
.../kylin/cube/kv/AbstractRowKeyEncoder.java | 5 ++
.../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 6 +--
.../org/apache/kylin/cube/kv/RowConstants.java | 3 ++
.../org/apache/kylin/cube/kv/RowKeyDecoder.java | 4 +-
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 56 +++++++++-----------
.../kylin/cube/common/RowKeySplitterTest.java | 20 +++----
.../apache/kylin/cube/kv/RowKeyDecoderTest.java | 10 ++--
.../apache/kylin/cube/kv/RowKeyEncoderTest.java | 32 +++++------
.../kylin/storage/translate/HBaseKeyRange.java | 6 ++-
.../kylin/engine/mr/common/CuboidShardUtil.java | 9 ++--
.../mr/steps/MapContextGTRecordWriter.java | 24 ++++-----
.../mr/steps/MergeCuboidFromStorageMapper.java | 7 ++-
.../engine/mr/steps/MergeCuboidMapper.java | 9 ++--
.../kylin/engine/mr/steps/NDCuboidMapper.java | 25 +++++++--
.../coprocessor/CoprocessorProjector.java | 2 +-
.../common/coprocessor/CoprocessorRowType.java | 2 +-
.../storage/hbase/cube/v1/CubeStorageQuery.java | 50 +++++++++++++++--
.../storage/hbase/steps/CreateHTableJob.java | 2 +-
24 files changed, 271 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0880da1..0d4dba9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -25,14 +25,23 @@ public class BytesUtil {
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
- public static void writeLong(long num, byte[] bytes, int offset, int size) {
+ public static void writeShort(short num, byte[] bytes, int offset, int size) {
for (int i = offset + size - 1; i >= offset; i--) {
bytes[i] = (byte) num;
num >>>= 8;
}
}
- public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+ public static long readShort(byte[] bytes, int offset, int size) {
+ short num = 0;
+ for (int i = offset, n = offset + size; i < n; i++) {
+ num <<= 8;
+ num |= (short) bytes[i] & 0xFF;
+ }
+ return num;
+ }
+
+ public static void writeLong(long num, byte[] bytes, int offset, int size) {
for (int i = offset + size - 1; i >= offset; i--) {
bytes[i] = (byte) num;
num >>>= 8;
@@ -48,6 +57,13 @@ public class BytesUtil {
return integer;
}
+ public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+ for (int i = offset + size - 1; i >= offset; i--) {
+ bytes[i] = (byte) num;
+ num >>>= 8;
+ }
+ }
+
public static int readUnsigned(byte[] bytes, int offset, int size) {
int integer = 0;
for (int i = offset, n = offset + size; i < n; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
index 42eb443..97feda1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
@@ -26,21 +26,42 @@ public class ShardingHash {
static HashFunction hashFunc = Hashing.murmur3_128();
public static short getShard(int integerValue, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
long hash = hashFunc.hashInt(integerValue).asLong();
return _getShard(hash, totalShards);
}
public static short getShard(long longValue, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
long hash = hashFunc.hashLong(longValue).asLong();
return _getShard(hash, totalShards);
}
public static short getShard(byte[] byteValues, int offset, int length, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
+
long hash = hashFunc.hashBytes(byteValues, offset, length).asLong();
return _getShard(hash, totalShards);
}
+ public static short getShard(short cuboidShardBase, short shardOffset, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
+ return (short) ((cuboidShardBase + shardOffset) % totalShards);
+ }
+
private static short _getShard(long hash, int totalShard) {
- return (short) (Math.abs(hash) % totalShard);
+ long x = hash % totalShard;
+ if (x < 0) {
+ x += totalShard;
+ }
+ return (short) x;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 21deaba..1590c92 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -27,7 +27,6 @@ import java.util.Calendar;
import java.util.IdentityHashMap;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.math3.primes.Primes;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -65,21 +64,9 @@ public class BasicTest {
@Test
public void testxx() {
- byte[] temp = new byte[] { 1, 2, 3 };
- byte[] temp2 = new byte[] { 1, 2, 3 };
-
- System.out.println(temp.hashCode());
- System.out.println(temp2.hashCode());
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(3);
- buffer.put((byte) 1);
- buffer.put((byte) 1);
- buffer.put((byte) 1);
- buffer.put((byte) 1);
- System.out.println(buffer.position());
- System.out.println(buffer.limit());
- System.out.println(buffer.capacity());
-
+ short x = -64;
+ int y = (int)x;
+ System.out.println(y);
}
@Test
@@ -103,6 +90,7 @@ public class BasicTest {
a.put(s1, null);
b.put(s2, null);
}
+
@Test
@Ignore("convenient trial tool for dev")
public void testX() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d324b9c..1a44fcf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -25,6 +25,7 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
@@ -37,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
@@ -67,8 +69,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
private String lastBuildJobID;
@JsonProperty("create_time_utc")
private long createTimeUTC;
- @JsonProperty("cuboid_shards")
- private Map<Long, Short> cuboidShards = null;
+ @JsonProperty("cuboid_shard_nums")
+ private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
@JsonProperty("total_shards")
private int totalShards = 0;
@@ -80,6 +82,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
@JsonProperty("snapshots")
private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
+ private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
+
public CubeDesc getCubeDesc() {
return getCubeInstance().getDescriptor();
}
@@ -364,14 +368,31 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
return cubeInstance.getStorageType();
}
- public Map<Long, Short> getCuboidShards() {
- return cuboidShards;
+ /**
+ * get the number of shards where each cuboid will distribute
+ * @return
+ */
+ public Short getCuboidShardNum(Long cuboidId) {
+ Short ret = this.cuboidShardNums.get(cuboidId);
+ if (ret == null) {
+ return 1;
+ } else {
+ return ret;
+ }
}
- public void setCuboidShards(Map<Long, Short> newCuboidShards) {
- this.cuboidShards = newCuboidShards;
+ // /**
+ // * get the number of shards where each cuboid will distribute
+ // * @return
+ // */
+ // public Map<Long, Short> getCuboidShards() {
+ // return this.cuboidShards;
+ // }
+
+ public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
+ this.cuboidShardNums = newCuboidShards;
}
-
+
public int getTotalShards() {
return totalShards;
}
@@ -379,4 +400,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
public void setTotalShards(int totalShards) {
this.totalShards = totalShards;
}
+
+ public short getCuboidBaseShard(Long cuboidId) {
+ Short ret = cuboidBaseShards.get(cuboidId);
+ if (ret == null) {
+ ret = ShardingHash.getShard(cuboidId, totalShards);
+ cuboidBaseShards.put(cuboidId, ret);
+ }
+ return ret;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 7e379dd..0111cee 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -27,10 +27,6 @@ import org.apache.kylin.cube.kv.RowKeyColumnIO;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.TblColRef;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeySplitter {
private CubeDesc cubeDesc;
@@ -39,6 +35,9 @@ public class RowKeySplitter {
private SplittedBytes[] splitBuffers;
private int bufferSize;
+ private long lastSplittedCuboidId;
+ private short lastSplittedShard;
+
public SplittedBytes[] getSplitBuffers() {
return splitBuffers;
}
@@ -47,6 +46,14 @@ public class RowKeySplitter {
return bufferSize;
}
+ public long getLastSplittedCuboidId() {
+ return lastSplittedCuboidId;
+ }
+
+ public short getLastSplittedShard() {
+ return lastSplittedShard;
+ }
+
public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
this.cubeDesc = cubeSeg.getCubeDesc();
this.colIO = new RowKeyColumnIO(cubeSeg);
@@ -60,21 +67,27 @@ public class RowKeySplitter {
/**
* @param bytes
- * @param byteLen
* @return cuboid ID
*/
- public long split(byte[] bytes, int byteLen) {
+ public long split(byte[] bytes) {
this.bufferSize = 0;
int offset = 0;
+ // extract shard
+ SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
+ shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
+ System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+
// extract cuboid id
SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
offset += RowConstants.ROWKEY_CUBOIDID_LEN;
- long cuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+ lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
+ Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId);
// rowkey columns
for (int i = 0; i < cuboid.getColumns().size(); i++) {
@@ -86,6 +99,6 @@ public class RowKeySplitter {
offset += colLength;
}
- return cuboidId;
+ return lastSplittedCuboidId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index a7b2de4..9ee2315 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -28,6 +28,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.RowKeyColDesc;
@@ -36,9 +37,6 @@ import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
import org.apache.kylin.metadata.model.TblColRef;
-/**
- * @author George Song (ysong1)
- */
public class Cuboid implements Comparable<Cuboid> {
private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index f566f5c..1e24432 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -62,6 +62,7 @@ public abstract class AbstractRowKeyEncoder {
protected final Cuboid cuboid;
protected byte blankByte = DEFAULT_BLANK_BYTE;
+ protected boolean encodeShard = true;
protected AbstractRowKeyEncoder(Cuboid cuboid) {
this.cuboid = cuboid;
@@ -71,6 +72,10 @@ public abstract class AbstractRowKeyEncoder {
this.blankByte = blankByte;
}
+ public void setEncodeShard(boolean encodeShard) {
+ this.encodeShard = encodeShard;
+ }
+
abstract public byte[] encode(Map<TblColRef, String> valueMap);
abstract public byte[] encode(byte[][] values);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 5077287..1c2f4ee 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,11 +36,9 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
}
@Override
- protected int fillHeader(byte[] bytes, byte[][] values) {
+ protected int fillHeader(byte[] bytes) {
// always fuzzy match cuboid ID to lock on the selected cuboid
- int cuboidStart = this.headerLength - RowConstants.ROWKEY_CUBOIDID_LEN;
- Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE);
- Arrays.fill(bytes, cuboidStart, this.headerLength, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(bytes, 0, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO);
return this.headerLength;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 5db37aa..af4adbb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -26,11 +26,14 @@ public class RowConstants {
public static final byte ROWKEY_LOWER_BYTE = 0;
// row key upper bound
public static final byte ROWKEY_UPPER_BYTE = (byte) 0xff;
+
// row key cuboid id length
public static final int ROWKEY_CUBOIDID_LEN = 8;
// row key shard length
public static final int ROWKEY_SHARDID_LEN = 2;
+ public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+
// fuzzy mask
public static final byte FUZZY_MASK_ZERO = 0;
public static final byte FUZZY_MASK_ONE = 1;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 1b896a0..3506845 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -53,12 +53,12 @@ public class RowKeyDecoder {
public long decode(byte[] bytes) throws IOException {
this.values.clear();
- long cuboidId = rowKeySplitter.split(bytes, bytes.length);
+ long cuboidId = rowKeySplitter.split(bytes);
initCuboid(cuboidId);
SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
- int offset = 1; // skip cuboid id part
+ int offset = 2; // skip shard and cuboid id part
for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
TblColRef col = this.cuboid.getColumns().get(i);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 7f8bbd3..736a2b9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -24,56 +24,33 @@ import java.util.List;
import java.util.Map;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.TblColRef;
-/**
- * @author George Song (ysong1)
- */
public class RowKeyEncoder extends AbstractRowKeyEncoder {
private int bytesLength;
protected int headerLength;
private RowKeyColumnIO colIO;
+ CubeSegment cubeSeg;
protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
super(cuboid);
+ this.cubeSeg = cubeSeg;
colIO = new RowKeyColumnIO(cubeSeg);
- bytesLength = headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header
+ bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid
for (TblColRef column : cuboid.getColumns()) {
bytesLength += colIO.getColumnLength(column);
}
}
- public RowKeyColumnIO getColumnIO() {
- return colIO;
- }
-
- public int getColumnOffset(TblColRef col) {
- int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
-
- for (TblColRef dimCol : cuboid.getColumns()) {
- if (col.equals(dimCol))
- return offset;
- offset += colIO.getColumnLength(dimCol);
- }
-
- throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid);
- }
-
public int getColumnLength(TblColRef col) {
return colIO.getColumnLength(col);
}
- public int getRowKeyLength() {
- return bytesLength;
- }
-
- public int getHeaderLength() {
- return headerLength;
- }
-
@Override
public byte[] encode(Map<TblColRef, String> valueMap) {
List<byte[]> valueList = new ArrayList<byte[]>();
@@ -95,7 +72,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
@Override
public byte[] encode(byte[][] values) {
byte[] bytes = new byte[this.bytesLength];
- int offset = fillHeader(bytes, values);
+ int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
+ int offset = bodyOffset;
for (int i = 0; i < cuboid.getColumns().size(); i++) {
TblColRef column = cuboid.getColumns().get(i);
@@ -107,18 +85,34 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
fillColumnValue(column, colLength, value, value.length, bytes, offset);
}
offset += colLength;
-
}
+
+ //fill shard and cuboid
+ fillHeader(bytes);
+
return bytes;
}
- protected int fillHeader(byte[] bytes, byte[][] values) {
+ protected int fillHeader(byte[] bytes) {
int offset = 0;
+
+ if (encodeShard) {
+ short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+ short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
+ short finalShard = ShardingHash.getShard(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+ BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ } else {
+ BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ }
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+
System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
if (this.headerLength != offset) {
throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
}
+
return offset;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
index 9a7970c..98f1eef 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
@@ -28,10 +28,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeySplitterTest extends LocalFileMetadataTestCase {
@Before
@@ -49,23 +45,23 @@ public class RowKeySplitterTest extends LocalFileMetadataTestCase {
public void testWithSlr() throws Exception {
CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITH_SLR_READY");
- RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+ RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
// base cuboid rowkey
- byte[] input = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
- rowKeySplitter.split(input, input.length);
+ byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ rowKeySplitter.split(input);
- assertEquals(10, rowKeySplitter.getBufferSize());
+ assertEquals(11, rowKeySplitter.getBufferSize());
}
@Test
public void testWithoutSlr() throws Exception {
CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_READY");
- RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+ RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
// base cuboid rowkey
- byte[] input = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
- rowKeySplitter.split(input, input.length);
+ byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+ rowKeySplitter.split(input);
- assertEquals(9, rowKeySplitter.getBufferSize());
+ assertEquals(10, rowKeySplitter.getBufferSize());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
index 3704e03..d6b1718 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
@@ -34,10 +34,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
@Before
@@ -57,7 +53,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
- byte[] key = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+ byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
rowKeyDecoder.decode(key);
List<String> values = rowKeyDecoder.getValues();
@@ -70,7 +66,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
- byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
rowKeyDecoder.decode(key);
List<String> values = rowKeyDecoder.getValues();
@@ -97,7 +93,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(30, encodedKey.length);
+ assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
rowKeyDecoder.decode(encodedKey);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
index c50b8c9..45c8108 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
@@ -35,10 +35,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
@Before
@@ -74,9 +70,11 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(30, encodedKey.length);
- byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
- byte[] rest = Arrays.copyOfRange(encodedKey, 8, encodedKey.length);
+ assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+ byte[] rest = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ assertEquals(0, Bytes.toShort(shard));
assertEquals(255, Bytes.toLong(cuboidId));
assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
}
@@ -104,10 +102,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(48, encodedKey.length);
- byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
- byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
- byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+ assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+ byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+ byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ assertEquals(0, Bytes.toShort(shard));
assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
assertEquals(511, Bytes.toLong(cuboidId));
assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
@@ -136,10 +136,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(48, encodedKey.length);
- byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
- byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
- byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+ assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+ byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+ byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ assertEquals(0, Bytes.toShort(shard));
assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
assertEquals(511, Bytes.toLong(cuboidId));
assertArrayEquals(new byte[] { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }, rest);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index 47553ad..bdcd257 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -119,7 +119,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
}
AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid);
-
+ encoder.setEncodeShard(false);//will enumerate all possible shards when scanning
+
encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
this.startKey = encoder.encode(startValues);
@@ -133,7 +134,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
// restore encoder defaults for later reuse (note
// AbstractRowKeyEncoder.createInstance() caches instances)
encoder.setBlankByte(AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE);
-
+
+ encoder.setEncodeShard(true);
// always fuzzy match cuboid ID to lock on the selected cuboid
this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
index 3fb3f93..4839ab0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.common;
import java.io.IOException;
import java.util.Map;
-import org.apache.commons.collections4.map.DefaultedMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -35,9 +34,9 @@ import com.google.common.collect.Maps;
public class CuboidShardUtil {
protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class);
- public static Map<Long, Short> loadCuboidShards(CubeSegment segment) {
- return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1);
- }
+// public static Map<Long, Short> loadCuboidShards(CubeSegment segment) {
+// return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1);
+// }
public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards,int totalShards) throws IOException {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -53,7 +52,7 @@ public class CuboidShardUtil {
}
}
});
- segment.setCuboidShards(filered);
+ segment.setCuboidShardNums(filered);
segment.setTotalShards(totalShards);
CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 24043fc..58e820f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -7,7 +7,7 @@ import java.util.BitSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,14 +39,12 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
private long cuboidRowCount = 0;
//for shard
- private int totalShards = 0;
public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
this.mapContext = mapContext;
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
this.measureCount = cubeDesc.getMeasures().size();
- this.totalShards = cubeSegment.getTotalShards();
}
@Override
@@ -63,19 +61,19 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
cuboidRowCount++;
- int preamble = RowConstants.ROWKEY_CUBOIDID_LEN + RowConstants.ROWKEY_SHARDID_LEN;
+ int preamble = RowConstants.ROWKEY_HEADER_LEN;
int offSet = preamble;
for (int x = 0; x < dimensions; x++) {
System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
offSet += record.get(x).length();
}
- short shardForThisRec = 0;
- if (totalShards != 0) {
- shardForThisRec = ShardingHash.getShard(keyBuf, preamble, offSet, totalShards);
- }
- com.google.common.primitives.Bytes.
- System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
+ //fill shard
+ short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
+ short shardOffset = ShardingHash.getShard(keyBuf, preamble, offSet - preamble, cuboidShardNum);
+ Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
+ short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+ BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
//output measures
valueBuf.clear();
@@ -96,8 +94,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
private void initVariables(Long cuboidId) {
- bytesLength = RowConstants.ROWKEY_SHARDID_LEN;
- bytesLength += RowConstants.ROWKEY_CUBOIDID_LEN;
+ bytesLength = RowConstants.ROWKEY_HEADER_LEN;
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
for (TblColRef column : cuboid.getColumns()) {
bytesLength += cubeSegment.getColumnLength(column);
@@ -110,6 +107,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
measureColumnsIndex[i] = dimensions + i;
}
- System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+ //write cuboid id first
+ BytesUtil.writeLong(cuboidId, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 4598673..9b25c97 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -124,11 +124,16 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
Preconditions.checkState(key.offset() == 0);
- long cuboidID = rowKeySplitter.split(key.array(), key.length());
+ long cuboidID = rowKeySplitter.split(key.array());
+ short shard = rowKeySplitter.getLastSplittedShard();
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
int bufOffset = 0;
+
+ BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+ bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
+
BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 45f0d32..6301f3d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -103,7 +103,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
}
-
+
private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
@@ -111,7 +111,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
String jobID = extractJobIDFromPath(filePath);
return findSegmentWithUuid(jobID, cube);
}
-
+
private static String extractJobIDFromPath(String path) {
Matcher matcher = JOB_NAME_PATTERN.matcher(path);
// check the first occurrence
@@ -134,11 +134,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
@Override
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
+ long cuboidID = rowKeySplitter.split(key.getBytes());
+ short shard = rowKeySplitter.getLastSplittedShard();
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
int bufOffset = 0;
+ BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+ bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index c47d090..e7db1fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -30,6 +32,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -38,6 +41,8 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* @author George Song (ysong1)
*
@@ -49,6 +54,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private Text outputKey = new Text();
private String cubeName;
private String segmentName;
+ private CubeSegment cubeSegment;
private CubeDesc cubeDesc;
private CuboidScheduler cuboidScheduler;
@@ -68,7 +74,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
cubeDesc = cube.getDescriptor();
// initialize CubiodScheduler
@@ -80,16 +86,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
int offset = 0;
+ //shard id will be filled after other contents
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+
// cuboid id
System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
- offset += childCuboid.getBytes().length;
+ offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ int bodyOffset = offset;
// rowkey columns
long mask = Long.highestOneBit(parentCuboid.getId());
long parentCuboidId = parentCuboid.getId();
long childCuboidId = childCuboid.getId();
long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
- int index = 1; // skip cuboidId
+ int index = 2; // skip shard and cuboidId
for (int i = 0; i < parentCuboidIdActualLength; i++) {
if ((mask & parentCuboidId) > 0) {// if the this bit position equals
// 1
@@ -103,12 +114,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
mask = mask >> 1;
}
+ //fill shard
+ short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId);
+ short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum);
+ short finalShard = ShardingHash.getShard(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards());
+ BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
return offset;
}
@Override
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
+ long cuboidId = rowKeySplitter.split(key.getBytes());
Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 09295b0..9b839c3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,7 +41,7 @@ public class CoprocessorProjector {
RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
@Override
- protected int fillHeader(byte[] bytes, byte[][] values) {
+ protected int fillHeader(byte[] bytes) {
Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
return this.headerLength;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
index 4b7c4dc..7ec97c0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
@@ -131,7 +131,7 @@ public class CoprocessorRowType {
private void init() {
int[] offsets = new int[columns.length];
- int o = RowConstants.ROWKEY_CUBOIDID_LEN;
+ int o = RowConstants.ROWKEY_HEADER_LEN;
for (int i = 0; i < columns.length; i++) {
offsets[i] = o;
o += columnSizes[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 836f142..1e796ca 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.cube.v1;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
@@ -34,11 +35,13 @@ import java.util.TreeSet;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -437,27 +440,30 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
//log
- sb.append(scanRanges.size() + "=>");
+ sb.append(scanRanges.size() + "=(mergeoverlap)>");
List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
//log
- sb.append(mergedRanges.size() + "=>");
+ sb.append(mergedRanges.size() + "=(mergetoomany)>");
mergedRanges = mergeTooManyRanges(mergedRanges);
//log
- sb.append(mergedRanges.size() + ", ");
+ sb.append(mergedRanges.size() + ",");
result.addAll(mergedRanges);
}
-
logger.info(sb.toString());
logger.info("hbasekeyrange count: " + result.size());
+
dropUnhitSegments(result);
logger.info("hbasekeyrange count after dropping unhit :" + result.size());
+ result = duplicateRangeByShard(result);
+ logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size());
+
return result;
}
@@ -667,6 +673,42 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
}
+    /**
+     * Expands every scan range into one range per shard occupied by the range's cuboid.
+     * <p>
+     * HBaseKeyRange encodes its start/stop keys with setEncodeShard(false) ("will enumerate all
+     * possible shards when scanning"), so the leading shard-id bytes of those keys are not the
+     * real shard; each copy produced here has them overwritten with one concrete shard.
+     *
+     * @param scans ranges whose keys still need their shard-id bytes filled in
+     * @return one range per (input range, cuboid shard) pair, sorted by start key
+     */
+    private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) {
+        List<HBaseKeyRange> ret = Lists.newArrayList();
+
+        for (HBaseKeyRange scan : scans) {
+            CubeSegment segment = scan.getCubeSegment();
+            long cuboidId = scan.getCuboid().getId();
+
+            byte[] startKey = scan.getStartKey();
+            byte[] stopKey = scan.getStopKey();
+
+            short cuboidShardNum = segment.getCuboidShardNum(cuboidId);
+            short cuboidShardBase = segment.getCuboidBaseShard(cuboidId);
+            int totalShards = segment.getTotalShards();
+            for (short i = 0; i < cuboidShardNum; ++i) {
+                // Translate the cuboid-relative offset into the physical shard with the same
+                // ShardingHash.getShard(base, offset, totalShards) call the cube writers use;
+                // writing the bare offset would target the wrong shards whenever base != 0.
+                short shard = org.apache.kylin.common.util.ShardingHash.getShard(cuboidShardBase, i, totalShards);
+                byte[] newStartKey = duplicateKeyAndChangeShard(shard, startKey);
+                byte[] newStopKey = duplicateKeyAndChangeShard(shard, stopKey);
+                HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
+                        scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
+                ret.add(newRange);
+            }
+        }
+
+        // keep the expanded ranges ordered by start key
+        Collections.sort(ret, new Comparator<HBaseKeyRange>() {
+            @Override
+            public int compare(HBaseKeyRange o1, HBaseKeyRange o2) {
+                return Bytes.compareTo(o1.getStartKey(), o2.getStartKey());
+            }
+        });
+
+        return ret;
+    }
+
+    /**
+     * Copies a row key and stamps a new shard id into its first
+     * RowConstants.ROWKEY_SHARDID_LEN bytes; the caller's array is not modified.
+     *
+     * @param newShard shard id to write at offset 0 of the copy
+     * @param bytes    source row key
+     * @return a fresh array equal to {@code bytes} except for the shard-id prefix
+     */
+    private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) {
+        byte[] copy = Arrays.copyOfRange(bytes, 0, bytes.length);
+        BytesUtil.writeShort(newShard, copy, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        return copy;
+    }
+
private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 086acb0..45a5f96 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -355,7 +355,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
*/
private static long estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
- int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+ int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
long mask = Long.highestOneBit(baseCuboidId);
long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);