You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/25 14:12:19 UTC
[6/7] incubator-kylin git commit: KYLIN-942 support parallel scan for
grid table
KYLIN-942 support parallel scan for grid table
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fabdd5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fabdd5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fabdd5cd
Branch: refs/heads/2.x-staging
Commit: fabdd5cd9f752a617fca6670acb9b7af190be45b
Parents: 5c9c097
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 18:18:02 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Sun Oct 25 21:15:21 2015 +0800
----------------------------------------------------------------------
.../kylin/job/hadoop/invertedindex/IITest.java | 6 +-
commit_SHA1 | 16 +
core-common/pom.xml | 9 +
.../org/apache/kylin/common/util/BitSets.java | 39 +
.../org/apache/kylin/common/util/BytesUtil.java | 20 +-
.../kylin/common/util/CompressionUtils.java | 6 +-
.../kylin/common/util/ImplementationSwitch.java | 12 +-
.../apache/kylin/common/util/ShardingHash.java | 67 ++
.../apache/kylin/common/util/BitSetsTest.java | 36 +
.../apache/kylin/common/util/BytesUtilTest.java | 11 +-
.../common/util/ImplementationSwitchTest.java | 28 +-
.../org/apache/kylin/cube/CubeInstance.java | 12 +-
.../java/org/apache/kylin/cube/CubeSegment.java | 50 +
.../cube/common/FuzzyValueCombination.java | 130 +++
.../kylin/cube/common/RowKeySplitter.java | 31 +-
.../org/apache/kylin/cube/cuboid/Cuboid.java | 4 +-
.../kylin/cube/gridtable/CubeCodeSystem.java | 9 +-
.../cube/gridtable/TrimmedCubeCodeSystem.java | 15 +-
.../cube/inmemcubing/ConcurrentDiskStore.java | 4 +-
.../kylin/cube/inmemcubing/MemDiskStore.java | 4 +-
.../kylin/cube/kv/AbstractRowKeyEncoder.java | 5 +
.../apache/kylin/cube/kv/FuzzyKeyEncoder.java | 2 +-
.../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 11 +-
.../org/apache/kylin/cube/kv/RowConstants.java | 11 +-
.../org/apache/kylin/cube/kv/RowKeyDecoder.java | 4 +-
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 56 +-
.../org/apache/kylin/gridtable/GTBuilder.java | 4 +-
.../java/org/apache/kylin/gridtable/GTInfo.java | 30 +-
.../org/apache/kylin/gridtable/GTRecord.java | 75 +-
.../org/apache/kylin/gridtable/GTScanRange.java | 30 +-
.../kylin/gridtable/GTScanRangePlanner.java | 244 +++--
.../apache/kylin/gridtable/GTScanRequest.java | 9 +-
.../java/org/apache/kylin/gridtable/GTUtil.java | 4 -
.../org/apache/kylin/gridtable/GridTable.java | 3 -
.../org/apache/kylin/gridtable/IGTStore.java | 4 +-
.../gridtable/memstore/GTSimpleMemStore.java | 7 +-
.../kylin/cube/common/RowKeySplitterTest.java | 20 +-
.../apache/kylin/cube/kv/RowKeyDecoderTest.java | 10 +-
.../apache/kylin/cube/kv/RowKeyEncoderTest.java | 32 +-
.../kylin/gridtable/DictGridTableTest.java | 152 ++-
.../org/apache/kylin/engine/EngineFactory.java | 28 +-
.../kylin/metadata/filter/TupleFilter.java | 11 +
.../metadata/filter/TupleFilterSerializer.java | 10 +-
.../org/apache/kylin/source/SourceFactory.java | 9 +-
.../apache/kylin/storage/StorageFactory.java | 15 +-
.../kylin/storage/hybrid/HybridInstance.java | 2 +-
.../translate/FuzzyValueCombination.java | 30 +-
.../kylin/storage/translate/HBaseKeyRange.java | 6 +-
dev-support/test_all.sh | 11 +
.../kylin/engine/mr/common/CuboidShardUtil.java | 56 ++
.../kylin/engine/mr/common/CuboidStatsUtil.java | 61 ++
.../mr/steps/FactDistinctColumnsReducer.java | 26 +-
.../mr/steps/MapContextGTRecordWriter.java | 21 +-
.../mr/steps/MergeCuboidFromStorageMapper.java | 7 +-
.../engine/mr/steps/MergeCuboidMapper.java | 9 +-
.../engine/mr/steps/MergeStatisticsStep.java | 3 +-
.../kylin/engine/mr/steps/NDCuboidJob.java | 5 -
.../kylin/engine/mr/steps/NDCuboidMapper.java | 23 +-
.../steps/FactDistinctColumnsReducerTest.java | 3 +-
.../engine/mr/steps/NDCuboidMapperTest.java | 7 +-
.../test/resources/data/8d_cuboid/part-r-00000 | Bin 1476517 -> 913974 bytes
.../resources/data/base_cuboid/part-r-00000 | Bin 394644 -> 243816 bytes
.../apache/kylin/engine/spark/SparkCubing.java | 4 +-
.../spark/cube/DefaultTupleConverter.java | 39 +-
.../kylin/invertedindex/index/ShardingHash.java | 32 -
.../kylin/invertedindex/index/TableRecord.java | 3 +-
pom.xml | 7 +
.../apache/kylin/query/routing/RoutingRule.java | 9 +-
.../src/test/resources/query/debug/query78.sql | 22 +
query/src/test/resources/query/sql/query01.sql | 4 +-
query/src/test/resources/query/sql/query85.sql | 26 +
query/src/test/resources/query/sql/query86.sql | 24 +
.../resources/kylin-server-log4j.properties | 2 +-
.../kylin/storage/hbase/HBaseStorage.java | 5 +-
.../coprocessor/CoprocessorProjector.java | 2 +-
.../common/coprocessor/CoprocessorRowType.java | 2 +-
.../hbase/cube/v1/CubeSegmentTupleIterator.java | 2 +-
.../storage/hbase/cube/v1/CubeStorageQuery.java | 80 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 136 ++-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 155 ++-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 98 +-
.../storage/hbase/cube/v2/CubeScanner.java | 265 -----
.../hbase/cube/v2/CubeSegmentScanner.java | 290 ++++++
.../storage/hbase/cube/v2/CubeStorageQuery.java | 7 +-
.../hbase/cube/v2/HBaseReadonlyStore.java | 47 +-
.../kylin/storage/hbase/cube/v2/HBaseScan.java | 88 ++
.../kylin/storage/hbase/cube/v2/RawScan.java | 22 +-
.../cube/v2/SequentialCubeTupleIterator.java | 8 +-
.../coprocessor/endpoint/CubeVisitService.java | 29 +-
.../endpoint/generated/CubeVisitProtos.java | 981 ++++++++++++++++++-
.../endpoint/protobuf/CubeVisit.proto | 4 +
.../endpoint/EndpointTupleIterator.java | 21 +-
.../ii/coprocessor/endpoint/IIEndpoint.java | 22 +-
.../storage/hbase/steps/CreateHTableJob.java | 153 ++-
.../storage/hbase/steps/HBaseCuboidWriter.java | 26 +-
.../hbase/steps/HBaseStreamingOutput.java | 6 +-
.../kylin/storage/hbase/steps/MergeGCStep.java | 10 +-
.../hbase/steps/SandboxMetastoreCLI.java | 2 +-
98 files changed, 3158 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 913a2f7..2a643c8 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,9 +10,13 @@ import java.util.Set;
import javax.annotation.Nullable;
+
import com.google.common.base.Function;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -232,7 +236,7 @@ public class IITest extends LocalFileMetadataTestCase {
System.out.println(response.getRowsList().size());
Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
- byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+ byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
Assert.assertTrue(answers.contains(metrics.get(0)));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/commit_SHA1
----------------------------------------------------------------------
diff --git a/commit_SHA1 b/commit_SHA1
new file mode 100644
index 0000000..d2f3970
--- /dev/null
+++ b/commit_SHA1
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+b66c25803a2f976cca067148278dbe7d7b0d79ef
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 7c78723..ebcb676 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -49,6 +49,10 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
@@ -65,6 +69,11 @@
<artifactId>commons-email</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>${commons-collections4.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java b/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java
new file mode 100644
index 0000000..b8a6de7
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.util.BitSet;
+
+public class BitSets {
+ public static BitSet valueOf(int[] indexes) {
+ if (indexes == null || indexes.length == 0) {
+ return new BitSet();
+ }
+
+ int maxIndex = Integer.MIN_VALUE;
+ for (int index : indexes) {
+ maxIndex = Math.max(maxIndex, index);
+ }
+ BitSet set = new BitSet(maxIndex);
+ for (int index : indexes) {
+ set.set(index);
+ }
+ return set;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0880da1..0d4dba9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -25,14 +25,23 @@ public class BytesUtil {
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
- public static void writeLong(long num, byte[] bytes, int offset, int size) {
+ public static void writeShort(short num, byte[] bytes, int offset, int size) {
for (int i = offset + size - 1; i >= offset; i--) {
bytes[i] = (byte) num;
num >>>= 8;
}
}
- public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+ public static long readShort(byte[] bytes, int offset, int size) {
+ short num = 0;
+ for (int i = offset, n = offset + size; i < n; i++) {
+ num <<= 8;
+ num |= (short) bytes[i] & 0xFF;
+ }
+ return num;
+ }
+
+ public static void writeLong(long num, byte[] bytes, int offset, int size) {
for (int i = offset + size - 1; i >= offset; i--) {
bytes[i] = (byte) num;
num >>>= 8;
@@ -48,6 +57,13 @@ public class BytesUtil {
return integer;
}
+ public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+ for (int i = offset + size - 1; i >= offset; i--) {
+ bytes[i] = (byte) num;
+ num >>>= 8;
+ }
+ }
+
public static int readUnsigned(byte[] bytes, int offset, int size) {
int integer = 0;
for (int i = offset, n = offset + size; i < n; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
index 13abab5..c9838e4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -45,8 +45,7 @@ public class CompressionUtils {
outputStream.close();
byte[] output = outputStream.toByteArray();
- logger.info("Original: " + data.length + " bytes");
- logger.info("Compressed: " + output.length + " bytes");
+ logger.info("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes ");
return output;
}
@@ -63,8 +62,7 @@ public class CompressionUtils {
outputStream.close();
byte[] output = outputStream.toByteArray();
- logger.info("Original: " + data.length + " bytes");
- logger.info("Decompressed: " + output.length + " bytes");
+ logger.info("Original: " + data.length + " bytes. " + "Decompressed: " + output.length + " bytes");
return output;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
index 3101c81..f6924a0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -26,14 +26,16 @@ import org.slf4j.LoggerFactory;
* Provide switch between different implementations of a same interface.
* Each implementation is identified by an integer ID.
*/
-public class ImplementationSwitch {
+public class ImplementationSwitch<I> {
private static final Logger logger = LoggerFactory.getLogger(ImplementationSwitch.class);
final private Object[] instances;
+ private Class<I> interfaceClz;
- public ImplementationSwitch(Map<Integer, String> impls) {
- instances = initInstances(impls);
+ public ImplementationSwitch(Map<Integer, String> impls, Class<I> interfaceClz) {
+ this.interfaceClz = interfaceClz;
+ this.instances = initInstances(impls);
}
private Object[] initInstances(Map<Integer, String> impls) {
@@ -58,13 +60,13 @@ public class ImplementationSwitch {
return result;
}
- public <I> I get(int id, Class<I> interfaceClz) {
+ public I get(int id) {
@SuppressWarnings("unchecked")
I result = (I) instances[id];
if (result == null)
throw new IllegalArgumentException("Implementations missing, ID " + id + ", interafce " + interfaceClz.getName());
-
+
return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
new file mode 100644
index 0000000..8d728c8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+public class ShardingHash {
+
+ static HashFunction hashFunc = Hashing.murmur3_128();
+
+ public static short getShard(int integerValue, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
+ long hash = hashFunc.hashInt(integerValue).asLong();
+ return _getShard(hash, totalShards);
+ }
+
+ public static short getShard(long longValue, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
+ long hash = hashFunc.hashLong(longValue).asLong();
+ return _getShard(hash, totalShards);
+ }
+
+ public static short getShard(byte[] byteValues, int offset, int length, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
+
+ long hash = hashFunc.hashBytes(byteValues, offset, length).asLong();
+ return _getShard(hash, totalShards);
+ }
+
+ public static short normalize(short cuboidShardBase, short shardOffset, int totalShards) {
+ if (totalShards <= 1) {
+ return 0;
+ }
+ return (short) ((cuboidShardBase + shardOffset) % totalShards);
+ }
+
+ private static short _getShard(long hash, int totalShard) {
+ long x = hash % totalShard;
+ if (x < 0) {
+ x += totalShard;
+ }
+ return (short) x;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java
new file mode 100644
index 0000000..c923969
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.util.BitSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BitSetsTest {
+
+ @Test
+ public void basicTest() {
+ BitSet a = BitSets.valueOf(new int[] { 1, 3, 10 });
+ Assert.assertEquals(3, a.cardinality());
+ Assert.assertTrue(10 < a.size());
+ Assert.assertTrue(a.get(3));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
index 7436de9..79bc9f1 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
@@ -18,17 +18,15 @@
package org.apache.kylin.common.util;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
-import junit.framework.TestCase;
-
import org.junit.Test;
-/**
- * by honma
- */
-public class BytesUtilTest extends TestCase {
+public class BytesUtilTest {
@Test
public void test() {
ByteBuffer buffer = ByteBuffer.allocate(10000);
@@ -77,6 +75,7 @@ public class BytesUtilTest extends TestCase {
assertTrue(Arrays.equals(anOtherNewBytes, ba.array()));
}
+ @Test
public void testReadable() {
String x = "\\x00\\x00\\x00\\x00\\x00\\x01\\xFC\\xA8";
byte[] bytes = BytesUtil.fromReadableText(x);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
index 4c69eeb..ab0b565 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.kylin.common.util;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
@@ -25,34 +25,34 @@ import java.util.Map;
import org.junit.Test;
public class ImplementationSwitchTest {
-
- ImplementationSwitch sw;
+
+ ImplementationSwitch<I> sw;
public ImplementationSwitchTest() {
Map<Integer, String> impls = new HashMap<>();
impls.put(0, "non.exist.class");
impls.put(1, Impl1.class.getName());
impls.put(2, Impl2.class.getName());
- sw = new ImplementationSwitch(impls);
+ sw = new ImplementationSwitch<I>(impls, I.class);
}
-
- public static interface I {
+
+ public interface I {
}
-
+
public static class Impl1 implements I {
}
-
+
public static class Impl2 implements I {
}
-
+
@Test
public void test() {
- assertTrue(sw.get(1, I.class) instanceof Impl1);
- assertTrue(sw.get(2, I.class) instanceof Impl2);
+ assertTrue(sw.get(1) instanceof Impl1);
+ assertTrue(sw.get(2) instanceof Impl2);
}
-
- @Test(expected = IllegalArgumentException.class)
+
+ @Test(expected = IllegalArgumentException.class)
public void testException() {
- sw.get(0, I.class);
+ sw.get(0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 8b6653b..f8e37f3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -48,10 +48,9 @@ import com.google.common.collect.Lists;
@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable {
- private static final int COST_WEIGHT_DIMENSION = 1;
private static final int COST_WEIGHT_MEASURE = 1;
- private static final int COST_WEIGHT_LOOKUP_TABLE = 1;
- private static final int COST_WEIGHT_INNER_JOIN = 2;
+ private static final int COST_WEIGHT_DIMENSION = 10;
+ private static final int COST_WEIGHT_INNER_JOIN = 100;
public static CubeInstance create(String cubeName, String projectName, CubeDesc cubeDesc) {
CubeInstance cubeInstance = new CubeInstance();
@@ -67,7 +66,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return cubeInstance;
}
-
+
@JsonIgnore
private KylinConfig config;
@JsonProperty("name")
@@ -122,7 +121,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
}
return mergingSegments;
}
-
+
public CubeDesc getDescriptor() {
return CubeDescManager.getInstance(config).getCubeDesc(descName);
}
@@ -355,7 +354,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
for (LookupDesc lookupDesc : this.getDescriptor().getModel().getLookups()) {
// more tables, more cost
- calculatedCost += COST_WEIGHT_LOOKUP_TABLE;
if ("inner".equals(lookupDesc.getJoin().getType())) {
// inner join cost is bigger than left join, as it will filter some records
calculatedCost += COST_WEIGHT_INNER_JOIN;
@@ -444,12 +442,10 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
public int getStorageType() {
return getDescriptor().getStorageType();
}
-
@Override
public int getEngineType() {
return getDescriptor().getEngineType();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 7d89470..1a44fcf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -25,6 +25,7 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
@@ -37,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
@@ -67,6 +69,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
private String lastBuildJobID;
@JsonProperty("create_time_utc")
private long createTimeUTC;
+ @JsonProperty("cuboid_shard_nums")
+ private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
+ @JsonProperty("total_shards")
+ private int totalShards = 0;
@JsonProperty("binary_signature")
private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
@@ -76,6 +82,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
@JsonProperty("snapshots")
private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
+ private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
+
public CubeDesc getCubeDesc() {
return getCubeInstance().getDescriptor();
}
@@ -360,4 +368,46 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
return cubeInstance.getStorageType();
}
+ /**
+ * get the number of shards where each cuboid will distribute
+ * @return
+ */
+ public Short getCuboidShardNum(Long cuboidId) {
+ Short ret = this.cuboidShardNums.get(cuboidId);
+ if (ret == null) {
+ return 1;
+ } else {
+ return ret;
+ }
+ }
+
+ // /**
+ // * get the number of shards where each cuboid will distribute
+ // * @return
+ // */
+ // public Map<Long, Short> getCuboidShards() {
+ // return this.cuboidShards;
+ // }
+
+ public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
+ this.cuboidShardNums = newCuboidShards;
+ }
+
+ public int getTotalShards() {
+ return totalShards;
+ }
+
+ public void setTotalShards(int totalShards) {
+ this.totalShards = totalShards;
+ }
+
+ public short getCuboidBaseShard(Long cuboidId) {
+ Short ret = cuboidBaseShards.get(cuboidId);
+ if (ret == null) {
+ ret = ShardingHash.getShard(cuboidId, totalShards);
+ cuboidBaseShards.put(cuboidId, ret);
+ }
+ return ret;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
new file mode 100644
index 0000000..4ddb06a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class FuzzyValueCombination {
+
+ private static class Dim<K, V> {
+ K col;
+ Set<V> values;
+ }
+
+ private static final Set SINGLE_NULL_SET = Sets.newHashSet();
+
+ static {
+ SINGLE_NULL_SET.add(null);
+ }
+
+ public static <K, V> List<Map<K, V>> calculate(Map<K, Set<V>> fuzzyValues, long cap) {
+ Collections.emptyMap();
+ Dim<K, V>[] dims = toDims(fuzzyValues);
+ // If a query has many IN clauses and each IN clause has many values, it can easily generate
+ // thousands of fuzzy keys. With that many fuzzy keys, scan performance becomes bottlenecked
+ // on them, so we simply abandon all fuzzy keys in this case.
+ if (exceedCap(dims, cap)) {
+ return Lists.newArrayList();
+ } else {
+ return combination(dims);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K, V> List<Map<K, V>> combination(Dim<K, V>[] dims) {
+
+ List<Map<K, V>> result = Lists.newArrayList();
+
+ int emptyDims = 0;
+ for (Dim dim : dims) {
+ if (dim.values.isEmpty()) {
+ dim.values = SINGLE_NULL_SET;
+ emptyDims++;
+ }
+ }
+ if (emptyDims == dims.length) {
+ return result;
+ }
+
+ Map<K, V> r = Maps.newHashMap();
+ Iterator<V>[] iters = new Iterator[dims.length];
+ int level = 0;
+ while (true) {
+ Dim<K, V> dim = dims[level];
+ if (iters[level] == null) {
+ iters[level] = dim.values.iterator();
+ }
+
+ Iterator<V> it = iters[level];
+ if (it.hasNext() == false) {
+ if (level == 0)
+ break;
+ r.remove(dim.col);
+ iters[level] = null;
+ level--;
+ continue;
+ }
+
+ r.put(dim.col, it.next());
+ if (level == dims.length - 1) {
+ result.add(new HashMap<K, V>(r));
+ } else {
+ level++;
+ }
+ }
+ return result;
+ }
+
+ private static <K, V> Dim<K, V>[] toDims(Map<K, Set<V>> fuzzyValues) {
+ Dim[] dims = new Dim[fuzzyValues.size()];
+ int i = 0;
+ for (Entry<K, Set<V>> entry : fuzzyValues.entrySet()) {
+ dims[i] = new Dim<K, V>();
+ dims[i].col = entry.getKey();
+ dims[i].values = entry.getValue();
+ if (dims[i].values == null)
+ dims[i].values = Collections.emptySet();
+ i++;
+ }
+ return dims;
+ }
+
+ private static boolean exceedCap(Dim[] dims, long cap) {
+ return combCount(dims) > cap;
+ }
+
+ private static long combCount(Dim[] dims) {
+ long count = 1;
+ for (Dim dim : dims) {
+ count *= Math.max(dim.values.size(), 1);
+ }
+ return count;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 7e379dd..0111cee 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -27,10 +27,6 @@ import org.apache.kylin.cube.kv.RowKeyColumnIO;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.TblColRef;
-/**
- * @author George Song (ysong1)
- *
- */
public class RowKeySplitter {
private CubeDesc cubeDesc;
@@ -39,6 +35,9 @@ public class RowKeySplitter {
private SplittedBytes[] splitBuffers;
private int bufferSize;
+ private long lastSplittedCuboidId;
+ private short lastSplittedShard;
+
public SplittedBytes[] getSplitBuffers() {
return splitBuffers;
}
@@ -47,6 +46,14 @@ public class RowKeySplitter {
return bufferSize;
}
+ public long getLastSplittedCuboidId() {
+ return lastSplittedCuboidId;
+ }
+
+ public short getLastSplittedShard() {
+ return lastSplittedShard;
+ }
+
public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
this.cubeDesc = cubeSeg.getCubeDesc();
this.colIO = new RowKeyColumnIO(cubeSeg);
@@ -60,21 +67,27 @@ public class RowKeySplitter {
/**
* @param bytes
- * @param byteLen
* @return cuboid ID
*/
- public long split(byte[] bytes, int byteLen) {
+ public long split(byte[] bytes) {
this.bufferSize = 0;
int offset = 0;
+ // extract shard
+ SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
+ shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
+ System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+
// extract cuboid id
SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
offset += RowConstants.ROWKEY_CUBOIDID_LEN;
- long cuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+ lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
+ Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId);
// rowkey columns
for (int i = 0; i < cuboid.getColumns().size(); i++) {
@@ -86,6 +99,6 @@ public class RowKeySplitter {
offset += colLength;
}
- return cuboidId;
+ return lastSplittedCuboidId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 3ff1d61..2c8680d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -22,15 +22,13 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.*;
import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
import org.apache.kylin.metadata.model.TblColRef;
-/**
- * @author George Song (ysong1)
- */
public class Cuboid implements Comparable<Cuboid> {
private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index b0554ae..9e45fe0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -20,9 +20,10 @@ import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
import org.apache.kylin.metadata.measure.serializer.StringSerializer;
/**
- * Created by shaoshi on 3/23/15.
- * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check
- * its data type to serialize/deserialize it;
+ * Defines how column values are encoded to / decoded from a GTRecord.
+ *
+ * Cube metadata tells which columns are dictionary encoded (dict encoded dimensions) or fixed-length encoded (fixed length dimensions).
+ * Metric columns are more flexible: they use a DataTypeSerializer according to their data type.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class CubeCodeSystem implements IGTCodeSystem {
@@ -110,7 +111,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
if (serializer instanceof DictionarySerializer) {
((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
} else {
- if ((!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer)) && (value instanceof String)) {
+ if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) {
value = serializer.valueOf((String) value);
}
serializer.serialize(value, buf);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index e662a82..e4f32fb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -74,15 +74,12 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem {
@Override
public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
DataTypeSerializer serializer = serializers[col];
- if (serializer instanceof CubeCodeSystem.TrimmedDictionarySerializer || serializer instanceof CubeCodeSystem.DictionarySerializer) {
- //TODO: remove this check
- throw new IllegalStateException("Encode dictionary value in coprocessor");
- } else {
- if ((!(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer)) && (value instanceof String)) {
- value = serializer.valueOf((String) value);
- }
- serializer.serialize(value, buf);
- }
+
+// if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) {
+// value = serializer.valueOf((String) value);
+// }
+
+ serializer.serialize(value, buf);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
index ebff9c8..8b95b4f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
@@ -92,12 +92,12 @@ public class ConcurrentDiskStore implements IGTStore, Closeable {
}
@Override
- public IGTWriter rebuild(int shard) throws IOException {
+ public IGTWriter rebuild() throws IOException {
return newWriter(0);
}
@Override
- public IGTWriter append(int shard) throws IOException {
+ public IGTWriter append() throws IOException {
return newWriter(diskFile.length());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
index 2a12d1b..166ae76 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
@@ -88,12 +88,12 @@ public class MemDiskStore implements IGTStore, Closeable {
}
@Override
- public IGTWriter rebuild(int shard) throws IOException {
+ public IGTWriter rebuild() throws IOException {
return newWriter(0);
}
@Override
- public IGTWriter append(int shard) throws IOException {
+ public IGTWriter append() throws IOException {
return newWriter(length());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index f566f5c..1e24432 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -62,6 +62,7 @@ public abstract class AbstractRowKeyEncoder {
protected final Cuboid cuboid;
protected byte blankByte = DEFAULT_BLANK_BYTE;
+ protected boolean encodeShard = true;
protected AbstractRowKeyEncoder(Cuboid cuboid) {
this.cuboid = cuboid;
@@ -71,6 +72,10 @@ public abstract class AbstractRowKeyEncoder {
this.blankByte = blankByte;
}
+ public void setEncodeShard(boolean encodeShard) {
+ this.encodeShard = encodeShard;
+ }
+
abstract public byte[] encode(Map<TblColRef, String> valueMap);
abstract public byte[] encode(byte[][] values);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
index a17bb28..2185bc5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
@@ -37,7 +37,7 @@ public class FuzzyKeyEncoder extends RowKeyEncoder {
@Override
protected byte[] defaultValue(int length) {
byte[] keyBytes = new byte[length];
- Arrays.fill(keyBytes, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(keyBytes, RowConstants.BYTE_ZERO);
return keyBytes;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 5077287..bf67538 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,20 +36,19 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
}
@Override
- protected int fillHeader(byte[] bytes, byte[][] values) {
+ protected int fillHeader(byte[] bytes) {
+ Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
// always fuzzy match cuboid ID to lock on the selected cuboid
- int cuboidStart = this.headerLength - RowConstants.ROWKEY_CUBOIDID_LEN;
- Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE);
- Arrays.fill(bytes, cuboidStart, this.headerLength, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO);
return this.headerLength;
}
@Override
protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
if (value == null) {
- Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ONE);
+ Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ONE);
} else {
- Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ZERO);
+ Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ZERO);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 7607edf..6a8eeb5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -26,18 +26,23 @@ public class RowConstants {
public static final byte ROWKEY_LOWER_BYTE = 0;
// row key upper bound
public static final byte ROWKEY_UPPER_BYTE = (byte) 0xff;
+
// row key cuboid id length
public static final int ROWKEY_CUBOIDID_LEN = 8;
+ // row key shard length
+ public static final int ROWKEY_SHARDID_LEN = 2;
- // fuzzy mask
- public static final byte FUZZY_MASK_ZERO = 0;
- public static final byte FUZZY_MASK_ONE = 1;
+ public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+
+ public static final byte BYTE_ZERO = 0;
+ public static final byte BYTE_ONE = 1;
// row value delimiter
public static final byte ROWVALUE_DELIMITER_BYTE = 7;
public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7);
public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 };
+ public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB
public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB
// marker class
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 1b896a0..3506845 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -53,12 +53,12 @@ public class RowKeyDecoder {
public long decode(byte[] bytes) throws IOException {
this.values.clear();
- long cuboidId = rowKeySplitter.split(bytes, bytes.length);
+ long cuboidId = rowKeySplitter.split(bytes);
initCuboid(cuboidId);
SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
- int offset = 1; // skip cuboid id part
+ int offset = 2; // skip shard and cuboid id part
for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
TblColRef col = this.cuboid.getColumns().get(i);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 7f8bbd3..0676df6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -24,56 +24,33 @@ import java.util.List;
import java.util.Map;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.TblColRef;
-/**
- * @author George Song (ysong1)
- */
public class RowKeyEncoder extends AbstractRowKeyEncoder {
private int bytesLength;
protected int headerLength;
private RowKeyColumnIO colIO;
+ CubeSegment cubeSeg;
protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
super(cuboid);
+ this.cubeSeg = cubeSeg;
colIO = new RowKeyColumnIO(cubeSeg);
- bytesLength = headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header
+ bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid
for (TblColRef column : cuboid.getColumns()) {
bytesLength += colIO.getColumnLength(column);
}
}
- public RowKeyColumnIO getColumnIO() {
- return colIO;
- }
-
- public int getColumnOffset(TblColRef col) {
- int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
-
- for (TblColRef dimCol : cuboid.getColumns()) {
- if (col.equals(dimCol))
- return offset;
- offset += colIO.getColumnLength(dimCol);
- }
-
- throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid);
- }
-
public int getColumnLength(TblColRef col) {
return colIO.getColumnLength(col);
}
- public int getRowKeyLength() {
- return bytesLength;
- }
-
- public int getHeaderLength() {
- return headerLength;
- }
-
@Override
public byte[] encode(Map<TblColRef, String> valueMap) {
List<byte[]> valueList = new ArrayList<byte[]>();
@@ -95,7 +72,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
@Override
public byte[] encode(byte[][] values) {
byte[] bytes = new byte[this.bytesLength];
- int offset = fillHeader(bytes, values);
+ int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
+ int offset = bodyOffset;
for (int i = 0; i < cuboid.getColumns().size(); i++) {
TblColRef column = cuboid.getColumns().get(i);
@@ -107,18 +85,34 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
fillColumnValue(column, colLength, value, value.length, bytes, offset);
}
offset += colLength;
-
}
+
+ //fill shard and cuboid
+ fillHeader(bytes);
+
return bytes;
}
- protected int fillHeader(byte[] bytes, byte[][] values) {
+ protected int fillHeader(byte[] bytes) {
int offset = 0;
+
+ if (encodeShard) {
+ short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+ short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
+ short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+ BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ } else {
+ BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ }
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+
System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
if (this.headerLength != offset) {
throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
}
+
return offset;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
index 31ea9e2..5eefa54 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
@@ -19,9 +19,9 @@ public class GTBuilder implements Closeable {
this.info = info;
if (append) {
- storeWriter = store.append(shard);
+ storeWriter = store.append();
} else {
- storeWriter = store.rebuild(shard);
+ storeWriter = store.rebuild();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index d4fe3fb..e3d3640 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -34,13 +34,11 @@ public class GTInfo {
ImmutableBitSet colBlocksAll;
int rowBlockSize; // 0: disable row block
- // sharding
- int nShards; // 0: no sharding
-
// must create from builder
private GTInfo() {
}
+
public String getTableName() {
return tableName;
}
@@ -56,15 +54,11 @@ public class GTInfo {
public ImmutableBitSet getPrimaryKey() {
return primaryKey;
}
-
+
public ImmutableBitSet getAllColumns() {
return colAll;
}
- public boolean isShardingEnabled() {
- return nShards > 0;
- }
-
public boolean isRowBlockEnabled() {
return rowBlockSize > 0;
}
@@ -119,7 +113,7 @@ public class GTInfo {
public void validateColRef(TblColRef ref) {
TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
- if (expected.equals(ref) == false)
+ if (!expected.equals(ref))
throw new IllegalArgumentException();
}
@@ -162,11 +156,11 @@ public class GTInfo {
for (int i = 0; i < colBlocks.length; i++) {
merge = merge.or(colBlocks[i]);
}
- if (merge.equals(colAll) == false)
+ if (!merge.equals(colAll))
throw new IllegalStateException();
// primary key must be the first column block
- if (primaryKey.equals(colBlocks[0]) == false)
+ if (!primaryKey.equals(colBlocks[0]))
throw new IllegalStateException();
// drop empty column block
@@ -177,7 +171,7 @@ public class GTInfo {
if (cb.isEmpty())
it.remove();
}
- colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]);
+ colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
}
public static class Builder {
@@ -228,12 +222,6 @@ public class GTInfo {
}
/** optional */
- public Builder enableSharding(int nShards) {
- info.nShards = nShards;
- return this;
- }
-
- /** optional */
public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) {
info.colPreferIndex = colPreferIndex;
return this;
@@ -256,8 +244,12 @@ public class GTInfo {
return KryoUtils.serialize(info);
}
}
-
+
public static GTInfo deserialize(byte[] bytes) {
return KryoUtils.deserialize(bytes, GTInfo.class);
}
+
+ public IGTCodeSystem getCodeSystem() {
+ return codeSystem;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index dbfdf57..0f4eb3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -2,7 +2,7 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.BitSet;
+import java.util.List;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -159,7 +159,7 @@ public class GTRecord implements Comparable<GTRecord> {
return false;
for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
int c = maskForEqualHashComp.trueBitAt(i);
- if (this.cols[c].equals(o.cols[c]) == false) {
+ if (!this.cols[c].equals(o.cols[c])) {
return false;
}
}
@@ -228,19 +228,6 @@ public class GTRecord implements Comparable<GTRecord> {
buf.setLength(pos);
}
- /** write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not*/
- public void exportAllColumns(ByteBuffer buf) {
- for (int i = 0; i < info.colAll.trueBitCount(); i++) {
- int c = info.colAll.trueBitAt(i);
- if (cols[c] == null || cols[c].array() == null) {
- buf.put((byte) 0);
- } else {
- buf.put((byte) 1);
- buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
- }
- }
- }
-
/** write data to given buffer, like serialize */
public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -261,34 +248,11 @@ public class GTRecord implements Comparable<GTRecord> {
}
/** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadPrimaryKey(ByteBuffer buf) {
- loadColumns(info.primaryKey, buf);
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
public void loadCellBlock(int c, ByteBuffer buf) {
loadColumns(info.colBlocks[c], buf);
}
/** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadAllColumns(ByteBuffer buf) {
- int pos = buf.position();
- for (int i = 0; i < info.colAll.trueBitCount(); i++) {
- int c = info.colAll.trueBitAt(i);
-
- byte exist = buf.get();
- pos++;
-
- if (exist == 1) {
- int len = info.codeSystem.codeLength(c, buf);
- cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
- pos += len;
- buf.position(pos);
- }
- }
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
int pos = buf.position();
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -300,30 +264,19 @@ public class GTRecord implements Comparable<GTRecord> {
}
}
- /** similar to export(primaryKey) but will stop at the first null value */
- public static ByteArray exportScanKey(GTRecord rec) {
- if (rec == null)
- return null;
-
- GTInfo info = rec.getInfo();
-
- BitSet selectedColumns = new BitSet();
- int len = 0;
- for (int i = 0; i < info.primaryKey.trueBitCount(); i++) {
- int c = info.primaryKey.trueBitAt(i);
- if (rec.cols[c].array() == null) {
- break;
- }
- selectedColumns.set(c);
- len += rec.cols[c].length();
+ /** change pointers to point to data in given buffer, UNLIKE deserialize.
+ * Unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this
+ * method lets the caller specify the exact columns (in order) to load.
+ */
+ public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) {
+ int pos = buf.position();
+ for (int i = 0; i < selectedCols.size(); i++) {
+ int c = selectedCols.get(i);
+ int len = info.codeSystem.codeLength(c, buf);
+ cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
+ pos += len;
+ buf.position(pos);
}
-
- if (selectedColumns.cardinality() == 0)
- return null;
-
- ByteArray buf = ByteArray.allocate(len);
- rec.exportColumns(new ImmutableBitSet(selectedColumns), buf);
- return buf;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
index 197fde4..eefe88e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -7,42 +7,26 @@ public class GTScanRange {
final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
- final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
+ final public List<GTRecord> fuzzyKeys; // partial matching primary keys
public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
this(pkStart, pkEnd, null);
}
- public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
+ public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys) {
GTInfo info = pkStart.info;
assert info == pkEnd.info;
- validateRangeKey(pkStart);
- validateRangeKey(pkEnd);
-
this.pkStart = pkStart;
this.pkEnd = pkEnd;
- this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? Collections.<GTRecord> emptyList() : hbaseFuzzyKeys;
- }
-
- private void validateRangeKey(GTRecord pk) {
- pk.maskForEqualHashComp(pk.info.primaryKey);
- boolean afterNull = false;
- for (int i = 0; i < pk.info.primaryKey.trueBitCount(); i++) {
- int c = pk.info.primaryKey.trueBitAt(i);
- if (afterNull) {
- pk.cols[c].set(null, 0, 0);
- } else {
- afterNull = pk.cols[c].array() == null;
- }
- }
+ this.fuzzyKeys = fuzzyKeys == null ? Collections.<GTRecord> emptyList() : fuzzyKeys;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((hbaseFuzzyKeys == null) ? 0 : hbaseFuzzyKeys.hashCode());
+ result = prime * result + ((fuzzyKeys == null) ? 0 : fuzzyKeys.hashCode());
result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode());
result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode());
return result;
@@ -57,10 +41,10 @@ public class GTScanRange {
if (getClass() != obj.getClass())
return false;
GTScanRange other = (GTScanRange) obj;
- if (hbaseFuzzyKeys == null) {
- if (other.hbaseFuzzyKeys != null)
+ if (fuzzyKeys == null) {
+ if (other.fuzzyKeys != null)
return false;
- } else if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys))
+ } else if (!fuzzyKeys.equals(other.fuzzyKeys))
return false;
if (pkEnd == null) {
if (other.pkEnd != null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index c09ecf0..d860090 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -1,6 +1,7 @@
package org.apache.kylin.gridtable;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -11,35 +12,50 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.common.FuzzyValueCombination;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class GTScanRangePlanner {
+ private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class);
+
private static final int MAX_HBASE_FUZZY_KEYS = 100;
final private GTInfo info;
- final private ComparatorEx<ByteArray> byteUnknownIsSmaller;
- final private ComparatorEx<ByteArray> byteUnknownIsBigger;
- final private ComparatorEx<GTRecord> recordUnknownIsSmaller;
- final private ComparatorEx<GTRecord> recordUnknownIsBigger;
+ final private Pair<ByteArray, ByteArray> segmentStartAndEnd;
+ final private TblColRef partitionColRef;
+
+ final private RecordComparator rangeStartComparator;
+ final private RecordComparator rangeEndComparator;
+ final private RecordComparator rangeStartEndComparator;
- public GTScanRangePlanner(GTInfo info) {
+ public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) {
this.info = info;
+ this.segmentStartAndEnd = segmentStartAndEnd;
+ this.partitionColRef = partitionColRef;
IGTComparator comp = info.codeSystem.getComparator();
- this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp);
- this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp);
- this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp);
- this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(comp);
+
+ //start key GTRecord compare to start key GTRecord
+ this.rangeStartComparator = getRangeStartComparator(comp);
+ //stop key GTRecord compare to stop key GTRecord
+ this.rangeEndComparator = getRangeEndComparator(comp);
+ //start key GTRecord compare to stop key GTRecord
+ this.rangeStartEndComparator = getRangeStartEndComparator(comp);
}
// return empty list meaning filter is always false
@@ -57,7 +73,8 @@ public class GTScanRangePlanner {
List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
GTScanRange scanRange = newScanRange(andDimRanges);
- scanRanges.add(scanRange);
+ if (scanRange != null)
+ scanRanges.add(scanRange);
}
List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
@@ -69,28 +86,64 @@ public class GTScanRangePlanner {
private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
GTRecord pkStart = new GTRecord(info);
GTRecord pkEnd = new GTRecord(info);
- List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList();
+ Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
+
+ List<GTRecord> fuzzyKeys;
for (ColumnRange range : andDimRanges) {
+
+ if (partitionColRef != null && range.column.equals(partitionColRef)) {
+
+ if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
+ && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0) {
+ //segment range is [Closed,Open)
+ } else {
+ return null;
+ }
+ }
+
int col = range.column.getColumnDesc().getZeroBasedIndex();
- if (info.primaryKey.get(col) == false)
+ if (!info.primaryKey.get(col))
continue;
pkStart.set(col, range.begin);
pkEnd.set(col, range.end);
- if (range.equals != null) {
- ImmutableBitSet fuzzyMask = new ImmutableBitSet(col);
- for (ByteArray v : range.equals) {
- GTRecord fuzzy = new GTRecord(info);
- fuzzy.set(col, v);
- fuzzy.maskForEqualHashComp(fuzzyMask);
- hbaseFuzzyKeys.add(fuzzy);
- }
+ if (range.valueSet != null && !range.valueSet.isEmpty()) {
+ fuzzyValues.put(col, range.valueSet);
}
}
- return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys);
+ fuzzyKeys = buildFuzzyKeys(fuzzyValues);
+
+ return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
+ }
+
+ private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) {
+ ArrayList<GTRecord> result = Lists.newArrayList();
+
+ if (fuzzyValueSet.isEmpty())
+ return result;
+
+ // debug/profiling purpose
+ if (BackdoorToggles.getDisableFuzzyKey()) {
+ logger.info("The execution of this query will not use fuzzy key");
+ return result;
+ }
+
+ List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS);
+
+ for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
+ GTRecord fuzzy = new GTRecord(info);
+ BitSet bitSet = new BitSet(info.getColumnCount());
+ for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
+ bitSet.set(entry.getKey());
+ fuzzy.set(entry.getKey(), entry.getValue());
+ }
+ fuzzy.maskForEqualHashComp(new ImmutableBitSet(bitSet));
+ result.add(fuzzy);
+ }
+ return result;
}
private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
@@ -194,7 +247,7 @@ public class GTScanRangePlanner {
Collections.sort(ranges, new Comparator<GTScanRange>() {
@Override
public int compare(GTScanRange a, GTScanRange b) {
- return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart);
+ return rangeStartComparator.compare(a.pkStart, b.pkStart);
}
});
@@ -202,13 +255,12 @@ public class GTScanRangePlanner {
List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
int mergeBeginIndex = 0;
GTRecord mergeEnd = ranges.get(0).pkEnd;
- for (int index = 0; index < ranges.size(); index++) {
+ for (int index = 1; index < ranges.size(); index++) {
GTScanRange range = ranges.get(index);
// if overlap, swallow it
- if (recordUnknownIsSmaller.min(range.pkStart, mergeEnd) == range.pkStart //
- || recordUnknownIsBigger.max(mergeEnd, range.pkStart) == mergeEnd) {
- mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+ if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) {
+ mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd);
continue;
}
@@ -218,7 +270,7 @@ public class GTScanRangePlanner {
// start new split
mergeBeginIndex = index;
- mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+ mergeEnd = range.pkEnd;
}
// don't miss the last range
@@ -239,9 +291,9 @@ public class GTScanRangePlanner {
boolean hasNonFuzzyRange = false;
for (GTScanRange range : ranges) {
- hasNonFuzzyRange = hasNonFuzzyRange || range.hbaseFuzzyKeys.isEmpty();
- newFuzzyKeys.addAll(range.hbaseFuzzyKeys);
- end = recordUnknownIsBigger.max(end, range.pkEnd);
+ hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty();
+ newFuzzyKeys.addAll(range.fuzzyKeys);
+ end = rangeEndComparator.max(end, range.pkEnd);
}
// if any range is non-fuzzy, then all fuzzy keys must be cleared
@@ -269,7 +321,7 @@ public class GTScanRangePlanner {
private TblColRef column;
private ByteArray begin = ByteArray.EMPTY;
private ByteArray end = ByteArray.EMPTY;
- private Set<ByteArray> equals;
+ private Set<ByteArray> valueSet;
public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
this.column = column;
@@ -277,16 +329,16 @@ public class GTScanRangePlanner {
switch (op) {
case EQ:
case IN:
- equals = new HashSet<ByteArray>(values);
+ valueSet = new HashSet<ByteArray>(values);
refreshBeginEndFromEquals();
break;
case LT:
case LTE:
- end = byteUnknownIsBigger.max(values);
+ end = rangeEndComparator.comparator.max(values);
break;
case GT:
case GTE:
- begin = byteUnknownIsSmaller.min(values);
+ begin = rangeStartComparator.comparator.min(values);
break;
case NEQ:
case NOTIN:
@@ -303,16 +355,16 @@ public class GTScanRangePlanner {
this.column = column;
this.begin = beginValue;
this.end = endValue;
- this.equals = equalValues;
+ this.valueSet = equalValues;
}
private void refreshBeginEndFromEquals() {
- if (equals.isEmpty()) {
+ if (valueSet.isEmpty()) {
begin = ByteArray.EMPTY;
end = ByteArray.EMPTY;
} else {
- begin = byteUnknownIsSmaller.min(equals);
- end = byteUnknownIsBigger.max(equals);
+ begin = rangeStartComparator.comparator.min(valueSet);
+ end = rangeEndComparator.comparator.max(valueSet);
}
}
@@ -321,8 +373,8 @@ public class GTScanRangePlanner {
}
public boolean satisfyNone() {
- if (equals != null) {
- return equals.isEmpty();
+ if (valueSet != null) {
+ return valueSet.isEmpty();
} else if (begin.array() != null && end.array() != null) {
return info.codeSystem.getComparator().compare(begin, end) > 0;
} else {
@@ -338,36 +390,36 @@ public class GTScanRangePlanner {
}
if (this.satisfyAll()) {
- copy(another.column, another.begin, another.end, another.equals);
+ copy(another.column, another.begin, another.end, another.valueSet);
return;
}
- if (this.equals != null && another.equals != null) {
- this.equals.retainAll(another.equals);
+ if (this.valueSet != null && another.valueSet != null) {
+ this.valueSet.retainAll(another.valueSet);
refreshBeginEndFromEquals();
return;
}
- if (this.equals != null) {
- this.equals = filter(this.equals, another.begin, another.end);
+ if (this.valueSet != null) {
+ this.valueSet = filter(this.valueSet, another.begin, another.end);
refreshBeginEndFromEquals();
return;
}
- if (another.equals != null) {
- this.equals = filter(another.equals, this.begin, this.end);
+ if (another.valueSet != null) {
+ this.valueSet = filter(another.valueSet, this.begin, this.end);
refreshBeginEndFromEquals();
return;
}
- this.begin = byteUnknownIsSmaller.max(this.begin, another.begin);
- this.end = byteUnknownIsBigger.min(this.end, another.end);
+ this.begin = rangeStartComparator.comparator.max(this.begin, another.begin);
+ this.end = rangeEndComparator.comparator.min(this.end, another.end);
}
private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) {
Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size());
for (ByteArray v : equalValues) {
- if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) {
+ if (rangeStartEndComparator.comparator.compare(beginValue, v) <= 0 && rangeStartEndComparator.comparator.compare(v, endValue) <= 0) {
result.add(v);
}
}
@@ -375,10 +427,10 @@ public class GTScanRangePlanner {
}
public String toString() {
- if (equals == null) {
+ if (valueSet == null) {
return column.getName() + " between " + begin + " and " + end;
} else {
- return column.getName() + " in " + equals;
+ return column.getName() + " in " + valueSet;
}
}
}
@@ -424,40 +476,55 @@ public class GTScanRangePlanner {
}
}
- public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IGTComparator comp) {
- return new ComparatorEx<ByteArray>() {
+ public static RecordComparator getRangeStartComparator(final IGTComparator comp) {
+ return new RecordComparator(new ComparatorEx<ByteArray>() {
@Override
public int compare(ByteArray a, ByteArray b) {
- if (a.array() == null)
- return -1;
- else if (b.array() == null)
+ if (a.array() == null) {
+ if (b.array() == null) {
+ return 0;
+ } else {
+ return -1;
+ }
+ } else if (b.array() == null) {
return 1;
- else
+ } else {
return comp.compare(a, b);
+ }
}
- };
+ });
}
- public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IGTComparator comp) {
- return new ComparatorEx<ByteArray>() {
+ public static RecordComparator getRangeEndComparator(final IGTComparator comp) {
+ return new RecordComparator(new ComparatorEx<ByteArray>() {
@Override
public int compare(ByteArray a, ByteArray b) {
- if (a.array() == null)
- return 1;
- else if (b.array() == null)
+ if (a.array() == null) {
+ if (b.array() == null) {
+ return 0;
+ } else {
+ return 1;
+ }
+ } else if (b.array() == null) {
return -1;
- else
+ } else {
return comp.compare(a, b);
+ }
}
- };
- }
-
- public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IGTComparator comp) {
- return new RecordComparator(byteComparatorTreatsUnknownSmaller(comp));
+ });
}
- public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IGTComparator comp) {
- return new RecordComparator(byteComparatorTreatsUnknownBigger(comp));
+ public static RecordComparator getRangeStartEndComparator(final IGTComparator comp) {
+ return new AsymmetricRecordComparator(new ComparatorEx<ByteArray>() {
+ @Override
+ public int compare(ByteArray a, ByteArray b) {
+ if (a.array() == null || b.array() == null) {
+ return -1;
+ } else {
+ return comp.compare(a, b);
+ }
+ }
+ });
}
private static class RecordComparator extends ComparatorEx<GTRecord> {
@@ -473,7 +540,7 @@ public class GTScanRangePlanner {
assert a.maskForEqualHashComp() == b.maskForEqualHashComp();
ImmutableBitSet mask = a.maskForEqualHashComp();
- int comp = 0;
+ int comp;
for (int i = 0; i < mask.trueBitCount(); i++) {
int c = mask.trueBitAt(i);
comp = comparator.compare(a.cols[c], b.cols[c]);
@@ -483,4 +550,35 @@ public class GTScanRangePlanner {
return 0; // equals
}
}
+
+ /**
+ * asymmetric means compare(a,b) > 0 does not imply compare(b,a) < 0,
+ * so the min, max and between functions are not supported
+ */
+ private static class AsymmetricRecordComparator extends RecordComparator {
+
+ AsymmetricRecordComparator(ComparatorEx<ByteArray> byteComparator) {
+ super(byteComparator);
+ }
+
+ public GTRecord min(Collection<GTRecord> v) {
+ throw new UnsupportedOperationException();
+ }
+
+ public GTRecord max(Collection<GTRecord> v) {
+ throw new UnsupportedOperationException();
+ }
+
+ public GTRecord min(GTRecord a, GTRecord b) {
+ throw new UnsupportedOperationException();
+ }
+
+ public GTRecord max(GTRecord a, GTRecord b) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean between(GTRecord v, GTRecord start, GTRecord end) {
+ throw new UnsupportedOperationException();
+ }
+ }
}