You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/14 07:00:09 UTC
[2/2] incubator-kylin git commit: draft
draft
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/18e7999d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/18e7999d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/18e7999d
Branch: refs/heads/KYLIN-942
Commit: 18e7999dc1d256723cef7649fef8a36e2d6a84b2
Parents: 38f7d83
Author: honma <ho...@ebay.com>
Authored: Mon Oct 12 14:47:58 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Oct 14 12:48:54 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/ShardingHash.java | 2 +-
.../cube/common/FuzzyValueCombination.java | 130 +++++++++++++++++++
.../kylin/cube/gridtable/CubeCodeSystem.java | 12 +-
.../cube/gridtable/TrimmedCubeCodeSystem.java | 2 +-
.../apache/kylin/cube/kv/FuzzyKeyEncoder.java | 2 +-
.../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 8 +-
.../org/apache/kylin/cube/kv/RowConstants.java | 7 +-
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 2 +-
.../org/apache/kylin/gridtable/GTBuilder.java | 4 +-
.../java/org/apache/kylin/gridtable/GTInfo.java | 16 ++-
.../org/apache/kylin/gridtable/GTRecord.java | 34 +----
.../org/apache/kylin/gridtable/GTScanRange.java | 30 +----
.../kylin/gridtable/GTScanRangePlanner.java | 106 +++++++++++----
.../apache/kylin/gridtable/GTScanRequest.java | 9 +-
.../kylin/gridtable/DictGridTableTest.java | 4 +-
.../translate/FuzzyValueCombination.java | 30 ++---
dev-support/test_all.sh | 11 ++
.../mr/steps/MapContextGTRecordWriter.java | 2 +-
.../kylin/engine/mr/steps/NDCuboidMapper.java | 4 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 8 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 94 ++++++++++++--
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 10 +-
.../hbase/cube/v2/CubeSegmentScanner.java | 64 ++++++++-
.../kylin/storage/hbase/cube/v2/HBaseScan.java | 88 +++++++++++++
.../kylin/storage/hbase/cube/v2/RawScan.java | 20 +++
.../storage/hbase/steps/HBaseCuboidWriter.java | 2 +-
26 files changed, 542 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
index 97feda1..8d728c8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
@@ -50,7 +50,7 @@ public class ShardingHash {
return _getShard(hash, totalShards);
}
- public static short getShard(short cuboidShardBase, short shardOffset, int totalShards) {
+ public static short normalize(short cuboidShardBase, short shardOffset, int totalShards) {
if (totalShards <= 1) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
new file mode 100644
index 0000000..4ddb06a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class FuzzyValueCombination {
+
+    private static class Dim<K, V> {
+        K col;
+        Set<V> values;
+    }
+
+    /** Stand-in value set for an empty dimension, so that dimension maps to null in every combination. */
+    private static final Set<?> SINGLE_NULL_SET = Collections.unmodifiableSet(Sets.newHashSet((Object) null));
+
+    /**
+     * Enumerates every combination that picks one value per dimension (null for a null or empty value set).
+     * Returns an empty list when no dimension has a value, or when the number of combinations exceeds cap.
+     */
+    public static <K, V> List<Map<K, V>> calculate(Map<K, Set<V>> fuzzyValues, long cap) {
+        Dim<K, V>[] dims = toDims(fuzzyValues);
+        // If a query has many IN clause and each IN clause has many values, then it will easily generate
+        // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked
+        // on it. So simply choose to abandon all fuzzy keys in this case.
+        if (exceedCap(dims, cap)) {
+            return Lists.newArrayList();
+        }
+        return combination(dims);
+    }
+
+    /** Walks the dimensions depth-first without recursion, emitting a copy of the assignment at each leaf. */
+    @SuppressWarnings("unchecked")
+    private static <K, V> List<Map<K, V>> combination(Dim<K, V>[] dims) {
+        List<Map<K, V>> result = Lists.newArrayList();
+        int emptyDims = 0;
+        for (Dim<K, V> dim : dims) {
+            if (dim.values.isEmpty()) {
+                dim.values = (Set<V>) SINGLE_NULL_SET;
+                emptyDims++;
+            }
+        }
+        if (emptyDims == dims.length) {
+            return result; // no dimensions at all, or none with a real value
+        }
+
+        Map<K, V> r = Maps.newHashMap();
+        // iters[level] is the cursor over dims[level]; null means not started yet
+        Iterator<V>[] iters = new Iterator[dims.length];
+        int level = 0;
+        while (true) {
+            Dim<K, V> dim = dims[level];
+            if (iters[level] == null) {
+                iters[level] = dim.values.iterator();
+            }
+
+            Iterator<V> it = iters[level];
+            if (!it.hasNext()) {
+                if (level == 0)
+                    break;
+                // this level is exhausted: reset it and backtrack to the previous dimension
+                r.remove(dim.col);
+                iters[level] = null;
+                level--;
+                continue;
+            }
+
+            r.put(dim.col, it.next());
+            if (level == dims.length - 1) {
+                result.add(new HashMap<K, V>(r));
+            } else {
+                level++;
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K, V> Dim<K, V>[] toDims(Map<K, Set<V>> fuzzyValues) {
+        Dim<K, V>[] dims = new Dim[fuzzyValues.size()];
+        int i = 0;
+        for (Entry<K, Set<V>> entry : fuzzyValues.entrySet()) {
+            Dim<K, V> dim = new Dim<K, V>();
+            dim.col = entry.getKey();
+            // a null value set is treated the same as an empty one
+            dim.values = entry.getValue() == null ? Collections.<V> emptySet() : entry.getValue();
+            dims[i++] = dim;
+        }
+        return dims;
+    }
+
+    /** True if the combination count (empty dimension = 1) exceeds cap; tests count > cap / size before multiplying so it never overflows. */
+    private static boolean exceedCap(Dim<?, ?>[] dims, long cap) {
+        long count = 1;
+        for (Dim<?, ?> dim : dims) {
+            long size = Math.max(dim.values.size(), 1);
+            if (count > cap / size) {
+                return true;
+            }
+            count *= size;
+        }
+        return count > cap;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index e52a6e1..99258e9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -18,17 +18,15 @@ import org.apache.kylin.gridtable.IGTComparator;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
import org.apache.kylin.metadata.measure.serializer.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Created by shaoshi on 3/23/15.
- * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check
- * its data type to serialize/deserialize it;
+ * defines how column values will be encoded to/ decoded from GTRecord
+ *
+ * Cube meta can provide which columns are dictionary encoded (dict encoded dimensions) or fixed length encoded (fixed length dimensions)
+ * Metrics columns are more flexible, they will use DataTypeSerializer according to their data type.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class CubeCodeSystem implements IGTCodeSystem {
- private static final Logger logger = LoggerFactory.getLogger(CubeCodeSystem.class);
// ============================================================================
@@ -113,7 +111,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
if (serializer instanceof DictionarySerializer) {
((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
} else {
- if ((!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer)) && (value instanceof String)) {
+ if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) {
value = serializer.valueOf((String) value);
}
serializer.serialize(value, buf);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index e662a82..ea020f3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -78,7 +78,7 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem {
//TODO: remove this check
throw new IllegalStateException("Encode dictionary value in coprocessor");
} else {
- if ((!(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer)) && (value instanceof String)) {
+ if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) {
value = serializer.valueOf((String) value);
}
serializer.serialize(value, buf);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
index a17bb28..2185bc5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
@@ -37,7 +37,7 @@ public class FuzzyKeyEncoder extends RowKeyEncoder {
@Override
protected byte[] defaultValue(int length) {
byte[] keyBytes = new byte[length];
- Arrays.fill(keyBytes, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(keyBytes, RowConstants.BYTE_ZERO);
return keyBytes;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 254482c..bf67538 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -37,18 +37,18 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
@Override
protected int fillHeader(byte[] bytes) {
- Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.FUZZY_MASK_ONE);
+ Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
// always fuzzy match cuboid ID to lock on the selected cuboid
- Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO);
return this.headerLength;
}
@Override
protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
if (value == null) {
- Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ONE);
+ Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ONE);
} else {
- Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ZERO);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index c5adfb5..6a8eeb5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -33,10 +33,9 @@ public class RowConstants {
public static final int ROWKEY_SHARDID_LEN = 2;
public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
-
- // fuzzy mask
- public static final byte FUZZY_MASK_ZERO = 0;
- public static final byte FUZZY_MASK_ONE = 1;
+
+ public static final byte BYTE_ZERO = 0;
+ public static final byte BYTE_ONE = 1;
// row value delimiter
public static final byte ROWVALUE_DELIMITER_BYTE = 7;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index bc4a927..0676df6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -99,7 +99,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
if (encodeShard) {
short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
- short finalShard = ShardingHash.getShard(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+ short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
} else {
BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
index 31ea9e2..5eefa54 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
@@ -19,9 +19,9 @@ public class GTBuilder implements Closeable {
this.info = info;
if (append) {
- storeWriter = store.append(shard);
+ storeWriter = store.append();
} else {
- storeWriter = store.rebuild(shard);
+ storeWriter = store.rebuild();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index d0559ad..c0ffcc1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -53,7 +53,7 @@ public class GTInfo {
public ImmutableBitSet getPrimaryKey() {
return primaryKey;
}
-
+
public ImmutableBitSet getAllColumns() {
return colAll;
}
@@ -112,7 +112,7 @@ public class GTInfo {
public void validateColRef(TblColRef ref) {
TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
- if (expected.equals(ref) == false)
+ if (!expected.equals(ref))
throw new IllegalArgumentException();
}
@@ -155,11 +155,11 @@ public class GTInfo {
for (int i = 0; i < colBlocks.length; i++) {
merge = merge.or(colBlocks[i]);
}
- if (merge.equals(colAll) == false)
+ if (!merge.equals(colAll))
throw new IllegalStateException();
// primary key must be the first column block
- if (primaryKey.equals(colBlocks[0]) == false)
+ if (!primaryKey.equals(colBlocks[0]))
throw new IllegalStateException();
// drop empty column block
@@ -170,7 +170,7 @@ public class GTInfo {
if (cb.isEmpty())
it.remove();
}
- colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]);
+ colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
}
public static class Builder {
@@ -243,8 +243,12 @@ public class GTInfo {
return KryoUtils.serialize(info);
}
}
-
+
public static GTInfo deserialize(byte[] bytes) {
return KryoUtils.deserialize(bytes, GTInfo.class);
}
+
+ public IGTCodeSystem getCodeSystem() {
+ return codeSystem;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index dbfdf57..0d02655 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -2,7 +2,6 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.BitSet;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -159,7 +158,7 @@ public class GTRecord implements Comparable<GTRecord> {
return false;
for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
int c = maskForEqualHashComp.trueBitAt(i);
- if (this.cols[c].equals(o.cols[c]) == false) {
+ if (!this.cols[c].equals(o.cols[c])) {
return false;
}
}
@@ -228,7 +227,10 @@ public class GTRecord implements Comparable<GTRecord> {
buf.setLength(pos);
}
- /** write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not*/
+ /**
+ * write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not.
+ * for saving space
+ */
public void exportAllColumns(ByteBuffer buf) {
for (int i = 0; i < info.colAll.trueBitCount(); i++) {
int c = info.colAll.trueBitAt(i);
@@ -300,30 +302,4 @@ public class GTRecord implements Comparable<GTRecord> {
}
}
- /** similar to export(primaryKey) but will stop at the first null value */
- public static ByteArray exportScanKey(GTRecord rec) {
- if (rec == null)
- return null;
-
- GTInfo info = rec.getInfo();
-
- BitSet selectedColumns = new BitSet();
- int len = 0;
- for (int i = 0; i < info.primaryKey.trueBitCount(); i++) {
- int c = info.primaryKey.trueBitAt(i);
- if (rec.cols[c].array() == null) {
- break;
- }
- selectedColumns.set(c);
- len += rec.cols[c].length();
- }
-
- if (selectedColumns.cardinality() == 0)
- return null;
-
- ByteArray buf = ByteArray.allocate(len);
- rec.exportColumns(new ImmutableBitSet(selectedColumns), buf);
- return buf;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
index 197fde4..eefe88e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -7,42 +7,26 @@ public class GTScanRange {
final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
- final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
+ final public List<GTRecord> fuzzyKeys; // partial matching primary keys
public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
this(pkStart, pkEnd, null);
}
- public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
+ public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys) {
GTInfo info = pkStart.info;
assert info == pkEnd.info;
- validateRangeKey(pkStart);
- validateRangeKey(pkEnd);
-
this.pkStart = pkStart;
this.pkEnd = pkEnd;
- this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? Collections.<GTRecord> emptyList() : hbaseFuzzyKeys;
- }
-
- private void validateRangeKey(GTRecord pk) {
- pk.maskForEqualHashComp(pk.info.primaryKey);
- boolean afterNull = false;
- for (int i = 0; i < pk.info.primaryKey.trueBitCount(); i++) {
- int c = pk.info.primaryKey.trueBitAt(i);
- if (afterNull) {
- pk.cols[c].set(null, 0, 0);
- } else {
- afterNull = pk.cols[c].array() == null;
- }
- }
+ this.fuzzyKeys = fuzzyKeys == null ? Collections.<GTRecord> emptyList() : fuzzyKeys;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((hbaseFuzzyKeys == null) ? 0 : hbaseFuzzyKeys.hashCode());
+ result = prime * result + ((fuzzyKeys == null) ? 0 : fuzzyKeys.hashCode());
result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode());
result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode());
return result;
@@ -57,10 +41,10 @@ public class GTScanRange {
if (getClass() != obj.getClass())
return false;
GTScanRange other = (GTScanRange) obj;
- if (hbaseFuzzyKeys == null) {
- if (other.hbaseFuzzyKeys != null)
+ if (fuzzyKeys == null) {
+ if (other.fuzzyKeys != null)
return false;
- } else if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys))
+ } else if (!fuzzyKeys.equals(other.fuzzyKeys))
return false;
if (pkEnd == null) {
if (other.pkEnd != null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 58114d7..d01fe2c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -1,6 +1,7 @@
package org.apache.kylin.gridtable;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -11,32 +12,45 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.Maps;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.common.FuzzyValueCombination;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class GTScanRangePlanner {
+ private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class);
+
private static final int MAX_HBASE_FUZZY_KEYS = 100;
final private GTInfo info;
+ final private Pair<ByteArray, ByteArray> segmentStartAndEnd;
+ final private TblColRef partitionColRef;
+
final private ComparatorEx<ByteArray> byteUnknownIsSmaller;
final private ComparatorEx<ByteArray> byteUnknownIsBigger;
final private ComparatorEx<GTRecord> recordUnknownIsSmaller;
final private ComparatorEx<GTRecord> recordUnknownIsBigger;
- public GTScanRangePlanner(GTInfo info) {
+ public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) {
this.info = info;
+ this.segmentStartAndEnd = segmentStartAndEnd;
+ this.partitionColRef = partitionColRef;
IGTComparator comp = info.codeSystem.getComparator();
+
this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp);
this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp);
this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp);
@@ -58,7 +72,8 @@ public class GTScanRangePlanner {
List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
GTScanRange scanRange = newScanRange(andDimRanges);
- scanRanges.add(scanRange);
+ if (scanRange != null)
+ scanRanges.add(scanRange);
}
List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
@@ -70,40 +85,75 @@ public class GTScanRangePlanner {
private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
GTRecord pkStart = new GTRecord(info);
GTRecord pkEnd = new GTRecord(info);
- Map<Integer,Set<ByteArray>> fuzzyValues = Maps.newHashMap();
-
- List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList();
+ Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
+
+ List<GTRecord> fuzzyKeys;
for (ColumnRange range : andDimRanges) {
+
+ if (partitionColRef != null && range.column.equals(partitionColRef)) {
+ boolean rangeEndGTESegStart = true;
+
+ if (segmentStartAndEnd.getFirst().array() != null && range.end.array() != null) {
+ rangeEndGTESegStart = info.getCodeSystem().getComparator().compare(range.end, segmentStartAndEnd.getFirst()) >= 0;
+ }
+
+ if (!rangeEndGTESegStart) {
+ return null;
+ }
+
+ boolean rangeStartLTESegEnd = true;
+ if (segmentStartAndEnd.getSecond().array() != null && range.begin.array() != null) {
+ rangeStartLTESegEnd = info.getCodeSystem().getComparator().compare(range.begin, segmentStartAndEnd.getSecond()) <= 0;
+ }
+
+ if (!rangeStartLTESegEnd) {
+ return null;
+ }
+ }
+
int col = range.column.getColumnDesc().getZeroBasedIndex();
- if (info.primaryKey.get(col) == false)
+ if (!info.primaryKey.get(col))
continue;
pkStart.set(col, range.begin);
pkEnd.set(col, range.end);
-// if (range.equals != null) {
-// ImmutableBitSet fuzzyMask = new ImmutableBitSet(col);
-// for (ByteArray v : range.equals) {
-// GTRecord fuzzy = new GTRecord(info);
-// fuzzy.set(col, v);
-// fuzzy.maskForEqualHashComp(fuzzyMask);
-// hbaseFuzzyKeys.add(fuzzy);
-// }
-// }
-
- if(range.valueSet != null)
- {
- for (ByteArray v : range.equals) {
-// GTRecord fuzzy = new GTRecord(info);
-// fuzzy.set(col, v);
-// fuzzy.maskForEqualHashComp(fuzzyMask);
-// hbaseFuzzyKeys.add(fuzzy);
-// }
+ if (range.valueSet != null && !range.valueSet.isEmpty()) {
+ fuzzyValues.put(col, range.valueSet);
}
}
- return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys);
+ fuzzyKeys = buildFuzzyKeys(fuzzyValues);
+
+ return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
+ }
+
+ private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) {
+ ArrayList<GTRecord> result = Lists.newArrayList();
+
+ if (fuzzyValueSet.isEmpty())
+ return result;
+
+ // debug/profiling purpose
+ if (BackdoorToggles.getDisableFuzzyKey()) {
+ logger.info("The execution of this query will not use fuzzy key");
+ return result;
+ }
+
+ List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS);
+
+ for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
+ GTRecord fuzzy = new GTRecord(info);
+ BitSet bitSet = new BitSet(info.getColumnCount());
+ for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
+ bitSet.set(entry.getKey());
+ fuzzy.set(entry.getKey(), entry.getValue());
+ }
+ fuzzy.maskForEqualHashComp(new ImmutableBitSet(bitSet));
+ result.add(fuzzy);
+ }
+ return result;
}
private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
@@ -252,8 +302,8 @@ public class GTScanRangePlanner {
boolean hasNonFuzzyRange = false;
for (GTScanRange range : ranges) {
- hasNonFuzzyRange = hasNonFuzzyRange || range.hbaseFuzzyKeys.isEmpty();
- newFuzzyKeys.addAll(range.hbaseFuzzyKeys);
+ hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty();
+ newFuzzyKeys.addAll(range.fuzzyKeys);
end = recordUnknownIsBigger.max(end, range.pkEnd);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 2b31e70..c81dd63 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -2,6 +2,7 @@ package org.apache.kylin.gridtable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -88,7 +89,7 @@ public class GTScanRequest {
}
private void validateFilterPushDown(GTInfo info) {
- if (hasFilterPushDown() == false)
+ if (!hasFilterPushDown())
return;
Set<TblColRef> filterColumns = Sets.newHashSet();
@@ -102,7 +103,7 @@ public class GTScanRequest {
}
// un-evaluatable filter must be removed
- if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) {
+ if (!TupleFilter.isEvaluableRecursively(filterPushDown)) {
Set<TblColRef> unevaluableColumns = Sets.newHashSet();
filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns);
@@ -147,6 +148,10 @@ public class GTScanRequest {
return range.pkEnd;
}
+ public List<GTRecord> getFuzzyKeys() {
+ return range.fuzzyKeys;
+ }
+
public ImmutableBitSet getSelectedColBlocks() {
return selectedColBlocks;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index 684f0ef..c991e66 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -70,7 +70,7 @@ public class DictGridTableTest {
private void verifyScanRangePlanner(GridTable table) {
GTInfo info = table.getInfo();
- GTScanRangePlanner planner = new GTScanRangePlanner(info);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null);
CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
@@ -87,7 +87,7 @@ public class DictGridTableTest {
List<GTScanRange> r = planner.planScanRanges(filter);
assertEquals(1, r.size());
assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
- assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
+ assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString());
}
// pre-evaluate ever false
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
index fbc6d19..1e05eb8 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
@@ -32,23 +32,21 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-/**
- * @author yangli9
- *
- */
public class FuzzyValueCombination {
- private static class Dim {
+ /** One fuzzy dimension: a column and the set of candidate values (of element type E) to combine for it. */
+ private static class Dim<E> {
TblColRef col;
- Set<String> values;
+ Set<E> values;
}
- private static final Set<String> SINGLE_NULL_SET = Sets.newHashSet();
+ // Shared set whose only element is null (populated by the static block below).
+ // NOTE(review): declared raw (was Set<String>), presumably so it can stand in for a Set<E> of any
+ // element type now that the class is generic -- confirm against its uses in calculate/combination.
+ private static final Set SINGLE_NULL_SET = Sets.newHashSet();
+
static {
SINGLE_NULL_SET.add(null);
}
- public static List<Map<TblColRef, String>> calculate(Map<TblColRef, Set<String>> fuzzyValues, long cap) {
+ public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) {
+ Collections.emptyMap();
Dim[] dims = toDims(fuzzyValues);
// If a query has many IN clause and each IN clause has many values, then it will easily generate
// thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked
@@ -61,9 +59,9 @@ public class FuzzyValueCombination {
}
@SuppressWarnings("unchecked")
- private static List<Map<TblColRef, String>> combination(Dim[] dims) {
+ private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) {
- List<Map<TblColRef, String>> result = Lists.newArrayList();
+ List<Map<TblColRef, E>> result = Lists.newArrayList();
int emptyDims = 0;
for (Dim dim : dims) {
@@ -76,8 +74,8 @@ public class FuzzyValueCombination {
return result;
}
- Map<TblColRef, String> r = Maps.newHashMap();
- Iterator<String>[] iters = new Iterator[dims.length];
+ Map<TblColRef, E> r = Maps.newHashMap();
+ Iterator<E>[] iters = new Iterator[dims.length];
int level = 0;
while (true) {
Dim dim = dims[level];
@@ -85,7 +83,7 @@ public class FuzzyValueCombination {
iters[level] = dim.values.iterator();
}
- Iterator<String> it = iters[level];
+ Iterator<E> it = iters[level];
if (it.hasNext() == false) {
if (level == 0)
break;
@@ -97,7 +95,7 @@ public class FuzzyValueCombination {
r.put(dim.col, it.next());
if (level == dims.length - 1) {
- result.add(new HashMap<TblColRef, String>(r));
+ result.add(new HashMap<TblColRef, E>(r));
} else {
level++;
}
@@ -105,10 +103,10 @@ public class FuzzyValueCombination {
return result;
}
- private static Dim[] toDims(Map<TblColRef, Set<String>> fuzzyValues) {
+ private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) {
Dim[] dims = new Dim[fuzzyValues.size()];
int i = 0;
- for (Entry<TblColRef, Set<String>> entry : fuzzyValues.entrySet()) {
+ for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) {
dims[i] = new Dim();
dims[i].col = entry.getKey();
dims[i].values = entry.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/dev-support/test_all.sh
----------------------------------------------------------------------
diff --git a/dev-support/test_all.sh b/dev-support/test_all.sh
new file mode 100644
index 0000000..6a7b887
--- /dev/null
+++ b/dev-support/test_all.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Full build followed by the sandbox integration tests; each step's output is also saved to a .log file in the repo root.
+# pipefail: a pipeline's exit status is mvn's rather than tee's, so build failures are not silently masked.
+set -o pipefail
+
+dir=$(dirname "${0}")
+cd "${dir}/.." || exit 1
+
+# no point running integration tests against stale artifacts if the build itself failed
+mvn clean install -DskipTests | tee mci.log || exit 1
+mvn test -Dtest=org.apache.kylin.job.BuildCubeWithEngineTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithEngineTest.log
+mvn test -Dtest=org.apache.kylin.job.BuildIIWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildIIWithStreamTest.log
+mvn test -Dtest=org.apache.kylin.job.BuildCubeWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithStreamTest.log
+mvn verify -fae -P sandbox | tee mvnverify.log
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 7510c40..86e2f07 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -72,7 +72,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum);
short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
- short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+ short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
//output measures
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index e7db1fb..2180dd6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -41,8 +41,6 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
* @author George Song (ysong1)
*
@@ -117,7 +115,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
//fill shard
short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId);
short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum);
- short finalShard = ShardingHash.getShard(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards());
+ short finalShard = ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards());
BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
return offset;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index e0efb10..2e58644 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.util.KryoUtils;
@@ -132,9 +131,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
// globally shared connection, does not require close
HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
- final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
- List<RawScan> rawScans = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
+ List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
final ByteString scanRequestBytesString = ByteString.copyFrom(scanRequestBytes);
@@ -142,6 +140,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
+ for (RawScan rawScan : rawScans) {
+ logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+ }
+
for (final RawScan rawScan : rawScans) {
executorService.submit(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 0a9c1d4..cc7ec4f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -2,6 +2,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@@ -28,6 +29,7 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public abstract class CubeHBaseRPC {
@@ -70,38 +72,77 @@ public abstract class CubeHBaseRPC {
return scan;
}
- protected List<RawScan> prepareRawScan(GTRecord pkStart, GTRecord pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
+ /**
+ * Builds one RawScan per shard of the current cuboid (cubeSeg.getCuboidShardNum).
+ * The start/end row keys are padded with ROWKEY_LOWER_BYTE / ROWKEY_UPPER_BYTE, then each shard's
+ * normalized shard id is written into the first ROWKEY_SHARDID_LEN bytes of its own copy.
+ * All returned scans share the same selected HBase columns and the same translated fuzzy keys.
+ *
+ * @param pkStart lower bound of the primary key range
+ * @param pkEnd upper bound of the primary key range
+ * @param fuzzyKeys fuzzy keys in grid-table record form; null or empty means none
+ * @param selectedColBlocks column blocks to fetch, mapped to HBase columns via makeHBaseColumns
+ * @return one RawScan per shard of the cuboid
+ */
+ protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+ final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
List<RawScan> ret = Lists.newArrayList();
- byte[] start = makeRowKeyToScan(pkStart, (byte) 0x00);
- byte[] end = makeRowKeyToScan(pkEnd, (byte) 0xff);
-
- //TODO fuzzy match
+ byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE);
+ byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE);
+ List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
for (short i = 0; i < cuboidShardNum; ++i) {
- short shard = ShardingHash.getShard(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+ short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+ // copy per shard so writing the shard id does not clobber the shared start/end templates
byte[] shardStart = Arrays.copyOf(start, start.length);
byte[] shardEnd = Arrays.copyOf(end, end.length);
BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN);
BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN);
- ret.add(new RawScan(shardStart, shardEnd, selectedColumns, null));
+ ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys));
}
return ret;
}
+ /**
+ * Translates GTRecord-format fuzzy keys into the (key, mask) byte-array pairs attached to each RawScan.
+ * Each key is laid out as [shard id | cuboid id | primary-key columns]. In the mask, the shard bytes are 1
+ * (non-fixed, key bytes left 0), the cuboid bytes are 0 (fixed to this cuboid's id), and the primary-key
+ * bytes come from HBaseScan.exportScanMask.
+ *
+ * @param fuzzyKeys fuzzy keys in grid-table record form; may be null or empty
+ * @return one (key, mask) pair per input record, or an empty (immutable) list when there are none
+ * @throws IllegalStateException if an exported core key/mask does not fit the expected row key length
+ */
+ private List<Pair<byte[], byte[]>> translateFuzzyKeys(List<GTRecord> fuzzyKeys) {
+ if (fuzzyKeys == null || fuzzyKeys.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<Pair<byte[], byte[]>> ret = Lists.newArrayListWithCapacity(fuzzyKeys.size());
+ int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey());
+ for (GTRecord gtRecordFuzzyKey : fuzzyKeys) {
+ byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
+ byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
+
+ int pos = 0;
+ //shard part should better be FIXED, for simplicity we make it non-fixed (its key bytes stay 0)
+ Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
+ pos += RowConstants.ROWKEY_SHARDID_LEN;
+
+ //cuboid part: fixed to this cuboid's id
+ Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO);
+ System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN);
+ pos += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ //row key core part; validate lengths BEFORE copying so a mismatch fails with a clear message
+ //instead of an IndexOutOfBoundsException from System.arraycopy
+ ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO);
+ ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey);
+ Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length");
+ Preconditions.checkState(pos + coreKey.length() == hbaseFuzzyKey.length, "HBase fuzzy key length mismatch: core key length %s at offset %s, expected total %s", coreKey.length(), pos, hbaseFuzzyKey.length);
+ System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length());
+ System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length());
+
+ ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask));
+ }
+
+ return ret;
+ }
+
private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) {
- ByteArray pk = GTRecord.exportScanKey(pkRec);
- int pkMaxLen = pkRec.getInfo().getMaxColumnLength(pkRec.getInfo().getPrimaryKey());
+ ByteArray pk = HBaseScan.exportScanKey(pkRec, fill);
- byte[] buf = new byte[pkMaxLen + RowConstants.ROWKEY_HEADER_LEN];
+ byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN];
Arrays.fill(buf, fill);
- //for scanning/reading, later all possbile shard will be applied
- //BytesUtil.writeShort((short) 0, buf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ //for scanning/reading, later all possible shard will be applied
System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
if (pk != null && pk.array() != null) {
@@ -175,4 +216,33 @@ public abstract class CubeHBaseRPC {
return result;
}
+ /**
+ * Logs at INFO level the table, cuboid, start/stop keys and fuzzy keys of one RawScan.
+ *
+ * @param rawScan the scan to describe
+ * @param tableName the HBase table the scan targets
+ */
+ protected void logScan(RawScan rawScan, String tableName) {
+ if (!logger.isInfoEnabled()) {
+ return; // skip building the (potentially large) message when it would be discarded
+ }
+ StringBuilder info = new StringBuilder();
+ info.append("\nVisiting hbase table ").append(tableName).append(": ");
+ if (cuboid.requirePostAggregation()) {
+ info.append("cuboid require post aggregation, from ");
+ } else {
+ info.append("cuboid exact match, from ");
+ }
+ info.append(cuboid.getInputID()).append(" to ").append(cuboid.getId());
+ info.append("\nStart: ").append(rawScan.getStartKeyAsString()).append(" - ").append(Bytes.toStringBinary(rawScan.startKey));
+ info.append("\nStop: ").append(rawScan.getEndKeyAsString()).append(" - ").append(Bytes.toStringBinary(rawScan.endKey));
+ // translateFuzzyKeys yields an empty list (not null) when there are no fuzzy keys, so treat empty like null
+ if (rawScan.fuzzyKey != null && !rawScan.fuzzyKey.isEmpty()) {
+ info.append("\nFuzzy key counts: ").append(rawScan.fuzzyKey.size());
+ info.append("\nFuzzy: ").append(rawScan.getFuzzyKeyAsString());
+ } else {
+ info.append("\nNo Fuzzy Key");
+ }
+ logger.info(info.toString());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index a2ba39f..a31bcdf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -11,7 +11,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
@@ -70,19 +69,18 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
// primary key (also the 0th column block) is always selected
final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
-
// globally shared connection, does not require close
HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-
final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
- final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
- List<RawScan> rawScans = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
+ List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
final List<ResultScanner> scanners = Lists.newArrayList();
final List<Iterator<Result>> resultIterators = Lists.newArrayList();
for (RawScan rawScan : rawScans) {
+
+ logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
Scan hbaseScan = buildScan(rawScan);
final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
@@ -119,7 +117,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
};
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, hbaseColumns);
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns);
IGTScanner rawScanner = store.scan(scanRequest);
final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 756f8d6..d49de56 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -1,6 +1,7 @@
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
@@ -10,7 +11,10 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -57,8 +61,18 @@ public class CubeSegmentScanner implements IGTScanner {
ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
- //TODO: should remove this in endpoint scenario
- GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info);
+ GTScanRangePlanner scanRangePlanner;
+ if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
+ TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+ Pair<ByteArray, ByteArray> segmentStartAndEnd = null;
+ int index = mapping.getIndexOf(tblColRef);
+ if (index >= 0) {
+ segmentStartAndEnd = getSegmentStartAndEnd(tblColRef, index);
+ }
+ scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, tblColRef);
+ } else {
+ scanRangePlanner = new GTScanRangePlanner(info, null, null);
+ }
List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
@@ -73,6 +87,43 @@ public class CubeSegmentScanner implements IGTScanner {
scanner = new Scanner();
}
+ /**
+ * Encodes this segment's date range as grid-table values of the partition column at {@code index}.
+ * An unbounded side (Long.MIN_VALUE start / Long.MAX_VALUE end) is represented by an empty ByteArray.
+ *
+ * @return the encoded (start, end) pair, or null when a bounded side cannot be encoded because the
+ *         partition column is neither date nor timestamp
+ */
+ private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(TblColRef tblColRef, int index) {
+ String partitionColType = tblColRef.getColumnDesc().getDatatype();
+ long rangeStart = cubeSeg.getDateRangeStart();
+ long rangeEnd = cubeSeg.getDateRangeEnd();
+ boolean startBounded = rangeStart != Long.MIN_VALUE;
+ boolean endBounded = rangeEnd != Long.MAX_VALUE;
+
+ // translateTsToString only supports date/timestamp columns. For any other partition column type,
+ // drop the segment-range hint (null, the same value used when the column is absent from the mapping)
+ // instead of throwing and failing the whole query.
+ boolean encodable = "date".equalsIgnoreCase(partitionColType) || "timestamp".equalsIgnoreCase(partitionColType);
+ if ((startBounded || endBounded) && !encodable) {
+ return null;
+ }
+
+ ByteArray start = startBounded ? translateTsToString(rangeStart, partitionColType, index) : new ByteArray();
+ ByteArray end = endBounded ? translateTsToString(rangeEnd, partitionColType, index) : new ByteArray();
+ return Pair.newPair(start, end);
+ }
+
+ /**
+ * Formats a millisecond timestamp per the partition column type ("date" or "timestamp", case-insensitive)
+ * and encodes it with the grid table's code system for column {@code index}.
+ *
+ * @throws RuntimeException for any other partition column type
+ */
+ private ByteArray translateTsToString(long ts, String partitionColType, int index) {
+ final String formatted;
+ if ("timestamp".equalsIgnoreCase(partitionColType)) {
+ formatted = DateFormat.formatToTimeWithoutMilliStr(ts);
+ } else if ("date".equalsIgnoreCase(partitionColType)) {
+ formatted = DateFormat.formatToDateStr(ts);
+ } else {
+ throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+ }
+
+ // encode into a scratch buffer of info.getMaxColumnLength() bytes, then keep only the written prefix
+ ByteBuffer encoded = ByteBuffer.allocate(info.getMaxColumnLength());
+ info.getCodeSystem().encodeColumnValue(index, formatted, encoded);
+ int written = encoded.position();
+ return ByteArray.copyOf(encoded.array(), 0, written);
+ }
+
private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
Set<TblColRef> ret = Sets.newHashSet();
for (TblColRef col : input) {
@@ -150,8 +201,6 @@ public class CubeSegmentScanner implements IGTScanner {
return scanner.getScannedRowCount();
}
-
-
private class Scanner {
final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
int cur = 0;
@@ -171,10 +220,13 @@ public class CubeSegmentScanner implements IGTScanner {
return false;
try {
- CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
+
+ //CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
+ CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info);
+
//change previous line to CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info);
//to debug locally
-
+
inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur));
curIterator = inputScanners[cur].iterator();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
new file mode 100644
index 0000000..7667830
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import com.google.common.base.Preconditions;
+
+public class HBaseScan {
+
+ private HBaseScan() {
+ // static helpers only; not meant to be instantiated
+ }
+
+ /**
+ * Exports the primary-key columns of {@code rec} as one fixed-length scan key.
+ * Columns are visited in the order of info.getPrimaryKey(); each occupies exactly its max code length.
+ * A column whose value has no backing array is padded with {@code fill} instead of real data.
+ *
+ * @param rec the record to export; must not be null
+ * @param fill the byte used to pad columns that carry no value
+ * @return a ByteArray holding the concatenated primary-key columns
+ * @throws NullPointerException if {@code rec} is null
+ * @throws IllegalArgumentException if a present column's length differs from its max code length
+ */
+ public static ByteArray exportScanKey(GTRecord rec, byte fill) {
+ Preconditions.checkNotNull(rec);
+
+ GTInfo info = rec.getInfo();
+ ByteArray buf = ByteArray.allocate(info.getMaxColumnLength(info.getPrimaryKey()));
+ int pos = 0;
+ for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+ int c = info.getPrimaryKey().trueBitAt(i);
+ int colLength = info.getCodeSystem().maxCodeLength(c);
+ ByteArray col = rec.get(c);
+
+ if (col.array() != null) {
+ // template form: the message is only formatted on failure, not once per column per call
+ Preconditions.checkArgument(colLength == col.length(), "ColLength :%s not equals cols[c] length: %s c is %s", colLength, col.length(), c);
+ System.arraycopy(col.array(), col.offset(), buf.array(), buf.offset() + pos, colLength);
+ } else {
+ Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
+ }
+ pos += colLength;
+ }
+ buf.setLength(pos);
+
+ return buf;
+ }
+
+ /**
+ * Exports a mask with the same layout and length as {@link #exportScanKey}: each primary-key column spans
+ * its max code length, filled with 0 (fixed) when the column has a value and 1 (non-fixed) when it has none.
+ *
+ * @param rec the record whose primary-key columns define the mask; must not be null
+ * @return the mask bytes
+ * @throws NullPointerException if {@code rec} is null
+ */
+ public static ByteArray exportScanMask(GTRecord rec) {
+ Preconditions.checkNotNull(rec);
+
+ GTInfo info = rec.getInfo();
+ ByteArray buf = ByteArray.allocate(info.getMaxColumnLength(info.getPrimaryKey()));
+ int pos = 0;
+ for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+ int c = info.getPrimaryKey().trueBitAt(i);
+ int colLength = info.getCodeSystem().maxCodeLength(c);
+ byte fill = rec.get(c).array() != null ? RowConstants.BYTE_ZERO : RowConstants.BYTE_ONE;
+ Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
+ pos += colLength;
+ }
+ buf.setLength(pos);
+
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index aa73927..0184908 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.util.List;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
public class RawScan {
@@ -37,4 +38,23 @@ public class RawScan {
this.fuzzyKey = fuzzyKey;
}
+ /** Returns the scan's start row key rendered as hex by BytesUtil.toHex. */
+ public String getStartKeyAsString() {
+ byte[] key = startKey;
+ return BytesUtil.toHex(key);
+ }
+
+ /** Returns the scan's end row key rendered as hex by BytesUtil.toHex. */
+ public String getEndKeyAsString() {
+ byte[] key = endKey;
+ return BytesUtil.toHex(key);
+ }
+
+ /**
+ * Renders each fuzzy (key, mask) pair as "keyHex maskHex", one pair per line.
+ * Returns an empty string when there are no fuzzy keys, including when the list is null
+ * (scans may be constructed with a null fuzzy key list).
+ */
+ public String getFuzzyKeyAsString() {
+ if (this.fuzzyKey == null) {
+ return "";
+ }
+ StringBuilder buf = new StringBuilder();
+ for (Pair<byte[], byte[]> keyAndMask : this.fuzzyKey) {
+ buf.append(BytesUtil.toHex(keyAndMask.getFirst()));
+ buf.append(" ");
+ buf.append(BytesUtil.toHex(keyAndMask.getSecond()));
+ buf.append(System.lineSeparator());
+ }
+ return buf.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index bc0abc0..8f77f87 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -112,7 +112,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
short shardOffset = ShardingHash.getShard(byteBuffer.array(), //
RowConstants.ROWKEY_HEADER_LEN, byteBuffer.position() - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
- short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+ short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
BytesUtil.writeShort(finalShard, byteBuffer.array(), 0, RowConstants.ROWKEY_SHARDID_LEN);
return byteBuffer;