You are viewing a plain-text version of this content. The canonical version is the original post in the commits@kylin.apache.org mailing-list archive.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/25 14:12:18 UTC
[5/7] incubator-kylin git commit: KYLIN-942 support parallel scan for grid table
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 2b31e70..c81dd63 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -2,6 +2,7 @@ package org.apache.kylin.gridtable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -88,7 +89,7 @@ public class GTScanRequest {
}
private void validateFilterPushDown(GTInfo info) {
- if (hasFilterPushDown() == false)
+ if (!hasFilterPushDown())
return;
Set<TblColRef> filterColumns = Sets.newHashSet();
@@ -102,7 +103,7 @@ public class GTScanRequest {
}
// un-evaluatable filter must be removed
- if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) {
+ if (!TupleFilter.isEvaluableRecursively(filterPushDown)) {
Set<TblColRef> unevaluableColumns = Sets.newHashSet();
filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns);
@@ -147,6 +148,10 @@ public class GTScanRequest {
return range.pkEnd;
}
+ public List<GTRecord> getFuzzyKeys() {
+ return range.fuzzyKeys;
+ }
+
public ImmutableBitSet getSelectedColBlocks() {
return selectedColBlocks;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index bbd82c8..de9a5ce 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -33,10 +33,6 @@ public class GTUtil {
return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector);
}
- public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) {
- return convertFilter(rootFilter, info, null, true, null);
- }
-
public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
index f812b8f..8f81654 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
@@ -14,17 +14,14 @@ public class GridTable implements Closeable {
}
public GTBuilder rebuild() throws IOException {
- assert info.isShardingEnabled() == false;
return rebuild(-1);
}
public GTBuilder rebuild(int shard) throws IOException {
- assert shard < info.nShards;
return new GTBuilder(info, shard, store);
}
public GTBuilder append() throws IOException {
- assert info.isShardingEnabled() == false;
return append(-1);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
index 5282544..f4c44f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
@@ -6,9 +6,9 @@ public interface IGTStore {
GTInfo getInfo();
- IGTWriter rebuild(int shard) throws IOException;
+ IGTWriter rebuild() throws IOException;
- IGTWriter append(int shard) throws IOException;
+ IGTWriter append() throws IOException;
IGTScanner scan(GTScanRequest scanRequest) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
index d7074e4..9675aa1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
@@ -22,9 +22,6 @@ public class GTSimpleMemStore implements IGTStore {
public GTSimpleMemStore(GTInfo info) {
this.info = info;
this.rowList = new ArrayList<byte[]>();
-
- if (info.isShardingEnabled())
- throw new UnsupportedOperationException();
}
@Override
@@ -41,13 +38,13 @@ public class GTSimpleMemStore implements IGTStore {
}
@Override
- public IGTWriter rebuild(int shard) {
+ public IGTWriter rebuild() {
rowList.clear();
return new Writer();
}
@Override
- public IGTWriter append(int shard) {
+ public IGTWriter append() {
return new Writer();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
index 9a7970c..98f1eef 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
@@ -28,10 +28,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeySplitterTest extends LocalFileMetadataTestCase {
@Before
@@ -49,23 +45,23 @@ public class RowKeySplitterTest extends LocalFileMetadataTestCase {
public void testWithSlr() throws Exception {
CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITH_SLR_READY");
- RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+ RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
// base cuboid rowkey
- byte[] input = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
- rowKeySplitter.split(input, input.length);
+ byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ rowKeySplitter.split(input);
- assertEquals(10, rowKeySplitter.getBufferSize());
+ assertEquals(11, rowKeySplitter.getBufferSize());
}
@Test
public void testWithoutSlr() throws Exception {
CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_READY");
- RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+ RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
// base cuboid rowkey
- byte[] input = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
- rowKeySplitter.split(input, input.length);
+ byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+ rowKeySplitter.split(input);
- assertEquals(9, rowKeySplitter.getBufferSize());
+ assertEquals(10, rowKeySplitter.getBufferSize());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
index 3704e03..d6b1718 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
@@ -34,10 +34,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
@Before
@@ -57,7 +53,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
- byte[] key = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+ byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
rowKeyDecoder.decode(key);
List<String> values = rowKeyDecoder.getValues();
@@ -70,7 +66,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
- byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
rowKeyDecoder.decode(key);
List<String> values = rowKeyDecoder.getValues();
@@ -97,7 +93,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(30, encodedKey.length);
+ assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
rowKeyDecoder.decode(encodedKey);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
index c50b8c9..45c8108 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
@@ -35,10 +35,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
@Before
@@ -74,9 +70,11 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(30, encodedKey.length);
- byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
- byte[] rest = Arrays.copyOfRange(encodedKey, 8, encodedKey.length);
+ assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+ byte[] rest = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ assertEquals(0, Bytes.toShort(shard));
assertEquals(255, Bytes.toLong(cuboidId));
assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
}
@@ -104,10 +102,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(48, encodedKey.length);
- byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
- byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
- byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+ assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+ byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+ byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ assertEquals(0, Bytes.toShort(shard));
assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
assertEquals(511, Bytes.toLong(cuboidId));
assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
@@ -136,10 +136,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
byte[] encodedKey = rowKeyEncoder.encode(data);
- assertEquals(48, encodedKey.length);
- byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
- byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
- byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+ assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+ byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+ byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+ assertEquals(0, Bytes.toShort(shard));
assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
assertEquals(511, Bytes.toLong(cuboidId));
assertArrayEquals(new byte[] { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }, rest);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index 684f0ef..91e7e18 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.gridtable.CubeCodeSystem;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.NumberDictionaryBuilder;
@@ -48,6 +49,7 @@ import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -55,39 +57,120 @@ import com.google.common.collect.Maps;
public class DictGridTableTest {
+ private GridTable table;
+ private GTInfo info;
+ private CompareTupleFilter timeComp0;
+ private CompareTupleFilter timeComp1;
+ private CompareTupleFilter timeComp2;
+ private CompareTupleFilter timeComp3;
+ private CompareTupleFilter timeComp4;
+ private CompareTupleFilter timeComp5;
+ private CompareTupleFilter timeComp6;
+ private CompareTupleFilter ageComp1;
+ private CompareTupleFilter ageComp2;
+ private CompareTupleFilter ageComp3;
+ private CompareTupleFilter ageComp4;
+
+ @Before
+ public void setup() throws IOException {
+ table = newTestTable();
+ info = table.getInfo();
+
+ timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+ timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+ timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+ timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+ timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+ timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+ ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+ ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+ ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+ ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+ }
+
+ @Test
+ public void verifySegmentSkipping() {
+
+ ByteArray segmentStart = enc(info, 0, "2015-01-14");
+ ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
+ ByteArray segmentEnd = enc(info, 0, "2015-01-15");
+ assertEquals(segmentStart, segmentStartX);
+
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0));
+
+ {
+ LogicalTupleFilter filter = and(timeComp0, ageComp1);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(1, r.size());//scan range are [close,close]
+ assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+ assertEquals(1, r.get(0).fuzzyKeys.size());
+ assertEquals("[[10]]", r.get(0).fuzzyKeys.toString());
+ }
+ {
+ LogicalTupleFilter filter = and(timeComp2, ageComp1);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(0, r.size());
+ }
+ {
+ LogicalTupleFilter filter = and(timeComp4, ageComp1);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(0, r.size());
+ }
+ {
+ LogicalTupleFilter filter = and(timeComp5, ageComp1);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(0, r.size());
+ }
+ {
+ LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(1, r.size());
+ assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+ assertEquals("[[10], [1421193600000, 10]]", r.get(0).fuzzyKeys.toString());
+ }
+ {
+ LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(1, r.size());
+ assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
+ assertEquals(0, r.get(0).fuzzyKeys.size());
+ }
+ }
+
@Test
- public void test() throws IOException {
- GridTable table = newTestTable();
- verifyScanRangePlanner(table);
- verifyFirstRow(table);
- verifyScanWithUnevaluatableFilter(table);
- verifyScanWithEvaluatableFilter(table);
- verifyConvertFilterConstants1(table);
- verifyConvertFilterConstants2(table);
- verifyConvertFilterConstants3(table);
- verifyConvertFilterConstants4(table);
+ public void verifySegmentSkipping2() {
+ ByteArray segmentEnd = enc(info, 0, "2015-01-15");
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0));
+
+ {
+ LogicalTupleFilter filter = and(timeComp0, ageComp1);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(1, r.size());//scan range are [close,close]
+ assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+ assertEquals(1, r.get(0).fuzzyKeys.size());
+ assertEquals("[[10]]", r.get(0).fuzzyKeys.toString());
+ }
+
+ {
+ LogicalTupleFilter filter = and(timeComp5, ageComp1);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(0, r.size());//scan range are [close,close]
+ }
}
- private void verifyScanRangePlanner(GridTable table) {
- GTInfo info = table.getInfo();
- GTScanRangePlanner planner = new GTScanRangePlanner(info);
+ @Test
+ public void verifyScanRangePlanner() {
- CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
- CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
- CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
- CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
- CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
- CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
- CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
- CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null);
// flatten or-and & hbase fuzzy value
{
LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
List<GTScanRange> r = planner.planScanRanges(filter);
assertEquals(1, r.size());
- assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
- assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
+ assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+ assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString());
}
// pre-evaluate ever false
@@ -124,11 +207,13 @@ public class DictGridTableTest {
}
}
- private void verifyFirstRow(GridTable table) throws IOException {
+ @Test
+ public void verifyFirstRow() throws IOException {
doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]");
}
- private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException {
+ @Test
+ public void verifyScanWithUnevaluatableFilter() throws IOException {
GTInfo info = table.getInfo();
CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
@@ -144,7 +229,8 @@ public class DictGridTableTest {
doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]");
}
- private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException {
+ @Test
+ public void verifyScanWithEvaluatableFilter() throws IOException {
GTInfo info = table.getInfo();
CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
@@ -159,7 +245,8 @@ public class DictGridTableTest {
doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
}
- private void verifyConvertFilterConstants1(GridTable table) {
+ @Test
+ public void verifyConvertFilterConstants1() {
GTInfo info = table.getInfo();
TableDesc extTable = TableDesc.mockup("ext");
@@ -178,7 +265,8 @@ public class DictGridTableTest {
assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
}
- private void verifyConvertFilterConstants2(GridTable table) {
+ @Test
+ public void verifyConvertFilterConstants2() {
GTInfo info = table.getInfo();
TableDesc extTable = TableDesc.mockup("ext");
@@ -198,7 +286,8 @@ public class DictGridTableTest {
assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
}
- private void verifyConvertFilterConstants3(GridTable table) {
+ @Test
+ public void verifyConvertFilterConstants3() {
GTInfo info = table.getInfo();
TableDesc extTable = TableDesc.mockup("ext");
@@ -218,7 +307,8 @@ public class DictGridTableTest {
assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
}
- private void verifyConvertFilterConstants4(GridTable table) {
+ @Test
+ public void verifyConvertFilterConstants4() {
GTInfo info = table.getInfo();
TableDesc extTable = TableDesc.mockup("ext");
@@ -252,7 +342,7 @@ public class DictGridTableTest {
scanner.close();
}
- private Object enc(GTInfo info, int col, String value) {
+ private ByteArray enc(GTInfo info, int col, String value) {
ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
info.codeSystem.encodeColumnValue(col, value, buf);
return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 8b8fb87..919ede6 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -18,6 +18,9 @@
package org.apache.kylin.engine;
+import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V1;
+import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V2;
+
import java.util.HashMap;
import java.util.Map;
@@ -25,30 +28,29 @@ import org.apache.kylin.common.util.ImplementationSwitch;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IEngineAware;
-import static org.apache.kylin.metadata.model.IEngineAware.*;
public class EngineFactory {
-
- private static ImplementationSwitch batchEngines;
- private static ImplementationSwitch streamingEngines;
+
+ private static ImplementationSwitch<IBatchCubingEngine> batchEngines;
+ private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines;
static {
Map<Integer, String> impls = new HashMap<>();
impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
- batchEngines = new ImplementationSwitch(impls);
-
+ batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class);
+
impls.clear();
- streamingEngines = new ImplementationSwitch(impls); // TODO
+ streamingEngines = new ImplementationSwitch<IStreamingCubingEngine>(impls, IStreamingCubingEngine.class); // TODO
}
-
+
public static IBatchCubingEngine batchEngine(IEngineAware aware) {
- return batchEngines.get(aware.getEngineType(), IBatchCubingEngine.class);
+ return batchEngines.get(aware.getEngineType());
}
-
+
public static IStreamingCubingEngine streamingEngine(IEngineAware aware) {
- return streamingEngines.get(aware.getEngineType(), IStreamingCubingEngine.class);
+ return streamingEngines.get(aware.getEngineType());
}
-
+
/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
@@ -58,7 +60,7 @@ public class EngineFactory {
public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
}
-
+
public static Runnable createStreamingCubingBuilder(CubeSegment seg) {
return streamingEngine(seg).createStreamingCubingBuilder(seg);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index c5bd3e0..e456ac1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -115,6 +115,17 @@ public abstract class TupleFilter {
throw new UnsupportedOperationException();
}
+ /**
+ * flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
+ * flatten filter will ONLY contain AND and OR , no NOT will exist.
+ * This will help to decide scan ranges.
+ *
+ * Notice that the flatten filter will ONLY be used for determining scan ranges,
+ * The filter that is later pushed down into storage level is still the ORIGINAL
+ * filter, since the flattened filter will be too "fat" to evaluate
+ *
+ * @return
+ */
public TupleFilter flatFilter() {
return flattenInternal(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
index a9d785b..7404136 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.util.BytesUtil;
*/
public class TupleFilterSerializer {
- public static interface Decorator {
+ public interface Decorator {
TupleFilter onSerialize(TupleFilter filter);
}
@@ -69,20 +69,20 @@ public class TupleFilterSerializer {
if (filter.hasChildren()) {
// serialize filter+true
- serializeFilter(1, filter, decorator, buffer, cs);
+ serializeFilter(1, filter, buffer, cs);
// serialize children
for (TupleFilter child : filter.getChildren()) {
internalSerialize(child, decorator, buffer, cs);
}
// serialize none
- serializeFilter(-1, filter, decorator, buffer, cs);
+ serializeFilter(-1, filter, buffer, cs);
} else {
// serialize filter+false
- serializeFilter(0, filter, decorator, buffer, cs);
+ serializeFilter(0, filter, buffer, cs);
}
}
- private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
+ private static void serializeFilter(int flag, TupleFilter filter, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
if (flag < 0) {
BytesUtil.writeVInt(-1, buffer);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index a771f5e..54068f6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -18,25 +18,26 @@
package org.apache.kylin.source;
+import static org.apache.kylin.metadata.model.ISourceAware.ID_HIVE;
+
import java.util.HashMap;
import java.util.Map;
import org.apache.kylin.common.util.ImplementationSwitch;
import org.apache.kylin.metadata.model.ISourceAware;
-import static org.apache.kylin.metadata.model.ISourceAware.*;
import org.apache.kylin.metadata.model.TableDesc;
public class SourceFactory {
- private static ImplementationSwitch sources;
+ private static ImplementationSwitch<ISource> sources;
static {
Map<Integer, String> impls = new HashMap<>();
impls.put(ID_HIVE, "org.apache.kylin.source.hive.HiveSource");
- sources = new ImplementationSwitch(impls);
+ sources = new ImplementationSwitch<ISource>(impls, ISource.class);
}
public static ISource tableSource(ISourceAware aware) {
- return sources.get(aware.getSourceType(), ISource.class);
+ return sources.get(aware.getSourceType());
}
public static ReadableTable createReadableTable(TableDesc table) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index b26dfdb..271583c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -18,7 +18,8 @@
package org.apache.kylin.storage;
-import static org.apache.kylin.metadata.model.IStorageAware.*;
+import static org.apache.kylin.metadata.model.IStorageAware.ID_HBASE;
+import static org.apache.kylin.metadata.model.IStorageAware.ID_HYBRID;
import java.util.HashMap;
import java.util.Map;
@@ -31,22 +32,22 @@ import org.apache.kylin.metadata.realization.IRealization;
*/
public class StorageFactory {
- private static ImplementationSwitch storages;
+ private static ImplementationSwitch<IStorage> storages;
static {
Map<Integer, String> impls = new HashMap<>();
impls.put(ID_HBASE, "org.apache.kylin.storage.hbase.HBaseStorage");
impls.put(ID_HYBRID, "org.apache.kylin.storage.hybrid.HybridStorage");
- storages = new ImplementationSwitch(impls);
+ storages = new ImplementationSwitch<IStorage>(impls, IStorage.class);
}
-
+
public static IStorage storage(IStorageAware aware) {
- return storages.get(aware.getStorageType(), IStorage.class);
+ return storages.get(aware.getStorageType());
}
-
+
public static IStorageQuery createQuery(IRealization realization) {
return storage(realization).createQuery(realization);
}
-
+
public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
return storage(aware).adaptToBuildEngine(engineInterface);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 0c30a3c..4049fcd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -163,7 +163,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
@Override
public int getCost(SQLDigest digest) {
- cost = 100;
+ cost = Integer.MAX_VALUE;
for (IRealization realization : this.getRealizations()) {
if (realization.isCapable(digest))
cost = Math.min(cost, realization.getCost(digest));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
index fbc6d19..1e05eb8 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
@@ -32,23 +32,21 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-/**
- * @author yangli9
- *
- */
public class FuzzyValueCombination {
- private static class Dim {
+ private static class Dim<E> {
TblColRef col;
- Set<String> values;
+ Set<E> values;
}
- private static final Set<String> SINGLE_NULL_SET = Sets.newHashSet();
+ private static final Set SINGLE_NULL_SET = Sets.newHashSet();
+
static {
SINGLE_NULL_SET.add(null);
}
- public static List<Map<TblColRef, String>> calculate(Map<TblColRef, Set<String>> fuzzyValues, long cap) {
+ public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) {
+ Collections.emptyMap();
Dim[] dims = toDims(fuzzyValues);
// If a query has many IN clause and each IN clause has many values, then it will easily generate
// thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked
@@ -61,9 +59,9 @@ public class FuzzyValueCombination {
}
@SuppressWarnings("unchecked")
- private static List<Map<TblColRef, String>> combination(Dim[] dims) {
+ private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) {
- List<Map<TblColRef, String>> result = Lists.newArrayList();
+ List<Map<TblColRef, E>> result = Lists.newArrayList();
int emptyDims = 0;
for (Dim dim : dims) {
@@ -76,8 +74,8 @@ public class FuzzyValueCombination {
return result;
}
- Map<TblColRef, String> r = Maps.newHashMap();
- Iterator<String>[] iters = new Iterator[dims.length];
+ Map<TblColRef, E> r = Maps.newHashMap();
+ Iterator<E>[] iters = new Iterator[dims.length];
int level = 0;
while (true) {
Dim dim = dims[level];
@@ -85,7 +83,7 @@ public class FuzzyValueCombination {
iters[level] = dim.values.iterator();
}
- Iterator<String> it = iters[level];
+ Iterator<E> it = iters[level];
if (it.hasNext() == false) {
if (level == 0)
break;
@@ -97,7 +95,7 @@ public class FuzzyValueCombination {
r.put(dim.col, it.next());
if (level == dims.length - 1) {
- result.add(new HashMap<TblColRef, String>(r));
+ result.add(new HashMap<TblColRef, E>(r));
} else {
level++;
}
@@ -105,10 +103,10 @@ public class FuzzyValueCombination {
return result;
}
- private static Dim[] toDims(Map<TblColRef, Set<String>> fuzzyValues) {
+ private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) {
Dim[] dims = new Dim[fuzzyValues.size()];
int i = 0;
- for (Entry<TblColRef, Set<String>> entry : fuzzyValues.entrySet()) {
+ for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) {
dims[i] = new Dim();
dims[i].col = entry.getKey();
dims[i].values = entry.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index 47553ad..bdcd257 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -119,7 +119,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
}
AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid);
-
+ encoder.setEncodeShard(false);//will enumerate all possible shards when scanning
+
encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
this.startKey = encoder.encode(startValues);
@@ -133,7 +134,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
// restore encoder defaults for later reuse (note
// AbstractRowKeyEncoder.createInstance() caches instances)
encoder.setBlankByte(AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE);
-
+
+ encoder.setEncodeShard(true);
// always fuzzy match cuboid ID to lock on the selected cuboid
this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/dev-support/test_all.sh
----------------------------------------------------------------------
diff --git a/dev-support/test_all.sh b/dev-support/test_all.sh
new file mode 100644
index 0000000..6a7b887
--- /dev/null
+++ b/dev-support/test_all.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Builds Kylin, then runs the integration tests with the sandbox profile, teeing each
+# step's output to a log file in the project root.
+
+# make "mvn ... | tee" report mvn's exit status rather than tee's
+set -o pipefail
+
+# run from the project root (the parent of dev-support), wherever we were invoked from
+cd "$(dirname "$0")/.." || exit 1
+
+# the integration tests are meaningless against a failed build, so stop here
+mvn clean install -DskipTests | tee mci.log || exit 1
+
+# run every suite even if an earlier one fails, but report the failure at the end
+status=0
+mvn test -Dtest=org.apache.kylin.job.BuildCubeWithEngineTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithEngineTest.log || status=1
+mvn test -Dtest=org.apache.kylin.job.BuildIIWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildIIWithStreamTest.log || status=1
+mvn test -Dtest=org.apache.kylin.job.BuildCubeWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithStreamTest.log || status=1
+mvn verify -fae -P sandbox | tee mvnverify.log || status=1
+exit "$status"
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
new file mode 100644
index 0000000..d09e4ec
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Saves the shard layout of a cube segment's cuboids through {@link CubeManager}.
+ */
+public class CuboidShardUtil {
+    protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class);
+
+    /**
+     * Records the shard layout on {@code segment} and persists the segment via
+     * {@link CubeManager#updateCube}.
+     *
+     * <p>Only cuboids with more than one shard are kept in the map set on the segment;
+     * entries whose shard count is one or less are skipped (and logged) as an optimization.
+     *
+     * @param segment      segment to update; it is modified in place before being saved
+     * @param cuboidShards shard count per cuboid id
+     * @param totalShards  total shard count recorded on the segment
+     * @throws IOException propagated from the cube update
+     */
+    public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards, int totalShards) throws IOException {
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        Map<Long, Short> multiShardCuboids = Maps.newHashMap();
+        for (Map.Entry<Long, Short> entry : cuboidShards.entrySet()) {
+            Long cuboidId = entry.getKey();
+            Short shardNum = entry.getValue();
+            if (shardNum > 1) {
+                logger.info("Cuboid {} has {} shards, saving it", cuboidId, shardNum);
+                multiShardCuboids.put(cuboidId, shardNum);
+            } else {
+                logger.info("Cuboid {} has {} shards, skip saving it as an optimization", cuboidId, shardNum);
+            }
+        }
+
+        segment.setCuboidShardNums(multiShardCuboids);
+        segment.setTotalShards(totalShards);
+
+        CubeUpdate cubeUpdate = new CubeUpdate(segment.getCubeInstance());
+        cubeUpdate.setToUpdateSegs(segment);
+        cubeManager.updateCube(cubeUpdate);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
new file mode 100644
index 0000000..10c82c3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+
+/**
+ * Helpers for persisting per-cuboid HyperLogLog statistics.
+ */
+public class CuboidStatsUtil {
+
+    /**
+     * Writes cuboid statistics to a {@link SequenceFile} named
+     * {@link BatchConstants#CFG_STATISTICS_CUBOID_ESTIMATION} under {@code outputPath}.
+     *
+     * <p>The first record has key 0 and holds {@code samplingPercentage}; it is followed by one
+     * record per cuboid, in ascending cuboid-id order, whose value is that cuboid's HyperLogLog
+     * registers as serialized by {@link HyperLogLogPlusCounter#writeRegisters}.
+     *
+     * @param conf               Hadoop configuration used to create the writer
+     * @param outputPath         directory in which the statistics file is created
+     * @param cuboidHLLMap       HyperLogLog counter per cuboid id
+     * @param samplingPercentage value stored in the key-0 record
+     * @throws IOException if creating or writing the statistics file fails
+     */
+    public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
+        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+
+        // emit cuboids in ascending id order
+        List<Long> allCuboids = new ArrayList<Long>(cuboidHLLMap.keySet());
+        Collections.sort(allCuboids);
+        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+        // open the writer right before the try so every write below is covered by the finally
+        SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));
+        try {
+            // persist the sample percentage with key 0
+            writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));
+            for (long i : allCuboids) {
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+                writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
+            }
+        } finally {
+            writer.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 85312ff..568dd77 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -47,6 +47,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Lists;
@@ -144,7 +145,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
//output the hll info;
if (collectStatistics) {
writeMapperAndCuboidStatistics(context); // for human check
- writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
+ CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
}
}
@@ -202,27 +203,4 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
}
- public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
- Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
- SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));
-
- List<Long> allCuboids = new ArrayList<Long>();
- allCuboids.addAll(cuboidHLLMap.keySet());
- Collections.sort(allCuboids);
-
- // persist the sample percentage with key 0
- writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage)));
- ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- try {
- for (long i : allCuboids) {
- valueBuf.clear();
- cuboidHLLMap.get(i).writeRegisters(valueBuf);
- valueBuf.flip();
- writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
- }
- } finally {
- writer.close();
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 7bb2e16..4c743fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -7,7 +7,8 @@ import java.util.BitSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
@@ -37,12 +38,13 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
private ByteArrayWritable outputValue = new ByteArrayWritable();
private long cuboidRowCount = 0;
+ //for shard
+
public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
this.mapContext = mapContext;
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
this.measureCount = cubeDesc.getMeasures().size();
-
}
@Override
@@ -59,12 +61,20 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
cuboidRowCount++;
- int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+ int header = RowConstants.ROWKEY_HEADER_LEN;
+ int offSet = header;
for (int x = 0; x < dimensions; x++) {
System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
offSet += record.get(x).length();
}
+ //fill shard
+ short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
+ short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum);
+ short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
+ short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+ BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
//output measures
valueBuf.clear();
record.exportColumns(measureColumnsIndex, valueBuf);
@@ -89,7 +99,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
private void initVariables(Long cuboidId) {
- bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+ bytesLength = RowConstants.ROWKEY_HEADER_LEN;
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
for (TblColRef column : cuboid.getColumns()) {
bytesLength += cubeSegment.getColumnLength(column);
@@ -102,6 +112,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
measureColumnsIndex[i] = dimensions + i;
}
- System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+ //write cuboid id first
+ BytesUtil.writeLong(cuboidId, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 4598673..9b25c97 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -124,11 +124,16 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
Preconditions.checkState(key.offset() == 0);
- long cuboidID = rowKeySplitter.split(key.array(), key.length());
+ long cuboidID = rowKeySplitter.split(key.array());
+ short shard = rowKeySplitter.getLastSplittedShard();
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
int bufOffset = 0;
+
+ BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+ bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
+
BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 45f0d32..6301f3d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -103,7 +103,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
}
-
+
private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
@@ -111,7 +111,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
String jobID = extractJobIDFromPath(filePath);
return findSegmentWithUuid(jobID, cube);
}
-
+
private static String extractJobIDFromPath(String path) {
Matcher matcher = JOB_NAME_PATTERN.matcher(path);
// check the first occurrence
@@ -134,11 +134,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
@Override
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
+ long cuboidID = rowKeySplitter.split(key.getBytes());
+ short shard = rowKeySplitter.getLastSplittedShard();
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
int bufOffset = 0;
+ BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+ bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 5e935eb..67c4416 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -47,6 +47,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -126,7 +127,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
}
}
averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
+ CuboidStatsUtil.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
FileSystem fs = statisticsFilePath.getFileSystem(conf);
FSDataInputStream is = fs.open(statisticsFilePath);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
index 40c4dd7..dc8fb3f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
@@ -20,11 +20,6 @@ package org.apache.kylin.engine.mr.steps;
import org.apache.hadoop.util.ToolRunner;
-/**
- * @author George Song (ysong1)
- *
- */
-
public class NDCuboidJob extends CuboidJob {
public NDCuboidJob() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index c47d090..2180dd6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -30,6 +32,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -49,6 +52,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private Text outputKey = new Text();
private String cubeName;
private String segmentName;
+ private CubeSegment cubeSegment;
private CubeDesc cubeDesc;
private CuboidScheduler cuboidScheduler;
@@ -68,7 +72,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
cubeDesc = cube.getDescriptor();
// initialize CubiodScheduler
@@ -80,16 +84,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
int offset = 0;
+ //shard id will be filled after other contents
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+
// cuboid id
System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
- offset += childCuboid.getBytes().length;
+ offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ int bodyOffset = offset;
// rowkey columns
long mask = Long.highestOneBit(parentCuboid.getId());
long parentCuboidId = parentCuboid.getId();
long childCuboidId = childCuboid.getId();
long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
- int index = 1; // skip cuboidId
+ int index = 2; // skip shard and cuboidId
for (int i = 0; i < parentCuboidIdActualLength; i++) {
if ((mask & parentCuboidId) > 0) {// if the this bit position equals
// 1
@@ -103,12 +112,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
mask = mask >> 1;
}
+ //fill shard
+ short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId);
+ short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum);
+ short finalShard = ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards());
+ BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
return offset;
}
@Override
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
+ long cuboidId = rowKeySplitter.split(key.getBytes());
Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
index 165bc13..5f2f100 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
import org.junit.Test;
import com.google.common.collect.Maps;
@@ -28,7 +29,7 @@ public class FactDistinctColumnsReducerTest {
System.out.println(outputPath);
Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
+ CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
FileSystem.getLocal(conf).delete(outputPath, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
index efcb2ba..9e1fc2d 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.junit.After;
import org.junit.Before;
@@ -73,7 +74,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ byte[] key = { 0,0,0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value));
@@ -83,7 +84,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
assertEquals(4, result.size());
- byte[] resultKey = { 0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+ byte[] resultKey = { 0,0,0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue));
@@ -103,7 +104,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
System.out.println(Bytes.toLong(new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 }));
for (int i = 0; i < result.size(); i++) {
byte[] bytes = new byte[result.get(i).getFirst().getLength()];
- System.arraycopy(result.get(i).getFirst().getBytes(), 0, bytes, 0, result.get(i).getFirst().getLength());
+ System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0, result.get(i).getFirst().getLength()-RowConstants.ROWKEY_SHARDID_LEN);
System.out.println(Bytes.toLong(bytes));
keySet[i] = Bytes.toLong(bytes);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 b/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000
old mode 100644
new mode 100755
index d277125..c412e3a
Binary files a/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 and b/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 differ
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/resources/data/base_cuboid/part-r-00000
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 b/engine-mr/src/test/resources/data/base_cuboid/part-r-00000
old mode 100644
new mode 100755
index ed53ffb..9ade717
Binary files a/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 and b/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 differ
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 7296fec..70b62c0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -298,7 +298,7 @@ public class SparkCubing extends AbstractSparkApplication {
private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
- CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
for (TblColRef tblColRef : baseCuboidColumn) {
@@ -341,7 +341,7 @@ public class SparkCubing extends AbstractSparkApplication {
LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
System.out.println("load properties finished");
AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
- final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeDesc, columnLengthMap));
+ final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeSegment, columnLengthMap));
Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
try {
while (listIterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
index c687b78..986e45e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
@@ -17,18 +17,21 @@
*/
package org.apache.kylin.engine.spark.cube;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Map;
+
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
-import scala.Tuple2;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Map;
+import scala.Tuple2;
/**
*/
@@ -36,13 +39,13 @@ public final class DefaultTupleConverter implements TupleConverter {
private final static ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>();
private final static ThreadLocal<int[]> measureColumnsIndex = new ThreadLocal<>();
- private final CubeDesc cubeDesc;
+ private final CubeSegment segment;
private final int measureCount;
private final Map<TblColRef, Integer> columnLengthMap;
- public DefaultTupleConverter(CubeDesc cubeDesc, Map<TblColRef, Integer> columnLengthMap) {
- this.cubeDesc = cubeDesc;
- this.measureCount = cubeDesc.getMeasures().size();
+ public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) {
+ this.segment = segment;
+ this.measureCount = segment.getCubeDesc().getMeasures().size();
this.columnLengthMap = columnLengthMap;
}
@@ -59,16 +62,16 @@ public final class DefaultTupleConverter implements TupleConverter {
}
return measureColumnsIndex.get();
}
-
+
@Override
public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
- int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
+ Cuboid cuboid = Cuboid.findById(segment.getCubeDesc(), cuboidId);
for (TblColRef column : cuboid.getColumns()) {
bytesLength += columnLengthMap.get(column);
}
- final int dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+ final int dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality();
int[] measureColumnsIndex = getMeasureColumnsIndex();
for (int i = 0; i < measureCount; i++) {
measureColumnsIndex[i] = dimensions + i;
@@ -76,13 +79,21 @@ public final class DefaultTupleConverter implements TupleConverter {
byte[] key = new byte[bytesLength];
System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
- int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+ int header = RowConstants.ROWKEY_HEADER_LEN;
+ int offSet = header;
for (int x = 0; x < dimensions; x++) {
final ByteArray byteArray = record.get(x);
System.arraycopy(byteArray.array(), byteArray.offset(), key, offSet, byteArray.length());
offSet += byteArray.length();
}
+ //fill shard
+ short cuboidShardNum = segment.getCuboidShardNum(cuboidId);
+ short shardOffset = ShardingHash.getShard(key, header, offSet - header, cuboidShardNum);
+ short cuboidShardBase = segment.getCuboidBaseShard(cuboidId);
+ short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, segment.getTotalShards());
+ BytesUtil.writeShort(finalShard, key, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
ByteBuffer valueBuf = getValueBuf();
valueBuf.clear();
record.exportColumns(measureColumnsIndex, valueBuf);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java
deleted file mode 100644
index f16e9fe..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.invertedindex.index;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-public class ShardingHash {
-
- static HashFunction hashFunc = Hashing.murmur3_128();
-
- public static long hashInt(int integer) {
- return hashFunc.newHasher().putInt(integer).hash().asLong();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index 817bf01..2521fbf 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.measure.LongMutable;
@@ -153,7 +154,7 @@ public class TableRecord implements Cloneable {
public short getShard() {
int timestampID = rawRecord.getValueID(info.getTimestampColumn());
- return (short) (Math.abs(ShardingHash.hashInt(timestampID)) % info.getDescriptor().getSharding());
+ return ShardingHash.getShard(timestampID, info.getDescriptor().getSharding());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 661672e..7a2313c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@
<commons-cli.version>1.2</commons-cli.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.1</commons-lang3.version>
+ <commons-math3.version>3.3</commons-math3.version>
<commons-io.version>2.4</commons-io.version>
<commons-configuration.version>1.9</commons-configuration.version>
<commons-daemon.version>1.0.15</commons-daemon.version>
@@ -331,6 +332,12 @@
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${commons-math3.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index d80763c..1cd55d4 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -49,13 +49,10 @@ public abstract class RoutingRule {
public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) {
for (RoutingRule rule : rules) {
- logger.info("Initial realizations order:");
- logger.info(getPrintableText(realizations));
- logger.info("Applying rule " + rule);
-
+ logger.info("Realizations order before: " + getPrintableText(realizations));
+ logger.info("Applying rule : " + rule);
rule.apply(realizations, olapContext);
-
- logger.info(getPrintableText(realizations));
+ logger.info("Realizations order after: " + getPrintableText(realizations));
logger.info("===================================================");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/debug/query78.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/debug/query78.sql b/query/src/test/resources/query/debug/query78.sql
new file mode 100644
index 0000000..299f1a4
--- /dev/null
+++ b/query/src/test/resources/query/debug/query78.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select count(*) as c,sum(PRICE) as GMV, LSTG_FORMAT_NAME as FORMAT_NAME
+from test_kylin_fact
+where (LSTG_FORMAT_NAME in ('ABIN')) or (LSTG_FORMAT_NAME>='FP-GTC' and LSTG_FORMAT_NAME<='Others')
+group by LSTG_FORMAT_NAME
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query01.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query01.sql b/query/src/test/resources/query/sql/query01.sql
index 5a53058..9ed1db3 100644
--- a/query/src/test/resources/query/sql/query01.sql
+++ b/query/src/test/resources/query/sql/query01.sql
@@ -16,5 +16,5 @@
-- limitations under the License.
--
-select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact
- group by LSTG_FORMAT_NAME
+select LSTG_FORMAT_NAME,slr_segment_cd ,sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact
+ group by LSTG_FORMAT_NAME ,slr_segment_cd