You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:11 UTC
[03/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
new file mode 100644
index 0000000..97fe4ac
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionMapper;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * MRUnit-driven unit tests for {@link RangeKeyDistributionMapper}: queue records on a MapDriver, run it, check the single emitted pair.
+ * @author ysong1
+ */
+public class RangeKeyDistributionMapperTest {
+
+ @SuppressWarnings("rawtypes")
+ MapDriver mapDriver; // raw-typed MRUnit driver wrapping the mapper under test; rebuilt in setUp()
+ String localTempDir = System.getProperty("java.io.tmpdir") + File.separator; // not referenced by any test in this class
+
+ @Before
+ public void setUp() { // JUnit runs this before each test, so every test gets a fresh mapper and driver
+ RangeKeyDistributionMapper mapper = new RangeKeyDistributionMapper();
+ mapDriver = MapDriver.newMapDriver(mapper);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMapperWithoutHeader() throws IOException {
+ // Seven 18-byte keys in ascending byte order; every record carries the same 3-byte value "abc".
+ Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey3 = new Text(new byte[] { 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey4 = new Text(new byte[] { 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey5 = new Text(new byte[] { 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey6 = new Text(new byte[] { 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey7 = new Text(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+
+ mapDriver.addInput(inputKey1, new Text("abc"));
+ mapDriver.addInput(inputKey2, new Text("abc"));
+ mapDriver.addInput(inputKey3, new Text("abc"));
+ mapDriver.addInput(inputKey4, new Text("abc"));
+ mapDriver.addInput(inputKey5, new Text("abc"));
+ mapDriver.addInput(inputKey6, new Text("abc"));
+ mapDriver.addInput(inputKey7, new Text("abc"));
+ // Run the mapper over the queued inputs and collect the pairs it emitted.
+ List<Pair<Text, LongWritable>> result = mapDriver.run();
+ // The seven inputs are expected to collapse into exactly one output pair.
+ assertEquals(1, result.size());
+ // Expected key equals inputKey7, the last key queued.
+ byte[] key1 = result.get(0).getFirst().getBytes();
+ LongWritable value1 = result.get(0).getSecond();
+ assertArrayEquals(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
+ assertEquals(147, value1.get()); // 147 == 7 records x (18 key bytes + 3 value bytes); presumably a total byte count - confirm against the mapper
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMapperWithHeader() throws IOException {
+ // Seven 36-byte keys: each is inputKey1 of testMapperWithoutHeader (18 bytes) followed by one of that test's seven keys.
+ Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey3 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey4 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey5 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey6 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+ Text inputKey7 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+
+ mapDriver.addInput(inputKey1, new Text("abc"));
+ mapDriver.addInput(inputKey2, new Text("abc"));
+ mapDriver.addInput(inputKey3, new Text("abc"));
+ mapDriver.addInput(inputKey4, new Text("abc"));
+ mapDriver.addInput(inputKey5, new Text("abc"));
+ mapDriver.addInput(inputKey6, new Text("abc"));
+ mapDriver.addInput(inputKey7, new Text("abc"));
+ // Run the mapper over the queued inputs and collect the pairs it emitted.
+ List<Pair<Text, LongWritable>> result = mapDriver.run();
+ // Again a single output pair is expected for the seven inputs.
+ assertEquals(1, result.size());
+ // Expected key equals inputKey7, the last key queued.
+ byte[] key1 = result.get(0).getFirst().getBytes();
+ LongWritable value1 = result.get(0).getSecond();
+ assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
+ assertEquals(273, value1.get()); // 273 == 7 records x (36 key bytes + 3 value bytes); presumably a total byte count - confirm against the mapper
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
new file mode 100644
index 0000000..cbf0657
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Placeholder MRUnit test for {@link RangeKeyDistributionReducer}: the driver is wired up, but no behaviour is asserted yet.
+ * @author ysong1
+ */
+public class RangeKeyDistributionReducerTest {
+
+ ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver; // MRUnit driver wrapping the reducer under test; rebuilt in setUp()
+ String localTempDir = System.getProperty("java.io.tmpdir") + File.separator; // not referenced by any test in this class
+
+ @Before
+ public void setUp() { // JUnit runs this before each test, so every test gets a fresh reducer and driver
+ RangeKeyDistributionReducer reducer = new RangeKeyDistributionReducer();
+ reduceDriver = ReduceDriver.newReduceDriver(reducer);
+ }
+
+ @Test
+ public void testReducer() throws IOException {
+ // TODO: nothing is fed to reduceDriver and nothing is asserted, so this test currently passes vacuously.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/pom.xml
----------------------------------------------------------------------
diff --git a/storage/pom.xml b/storage/pom.xml
index e557363..efc1a4b 100644
--- a/storage/pom.xml
+++ b/storage/pom.xml
@@ -40,11 +40,6 @@
<artifactId>kylin-storage-hbase</artifactId>
<version>${project.parent.version}</version>
</dependency>
- <dependency>
- <groupId>net.sf.ehcache</groupId>
- <artifactId>ehcache</artifactId>
- <version>2.8.1</version>
- </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
deleted file mode 100644
index 179202e..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.kylin.storage;
-
-import com.google.common.collect.Range;
-
-/**
- */
-public interface ICachableStorageQuery extends IStorageQuery {
- /**
- *
- * being dynamic => getVolatilePeriod() return not null
- * being dynamic => partition column of its realization not null
- *
- * @return true for static storage like cubes
- * false for dynamic storage like II
- */
- boolean isDynamic();
-
- /**
- * volatile period is the period of time in which the returned data is not stable
- * e.g. inverted index's last several minutes' data is dynamic as time goes by.
- * data in this period cannot be cached
- *
- * This method should not be called before ITupleIterator.close() is called
- *
- * @return null if the underlying storage guarantees the data is static
- */
- Range<Long> getVolatilePeriod();
-
- /**
- * get the uuid for the realization that serves this query
- */
- String getStorageUUID();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorage.java b/storage/src/main/java/org/apache/kylin/storage/IStorage.java
deleted file mode 100644
index 89b96e9..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/IStorage.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.IRealization;
-
-public interface IStorage {
-
- public IStorageQuery createStorageQuery(IRealization realization);
-
- public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
deleted file mode 100644
index f090ebb..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-
-/**
- *
- * @author xjiang
- *
- */
-public interface IStorageQuery {
-
- ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
deleted file mode 100644
index caa2439..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
-/**
- * @author xjiang
- */
-public class StorageContext {
-
- public static final int HARD_THRESHOLD = 4000000;
- public static final int DEFAULT_THRESHOLD = 1000000;
-
- public enum OrderEnum {
- ASCENDING, DESCENDING
- }
-
- private String connUrl;
- private int threshold;
- private int limit;
- private boolean hasSort;
- private List<MeasureDesc> sortMeasures;
- private List<OrderEnum> sortOrders;
- private boolean acceptPartialResult;
-
- private boolean exactAggregation;
- private boolean enableLimit;
- private boolean enableCoprocessor;
-
- private long totalScanCount;
- private Cuboid cuboid;
- private boolean partialResultReturned;
-
- public StorageContext() {
- this.threshold = DEFAULT_THRESHOLD;
- this.limit = DEFAULT_THRESHOLD;
- this.totalScanCount = 0;
- this.cuboid = null;
- this.hasSort = false;
- this.sortOrders = new ArrayList<OrderEnum>();
- this.sortMeasures = new ArrayList<MeasureDesc>();
-
- this.exactAggregation = false;
- this.enableLimit = false;
- this.enableCoprocessor = false;
-
- this.acceptPartialResult = false;
- this.partialResultReturned = false;
- }
-
- public String getConnUrl() {
- return connUrl;
- }
-
- public void setConnUrl(String connUrl) {
- this.connUrl = connUrl;
- }
-
- public int getThreshold() {
- return threshold;
- }
-
- public void setThreshold(int t) {
- threshold = Math.min(t, HARD_THRESHOLD);
- }
-
- public int getLimit() {
- return limit;
- }
-
- public void setLimit(int l) {
- this.limit = l;
- }
-
- public void enableLimit() {
- this.enableLimit = true;
- }
-
- public boolean isLimitEnabled() {
- return this.enableLimit;
- }
-
- public void addSort(MeasureDesc measure, OrderEnum order) {
- if (measure != null) {
- sortMeasures.add(measure);
- sortOrders.add(order);
- }
- }
-
- public void markSort() {
- this.hasSort = true;
- }
-
- public boolean hasSort() {
- return this.hasSort;
- }
-
- public void setCuboid(Cuboid c) {
- cuboid = c;
- }
-
- public Cuboid getCuboid() {
- return cuboid;
- }
-
- public long getTotalScanCount() {
- return totalScanCount;
- }
-
- public void setTotalScanCount(long totalScanCount) {
- this.totalScanCount = totalScanCount;
- }
-
- public boolean isAcceptPartialResult() {
- return acceptPartialResult;
- }
-
- public void setAcceptPartialResult(boolean acceptPartialResult) {
- this.acceptPartialResult = acceptPartialResult;
- }
-
- public boolean isPartialResultReturned() {
- return partialResultReturned;
- }
-
- public void setPartialResultReturned(boolean partialResultReturned) {
- this.partialResultReturned = partialResultReturned;
- }
-
- public void setExactAggregation(boolean isExactAggregation) {
- this.exactAggregation = isExactAggregation;
- }
-
- public boolean isExactAggregation() {
- return this.exactAggregation;
- }
-
- public void enableCoprocessor() {
- this.enableCoprocessor = true;
- }
-
- public boolean isCoprocessorEnabled() {
- return this.enableCoprocessor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
deleted file mode 100644
index 2e07c24..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ /dev/null
@@ -1,287 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.IGTCodeSystem;
-import org.apache.kylin.storage.gridtable.IGTComparator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by shaoshi on 3/23/15.
- * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check
- * its data type to serialize/deserialize it;
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class CubeCodeSystem implements IGTCodeSystem {
- private static final Logger logger = LoggerFactory.getLogger(CubeCodeSystem.class);
-
- // ============================================================================
-
- private GTInfo info;
- private Map<Integer, Dictionary> dictionaryMap; // column index ==> dictionary of column
- private Map<Integer, Integer> fixLenMap; // column index ==> fixed length of column
- private Map<Integer, Integer> dependentMetricsMap;
- private IGTComparator comparator;
- private DataTypeSerializer[] serializers;
-
- public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap) {
- this(dictionaryMap, Collections.<Integer, Integer>emptyMap(), Collections.<Integer, Integer>emptyMap());
- }
-
- public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap, Map<Integer, Integer> fixLenMap, Map<Integer, Integer> dependentMetricsMap) {
- this.dictionaryMap = dictionaryMap;
- this.fixLenMap = fixLenMap;
- this.dependentMetricsMap = dependentMetricsMap;
- }
-
- @Override
- public void init(GTInfo info) {
- this.info = info;
-
- serializers = new DataTypeSerializer[info.getColumnCount()];
- for (int i = 0; i < info.getColumnCount(); i++) {
- // dimension with dictionary
- if (dictionaryMap.get(i) != null) {
- serializers[i] = new DictionarySerializer(dictionaryMap.get(i));
- }
- // dimension of fixed length
- else if (fixLenMap.get(i) != null) {
- serializers[i] = new FixLenSerializer(fixLenMap.get(i));
- }
- // metrics
- else {
- serializers[i] = DataTypeSerializer.create(info.getColumnType(i));
- }
- }
-
- this.comparator = new IGTComparator() {
- @Override
- public boolean isNull(ByteArray code) {
- // all 0xff is null
- byte[] array = code.array();
- for (int i = 0, j = code.offset(), n = code.length(); i < n; i++, j++) {
- if (array[j] != Dictionary.NULL)
- return false;
- }
- return true;
- }
-
- @Override
- public int compare(ByteArray code1, ByteArray code2) {
- return code1.compareTo(code2);
- }
- };
- }
-
- @Override
- public IGTComparator getComparator() {
- return comparator;
- }
-
- @Override
- public int codeLength(int col, ByteBuffer buf) {
- return serializers[col].peekLength(buf);
- }
-
- @Override
- public int maxCodeLength(int col) {
- return serializers[col].maxLength();
- }
-
- @Override
- public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
- encodeColumnValue(col, value, 0, buf);
- }
-
- @Override
- public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
- // this is a bit too complicated, but encoding only happens once at build time, so it is OK
- DataTypeSerializer serializer = serializers[col];
- try {
- if (serializer instanceof DictionarySerializer) {
- ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
- } else {
- serializer.serialize(value, buf);
- }
- } catch (ClassCastException ex) {
- // try convert string into a correct object type
- try {
- if (value instanceof String) {
- Object converted = serializer.valueOf((String) value);
- if ((converted instanceof String) == false) {
- encodeColumnValue(col, converted, roundingFlag, buf);
- return;
- }
- }
- } catch (Throwable e) {
- logger.error("Fail to encode value '" + value + "'", e);
- }
- throw ex;
- }
- }
-
- @Override
- public Object decodeColumnValue(int col, ByteBuffer buf) {
- return serializers[col].deserialize(buf);
- }
-
- @Override
- public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
- assert columns.trueBitCount() == aggrFunctions.length;
-
- MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
- for (int i = 0; i < result.length; i++) {
- int col = columns.trueBitAt(i);
- result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
- }
-
- // deal with holistic distinct count
- if (dependentMetricsMap != null) {
- for (Integer child : dependentMetricsMap.keySet()) {
- if (columns.get(child)) {
- Integer parent = dependentMetricsMap.get(child);
- if (columns.get(parent) == false)
- throw new IllegalStateException();
-
- int childIdx = columns.trueBitIndexOf(child);
- int parentIdx = columns.trueBitIndexOf(parent);
- result[childIdx].setDependentAggregator(result[parentIdx]);
- }
- }
- }
-
- return result;
- }
-
- static class DictionarySerializer extends DataTypeSerializer {
- private Dictionary dictionary;
-
- DictionarySerializer(Dictionary dictionary) {
- this.dictionary = dictionary;
- }
-
- public void serializeWithRounding(Object value, int roundingFlag, ByteBuffer buf) {
- int id = dictionary.getIdFromValue(value, roundingFlag);
- BytesUtil.writeUnsigned(id, dictionary.getSizeOfId(), buf);
- }
-
- @Override
- public void serialize(Object value, ByteBuffer buf) {
- int id = dictionary.getIdFromValue(value);
- BytesUtil.writeUnsigned(id, dictionary.getSizeOfId(), buf);
- }
-
- @Override
- public Object deserialize(ByteBuffer in) {
- int id = BytesUtil.readUnsigned(in, dictionary.getSizeOfId());
- return dictionary.getValueFromId(id);
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- return dictionary.getSizeOfId();
- }
-
- @Override
- public int maxLength() {
- return dictionary.getSizeOfId();
- }
-
- @Override
- public Object valueOf(byte[] value) {
- throw new UnsupportedOperationException();
- }
- }
-
- static class FixLenSerializer extends DataTypeSerializer {
-
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
-
- private int fixLen;
-
- FixLenSerializer(int fixLen) {
- this.fixLen = fixLen;
- }
-
- private byte[] currentBuf() {
- byte[] buf = current.get();
- if (buf == null) {
- buf = new byte[fixLen];
- current.set(buf);
- }
- return buf;
- }
-
- @Override
- public void serialize(Object value, ByteBuffer out) {
- byte[] buf = currentBuf();
- if (value == null) {
- Arrays.fill(buf, Dictionary.NULL);
- out.put(buf);
- } else {
- byte[] bytes = Bytes.toBytes(value.toString());
- if (bytes.length > fixLen) {
- throw new IllegalStateException("Expect at most " + fixLen + " bytes, but got " + bytes.length + ", value string: " + value.toString());
- }
- out.put(bytes);
- for (int i = bytes.length; i < fixLen; i++) {
- out.put(RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
- }
- }
- }
-
- @Override
- public Object deserialize(ByteBuffer in) {
- byte[] buf = currentBuf();
- in.get(buf);
-
- int tail = fixLen;
- while (tail > 0 && (buf[tail - 1] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE || buf[tail - 1] == Dictionary.NULL)) {
- tail--;
- }
-
- if (tail == 0) {
- return buf[0] == Dictionary.NULL ? null : "";
- }
-
- return Bytes.toString(buf, 0, tail);
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- return fixLen;
- }
-
- @Override
- public int maxLength() {
- return fixLen;
- }
-
- @Override
- public Object valueOf(byte[] value) {
- try {
- return new String(value, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- // does not happen
- throw new RuntimeException(e);
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
deleted file mode 100644
index f792402..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTInfo;
-
-import com.google.common.collect.Maps;
-
-@SuppressWarnings("rawtypes")
-public class CubeGridTable {
-
- public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
- CubeDesc cubeDesc = cubeSeg.getCubeDesc();
- CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-
- // build a dictionary map
- Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
- List<TblColRef> dimCols = Cuboid.findById(cubeDesc, cuboidId).getColumns();
- for (TblColRef col : dimCols) {
- Dictionary<?> dictionary = cubeMgr.getDictionary(cubeSeg, col);
- if (dictionary != null) {
- dictionaryMap.put(col, dictionary);
- }
- }
- return dictionaryMap;
- }
-
- public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
- Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
- return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap);
- }
-
- public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) {
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
- CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
-
- Map<Integer, Dictionary> dictionaryByColIdx = Maps.newHashMap();
- Map<Integer, Integer> fixLenByColIdx = Maps.newHashMap();
-
- for (TblColRef dim : cuboid.getColumns()) {
- int colIndex = mapping.getIndexOf(dim);
- if (cubeDesc.getRowkey().isUseDictionary(dim)) {
- Dictionary dict = dictionaryMap.get(dim);
- dictionaryByColIdx.put(colIndex, dict);
- } else {
- int len = cubeDesc.getRowkey().getColumnLength(dim);
- if (len == 0)
- throw new IllegalStateException();
-
- fixLenByColIdx.put(colIndex, len);
- }
- }
-
- GTInfo.Builder builder = GTInfo.builder();
- builder.setTableName("Cuboid " + cuboidId);
- builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx, mapping.getDependentMetricsMap()));
- builder.setColumns(mapping.getDataTypes());
- builder.setPrimaryKey(mapping.getPrimaryKey());
- builder.enableColumnBlock(mapping.getColumnBlocks());
- return builder.build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index e4b8657..8001fbd 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -21,12 +21,12 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTRowBlock.Writer;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.gridtable.GTRowBlock.Writer;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index 038df9d..6af83d1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -11,17 +11,19 @@ import java.util.Set;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRawScanner;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRawScanner;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRange;
-import org.apache.kylin.storage.gridtable.GTScanRangePlanner;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GTUtil;
-import org.apache.kylin.storage.gridtable.IGTScanner;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
index 7e64975..0657a69 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
@@ -29,10 +29,10 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.tuple.Tuple;
import org.apache.kylin.storage.tuple.TupleInfo;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
deleted file mode 100644
index 0bf4573..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class CuboidToGridTableMapping {
-
- final private Cuboid cuboid;
-
- private List<DataType> gtDataTypes;
- private List<ImmutableBitSet> gtColBlocks;
-
- private int nDimensions;
- private Map<TblColRef, Integer> dim2gt;
- private ImmutableBitSet gtPrimaryKey;
-
- private int nMetrics;
- private ListMultimap<FunctionDesc, Integer> metrics2gt; // because count distinct may have a holistic version
-
- public CuboidToGridTableMapping(Cuboid cuboid) {
- this.cuboid = cuboid;
- init();
- }
-
- private void init() {
- int gtColIdx = 0;
- gtDataTypes = Lists.newArrayList();
- gtColBlocks = Lists.newArrayList();
-
- // dimensions
- dim2gt = Maps.newHashMap();
- BitSet pk = new BitSet();
- for (TblColRef dimension : cuboid.getColumns()) {
- gtDataTypes.add(dimension.getType());
- dim2gt.put(dimension, gtColIdx);
- pk.set(gtColIdx);
- gtColIdx++;
- }
- gtPrimaryKey = new ImmutableBitSet(pk);
- gtColBlocks.add(gtPrimaryKey);
-
- nDimensions = gtColIdx;
- assert nDimensions == cuboid.getColumns().size();
-
- // metrics
- metrics2gt = LinkedListMultimap.create();
- for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- BitSet colBlock = new BitSet();
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- // Count distinct & holistic count distinct are equals() but different.
- // Ensure the holistic version if exists is always the first.
- FunctionDesc func = measure.getFunction();
- if (func.isHolisticCountDistinct()) {
- List<Integer> existing = metrics2gt.removeAll(func);
- metrics2gt.put(func, gtColIdx);
- metrics2gt.putAll(func, existing);
- } else {
- metrics2gt.put(func, gtColIdx);
- }
- gtDataTypes.add(func.getReturnDataType());
- colBlock.set(gtColIdx);
- gtColIdx++;
- }
- gtColBlocks.add(new ImmutableBitSet(colBlock));
- }
- }
- nMetrics = gtColIdx - nDimensions;
- assert nMetrics == cuboid.getCube().getMeasures().size();
- }
-
- public int getColumnCount() {
- return nDimensions + nMetrics;
- }
-
- public int getDimensionCount() {
- return nDimensions;
- }
-
- public int getMetricsCount() {
- return nMetrics;
- }
-
- public DataType[] getDataTypes() {
- return (DataType[]) gtDataTypes.toArray(new DataType[gtDataTypes.size()]);
- }
-
- public ImmutableBitSet getPrimaryKey() {
- return gtPrimaryKey;
- }
-
- public ImmutableBitSet[] getColumnBlocks() {
- return (ImmutableBitSet[]) gtColBlocks.toArray(new ImmutableBitSet[gtColBlocks.size()]);
- }
-
- public int getIndexOf(TblColRef dimension) {
- Integer i = dim2gt.get(dimension);
- return i == null ? -1 : i.intValue();
- }
-
- public int getIndexOf(FunctionDesc metric) {
- List<Integer> list = metrics2gt.get(metric);
- // normal case
- if (list.size() == 1) {
- return list.get(0);
- }
- // count distinct & its holistic version
- else if (list.size() == 2) {
- assert metric.isCountDistinct();
- return metric.isHolisticCountDistinct() ? list.get(0) : list.get(1);
- }
- // unexpected
- else
- return -1;
- }
-
- public List<TblColRef> getCuboidDimensionsInGTOrder() {
- return cuboid.getColumns();
- }
-
- public Map<Integer, Integer> getDependentMetricsMap() {
- Map<Integer, Integer> result = Maps.newHashMap();
- List<MeasureDesc> measures = cuboid.getCube().getMeasures();
- for (MeasureDesc child : measures) {
- if (child.getDependentMeasureRef() != null) {
- boolean ok = false;
- for (MeasureDesc parent : measures) {
- if (parent.getName().equals(child.getDependentMeasureRef())) {
- int childIndex = getIndexOf(child.getFunction());
- int parentIndex = getIndexOf(parent.getFunction());
- result.put(childIndex, parentIndex);
- ok = true;
- break;
- }
- }
- if (!ok)
- throw new IllegalStateException("Cannot find dependent measure: " + child.getDependentMeasureRef());
- }
- }
- return result.isEmpty() ? Collections.<Integer, Integer>emptyMap() : result;
- }
-
- public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) {
- MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()];
- int i = 0;
- for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc m : hbaseColDesc.getMeasures()) {
- result[i++] = m;
- }
- }
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
index 10ff129..366e3c3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
@@ -7,11 +7,11 @@ import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.tuple.Tuple;
import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
deleted file mode 100644
index 25b217c..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.measure.HLLCAggregator;
-import org.apache.kylin.metadata.measure.LDCAggregator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class GTAggregateScanner implements IGTScanner {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
-
- final GTInfo info;
- final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
- final ImmutableBitSet groupBy;
- final ImmutableBitSet metrics;
- final String[] metricsAggrFuncs;
- final IGTScanner inputScanner;
- final AggregationCache aggrCache;
-
- public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
- if (req.hasAggregation() == false)
- throw new IllegalStateException();
-
- this.info = inputScanner.getInfo();
- this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
- this.groupBy = req.getAggrGroupBy();
- this.metrics = req.getAggrMetrics();
- this.metricsAggrFuncs = req.getAggrMetricsFuncs();
- this.inputScanner = inputScanner;
- this.aggrCache = new AggregationCache();
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public int getScannedRowCount() {
- return inputScanner.getScannedRowCount();
- }
-
- @Override
- public int getScannedRowBlockCount() {
- return inputScanner.getScannedRowBlockCount();
- }
-
- @Override
- public void close() throws IOException {
- inputScanner.close();
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- for (GTRecord r : inputScanner) {
- aggrCache.aggregate(r);
- }
- return aggrCache.iterator();
- }
-
- /** return the estimate memory size of aggregation cache */
- public long getEstimateSizeOfAggrCache() {
- return aggrCache.esitmateMemSize();
- }
-
- public Object[] getTotalSumForSanityCheck() {
- return aggrCache.calculateTotalSumSanityCheck();
- }
-
- class AggregationCache {
- final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
- final int keyLength;
- final boolean[] compareMask;
-
- public AggregationCache() {
- compareMask = createCompareMask();
- keyLength = compareMask.length;
- aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
- @Override
- public int compare(byte[] o1, byte[] o2) {
- int result = 0;
- // profiler shows this check is slow
- // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
- for (int i = 0; i < keyLength; ++i) {
- if (compareMask[i]) {
- int a = (o1[i] & 0xff);
- int b = (o2[i] & 0xff);
- result = a - b;
- if (result == 0) {
- continue;
- } else {
- return result;
- }
- }
- }
- return result;
- }
- });
- }
-
- private boolean[] createCompareMask() {
- int keyLength = 0;
- for (int i = 0; i < dimensions.trueBitCount(); i++) {
- int c = dimensions.trueBitAt(i);
- int l = info.codeSystem.maxCodeLength(c);
- keyLength += l;
- }
-
- boolean[] mask = new boolean[keyLength];
- int p = 0;
- for (int i = 0; i < dimensions.trueBitCount(); i++) {
- int c = dimensions.trueBitAt(i);
- int l = info.codeSystem.maxCodeLength(c);
- boolean m = groupBy.get(c) ? true : false;
- for (int j = 0; j < l; j++) {
- mask[p++] = m;
- }
- }
- return mask;
- }
-
- private byte[] createKey(GTRecord record) {
- byte[] result = new byte[keyLength];
- int offset = 0;
- for (int i = 0; i < dimensions.trueBitCount(); i++) {
- int c = dimensions.trueBitAt(i);
- final ByteArray byteArray = record.cols[c];
- final int columnLength = info.codeSystem.maxCodeLength(c);
- System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, byteArray.length());
- offset += columnLength;
- }
- assert offset == result.length;
- return result;
- }
-
- void aggregate(GTRecord r) {
- final byte[] key = createKey(r);
- MeasureAggregator[] aggrs = aggBufMap.get(key);
- if (aggrs == null) {
- aggrs = newAggregators();
- aggBufMap.put(key, aggrs);
- }
- for (int i = 0; i < aggrs.length; i++) {
- int col = metrics.trueBitAt(i);
- Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
- aggrs[i].aggregate(metrics);
- }
- }
-
- private MeasureAggregator[] newAggregators() {
- return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
- }
-
- public Object[] calculateTotalSumSanityCheck() {
- MeasureAggregator[] totalSum = newAggregators();
-
- // skip expensive aggregation
- for (int i = 0; i < totalSum.length; i++) {
- if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator)
- totalSum[i] = null;
- }
-
- for (MeasureAggregator[] entry : aggBufMap.values()) {
- for (int i = 0; i < totalSum.length; i++) {
- if (totalSum[i] != null)
- totalSum[i].aggregate(entry[i].getState());
- }
- }
- Object[] result = new Object[totalSum.length];
- for (int i = 0; i < totalSum.length; i++) {
- if (totalSum[i] != null)
- result[i] = totalSum[i].getState();
- }
- return result;
- }
-
- public long esitmateMemSize() {
- if (aggBufMap.isEmpty())
- return 0;
-
- byte[] sampleKey = aggBufMap.firstKey();
- MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
- return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());
- }
-
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-
- final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- final GTRecord secondRecord = new GTRecord(info);
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public GTRecord next() {
- Entry<byte[], MeasureAggregator[]> entry = it.next();
- create(entry.getKey(), entry.getValue());
- return secondRecord;
- }
-
- private void create(byte[] key, MeasureAggregator[] value) {
- int offset = 0;
- for (int i = 0; i < dimensions.trueBitCount(); i++) {
- int c = dimensions.trueBitAt(i);
- final int columnLength = info.codeSystem.maxCodeLength(c);
- secondRecord.set(c, new ByteArray(key, offset, columnLength));
- offset += columnLength;
- }
- metricsBuf.clear();
- for (int i = 0; i < value.length; i++) {
- int col = metrics.trueBitAt(i);
- int pos = metricsBuf.position();
- info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
- secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }
-
- public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
- // Aggregation cache is basically a tree map. The tree map entry overhead is
- // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
- // - 41~52 according to AggregationCacheMemSizeTest
- return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
- }
-
- public static long estimateSizeOf(MeasureAggregator[] aggrs) {
- // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
- // Memory alignment to 8 bytes
- long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
- for (MeasureAggregator aggr : aggrs) {
- if (aggr != null)
- est += aggr.getMemBytesEstimate();
- }
- return est;
- }
-
- public static long estimateSizeOf(byte[] bytes) {
- // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
- // Memory alignment to 8 bytes
- return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
deleted file mode 100644
index 7552ab3..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-
-import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreWriter;
-
-public class GTBuilder implements Closeable, Flushable {
-
- @SuppressWarnings("unused")
- final private GTInfo info;
- final private IGTStoreWriter storeWriter;
-
- final private GTRowBlock block;
- final private GTRowBlock.Writer blockWriter;
-
- private int writtenRowCount;
- private int writtenRowBlockCount;
-
- GTBuilder(GTInfo info, int shard, IGTStore store) throws IOException {
- this(info, shard, store, false);
- }
-
- GTBuilder(GTInfo info, int shard, IGTStore store, boolean append) throws IOException {
- this.info = info;
-
- block = GTRowBlock.allocate(info);
- blockWriter = block.getWriter();
- if (append) {
- storeWriter = store.append(shard, blockWriter);
- if (block.isFull()) {
- blockWriter.clearForNext();
- }
- } else {
- storeWriter = store.rebuild(shard);
- }
- }
-
- public void write(GTRecord r) throws IOException {
- blockWriter.append(r);
- writtenRowCount++;
-
- if (block.isFull()) {
- flush();
- }
- }
-
- @Override
- public void flush() throws IOException {
- blockWriter.readyForFlush();
- storeWriter.write(block);
- writtenRowBlockCount++;
- if (block.isFull()) {
- blockWriter.clearForNext();
- }
- }
-
- @Override
- public void close() throws IOException {
- if (block.isEmpty() == false) {
- flush();
- }
- storeWriter.close();
- }
-
- public int getWrittenRowCount() {
- return writtenRowCount;
- }
-
- public int getWrittenRowBlockCount() {
- return writtenRowBlockCount;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
deleted file mode 100644
index 42b577a..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.diskstore.GTDiskStore;
-import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class GTComboStore implements IGTStore {
-
- private static final Logger logger = LoggerFactory.getLogger(GTComboStore.class);
-
- private final GTInfo gtInfo;
-
- private void convert(IGTStore input, IGTStore output) throws IOException {
- final IGTStoreScanner scanner = input.scan(null, null, null, null);
- final IGTStoreWriter writer = output.rebuild(-1);
- while (scanner.hasNext()) {
- writer.write(scanner.next());
- }
- }
-
- private GTDiskStore gtDiskStore;
- private GTSimpleMemStore gtSimpleMemStore;
-
- public GTComboStore(GTInfo gtInfo) {
- this(gtInfo, true);
- }
-
- public GTComboStore(GTInfo gtInfo, boolean useMemStore) {
- this.gtInfo = gtInfo;
- if (useMemStore) {
- this.gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
- } else {
- this.gtDiskStore = new GTDiskStore(gtInfo);
- }
- }
-
- @Override
- public GTInfo getInfo() {
- return gtInfo;
- }
-
- private IGTStore getCurrent() {
- if (gtSimpleMemStore != null) {
- return gtSimpleMemStore;
- } else {
- return gtDiskStore;
- }
- }
-
- public long memoryUsage() {
- if (gtSimpleMemStore != null) {
- return gtSimpleMemStore.memoryUsage();
- } else {
- return gtDiskStore.memoryUsage();
- }
- }
-
- public void switchToMemStore() {
- try {
- if (gtSimpleMemStore == null) {
- gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
- convert(gtDiskStore, gtSimpleMemStore);
- gtDiskStore.drop();
- gtDiskStore = null;
- }
- } catch (IOException e) {
- logger.error("fail to switch to mem store", e);
- throw new RuntimeException(e);
- }
- }
-
- public void switchToDiskStore() {
- try {
- if (gtDiskStore == null) {
- gtDiskStore = new GTDiskStore(gtInfo);
- convert(gtSimpleMemStore, gtDiskStore);
- gtSimpleMemStore.drop();
- gtSimpleMemStore = null;
- }
- } catch (IOException e) {
- logger.error("fail to switch to disk store", e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public IGTStoreWriter rebuild(int shard) throws IOException {
- return getCurrent().rebuild(shard);
- }
-
- @Override
- public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
- return getCurrent().append(shard, fillLast);
- }
-
- @Override
- public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
- return getCurrent().scan(pkStart, pkEnd, selectedColBlocks, additionalPushDown);
- }
-
- public void drop() throws IOException {
- if (gtSimpleMemStore != null) {
- gtSimpleMemStore.drop();
- }
- if (gtDiskStore != null) {
- gtDiskStore.drop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
deleted file mode 100644
index d60e4ba..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.metadata.filter.IFilterCodeSystem;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-
-public class GTFilterScanner implements IGTScanner {
-
- final private IGTScanner inputScanner;
- final private TupleFilter filter;
- final private IEvaluatableTuple oneTuple; // avoid instance creation
-
- private GTRecord next = null;
-
- public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
- this.inputScanner = inputScanner;
- this.filter = req.getFilterPushDown();
- this.oneTuple = new IEvaluatableTuple() {
- @Override
- public Object getValue(TblColRef col) {
- return next.get(col.getColumnDesc().getZeroBasedIndex());
- }
- };
-
- if (TupleFilter.isEvaluableRecursively(filter) == false)
- throw new IllegalArgumentException();
- }
-
- @Override
- public GTInfo getInfo() {
- return inputScanner.getInfo();
- }
-
- @Override
- public int getScannedRowCount() {
- return inputScanner.getScannedRowCount();
- }
-
- @Override
- public int getScannedRowBlockCount() {
- return inputScanner.getScannedRowBlockCount();
- }
-
- @Override
- public void close() throws IOException {
- inputScanner.close();
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- private Iterator<GTRecord> inputIterator = inputScanner.iterator();
-
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
-
- IFilterCodeSystem<ByteArray> filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
-
- while (inputIterator.hasNext()) {
- next = inputIterator.next();
- if (filter != null && filter.evaluate(oneTuple, filterCodeSystem) == false) {
- continue;
- }
- return true;
- }
- next = null;
- return false;
- }
-
- @Override
- public GTRecord next() {
- // fetch next record
- if (next == null) {
- hasNext();
- if (next == null)
- throw new NoSuchElementException();
- }
-
- GTRecord result = next;
- next = null;
- return result;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
deleted file mode 100644
index 1c69f8e..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class GTInfo {
-
- public static Builder builder() {
- return new Builder();
- }
-
- String tableName;
- IGTCodeSystem codeSystem;
-
- // column schema
- int nColumns;
- DataType[] colTypes;
- ImmutableBitSet colAll;
- ImmutableBitSet colPreferIndex;
- transient TblColRef[] colRefs;
-
- // grid info
- ImmutableBitSet primaryKey; // order by, uniqueness is not required
- ImmutableBitSet[] colBlocks; // primary key must be the first column block
- ImmutableBitSet colBlocksAll;
- int rowBlockSize; // 0: disable row block
-
- // sharding
- int nShards; // 0: no sharding
-
- // must create from builder
- private GTInfo() {
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public IGTCodeSystem getCodeSystem() {
- return codeSystem;
- }
-
- public int getColumnCount() {
- return nColumns;
- }
-
- public DataType getColumnType(int i) {
- return colTypes[i];
- }
-
- public ImmutableBitSet getPrimaryKey() {
- return primaryKey;
- }
-
- public boolean isShardingEnabled() {
- return nShards > 0;
- }
-
- public boolean isRowBlockEnabled() {
- return rowBlockSize > 0;
- }
-
- public int getRowBlockSize() {
- return rowBlockSize;
- }
-
- public int getMaxRecordLength() {
- return getMaxColumnLength(colAll);
- }
-
- public int getMaxColumnLength(ImmutableBitSet selectedCols) {
- int result = 0;
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- result += codeSystem.maxCodeLength(c);
- }
- return result;
- }
-
- public int getMaxColumnLength() {
- int max = 0;
- for (int i = 0; i < nColumns; i++)
- max = Math.max(max, codeSystem.maxCodeLength(i));
- return max;
- }
-
- public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) {
- if (columns == null)
- columns = colAll;
-
- BitSet result = new BitSet();
- for (int i = 0; i < colBlocks.length; i++) {
- ImmutableBitSet cb = colBlocks[i];
- if (cb.intersects(columns)) {
- result.set(i);
- }
- }
- return new ImmutableBitSet(result);
- }
-
- public TblColRef colRef(int i) {
- if (colRefs == null) {
- colRefs = new TblColRef[nColumns];
- }
- if (colRefs[i] == null) {
- colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
- }
- return colRefs[i];
- }
-
- public void validateColRef(TblColRef ref) {
- TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
- if (expected.equals(ref) == false)
- throw new IllegalArgumentException();
- }
-
- void validate() {
-
- if (codeSystem == null)
- throw new IllegalStateException();
-
- if (primaryKey.cardinality() == 0)
- throw new IllegalStateException();
-
- codeSystem.init(this);
-
- validateColumnBlocks();
- }
-
- private void validateColumnBlocks() {
- colAll = new ImmutableBitSet(0, nColumns);
-
- if (colBlocks == null) {
- colBlocks = new ImmutableBitSet[2];
- colBlocks[0] = primaryKey;
- colBlocks[1] = colAll.andNot(primaryKey);
- }
-
- colBlocksAll = new ImmutableBitSet(0, colBlocks.length);
-
- if (colPreferIndex == null)
- colPreferIndex = ImmutableBitSet.EMPTY;
-
- // column blocks must not overlap
- for (int i = 0; i < colBlocks.length; i++) {
- for (int j = i + 1; j < colBlocks.length; j++) {
- if (colBlocks[i].intersects(colBlocks[j]))
- throw new IllegalStateException();
- }
- }
-
- // column block must cover all columns
- ImmutableBitSet merge = ImmutableBitSet.EMPTY;
- for (int i = 0; i < colBlocks.length; i++) {
- merge = merge.or(colBlocks[i]);
- }
- if (merge.equals(colAll) == false)
- throw new IllegalStateException();
-
- // primary key must be the first column block
- if (primaryKey.equals(colBlocks[0]) == false)
- throw new IllegalStateException();
-
- // drop empty column block
- LinkedList<ImmutableBitSet> list = new LinkedList<ImmutableBitSet>(Arrays.asList(colBlocks));
- Iterator<ImmutableBitSet> it = list.iterator();
- while (it.hasNext()) {
- ImmutableBitSet cb = it.next();
- if (cb.isEmpty())
- it.remove();
- }
- colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]);
- }
-
- public static class Builder {
- final GTInfo info;
-
- private Builder() {
- this.info = new GTInfo();
- }
-
- /** optional */
- public Builder setTableName(String name) {
- info.tableName = name;
- return this;
- }
-
- /** required */
- public Builder setCodeSystem(IGTCodeSystem cs) {
- info.codeSystem = cs;
- return this;
- }
-
- /** required */
- public Builder setColumns(DataType... colTypes) {
- info.nColumns = colTypes.length;
- info.colTypes = colTypes;
- return this;
- }
-
- /** required */
- public Builder setPrimaryKey(ImmutableBitSet primaryKey) {
- info.primaryKey = primaryKey;
- return this;
- }
-
- /** optional */
- public Builder enableColumnBlock(ImmutableBitSet[] columnBlocks) {
- info.colBlocks = new ImmutableBitSet[columnBlocks.length];
- for (int i = 0; i < columnBlocks.length; i++) {
- info.colBlocks[i] = columnBlocks[i];
- }
- return this;
- }
-
- /** optional */
- public Builder enableRowBlock(int rowBlockSize) {
- info.rowBlockSize = rowBlockSize;
- return this;
- }
-
- /** optional */
- public Builder enableSharding(int nShards) {
- info.nShards = nShards;
- return this;
- }
-
- /** optional */
- public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) {
- info.colPreferIndex = colPreferIndex;
- return this;
- }
-
- public GTInfo build() {
- info.validate();
- return info;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
deleted file mode 100644
index ceb1463..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
+++ /dev/null
@@ -1,205 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-
-/**
- * A thread-safe inverted index of row blocks in memory.
- *
- * For every column in {@code info.colPreferIndex} the index records which block
- * sequence ids contain which values; {@link #filter(TupleFilter)} turns a tuple
- * filter into the set of block ids that may hold matching rows.
- *
- * Note that negative conditions (NOT, NEQ, NOTIN, ISNOTNULL) must return all blocks,
- * because the index only knows which blocks contain a value, not which blocks do
- * not contain it.
- *
- * @author yangli9
- */
-public class GTInvertedIndex {
-
-    private final GTInfo info;
-    private final ImmutableBitSet colPreferIndex;
-    private final ImmutableBitSet colBlocks;
-    private final GTInvertedIndexOfColumn[] index; // for each column, null if the column is not indexed
-
-    // One past the highest block sequence id added so far. Read without locking
-    // (volatile), but only updated while holding this object's monitor.
-    private volatile int nIndexedBlocks;
-
-    /**
-     * Creates an empty index with one per-column index for every column listed in
-     * {@code info.colPreferIndex}.
-     *
-     * @param info the grid table description
-     */
-    public GTInvertedIndex(GTInfo info) {
-        this.info = info;
-        this.colPreferIndex = info.colPreferIndex;
-        this.colBlocks = info.selectColumnBlocks(colPreferIndex);
-
-        index = new GTInvertedIndexOfColumn[info.getColumnCount()];
-        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-            int c = colPreferIndex.trueBitAt(i);
-            index[c] = new GTInvertedIndexOfColumn(info.codeSystem.getComparator());
-        }
-    }
-
-    /**
-     * Indexes one row block: collects the distinct values of every indexed column
-     * in the block and registers them under the block's sequence id.
-     *
-     * @param block the row block to index
-     */
-    public void add(GTRowBlock block) {
-
-        @SuppressWarnings("unchecked")
-        Set<ByteArray>[] distinctValues = new Set[info.getColumnCount()];
-        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-            int c = colPreferIndex.trueBitAt(i);
-            distinctValues[c] = new HashSet<ByteArray>();
-        }
-
-        GTRowBlock.Reader reader = block.getReader(colBlocks);
-        GTRecord record = new GTRecord(info);
-        while (reader.hasNext()) {
-            reader.fetchNext(record);
-            for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-                int c = colPreferIndex.trueBitAt(i);
-                // NOTE(review): assumes the value returned by record.get(c) stays valid
-                // as a set member after the next fetchNext() - TODO confirm
-                distinctValues[c].add(record.get(c));
-            }
-        }
-
-        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-            int c = colPreferIndex.trueBitAt(i);
-            index[c].add(distinctValues[c], block.getSequenceId());
-        }
-
-        // volatile alone does not make read-max-write atomic; without the monitor two
-        // concurrent add() calls could overwrite a larger count with a smaller one
-        synchronized (this) {
-            nIndexedBlocks = Math.max(nIndexedBlocks, block.seqId + 1);
-        }
-    }
-
-    /**
-     * Evaluates the filter against the blocks indexed so far.
-     *
-     * @param filter the filter to evaluate, null meaning no restriction
-     * @return ids of the blocks that may contain matching rows, never null
-     */
-    public ConciseSet filter(TupleFilter filter) {
-        return filter(filter, nIndexedBlocks);
-    }
-
-    /**
-     * Evaluates the filter and additionally includes every block id from the number
-     * of indexed blocks up to {@code totalBlocks} (exclusive), since nothing is known
-     * about blocks that have not been indexed.
-     *
-     * @param filter the filter to evaluate, null meaning no restriction
-     * @param totalBlocks total number of blocks, indexed or not
-     * @return ids of the blocks that may contain matching rows, never null
-     */
-    public ConciseSet filter(TupleFilter filter, int totalBlocks) {
-        // number of indexed blocks may increase as we do evaluation
-        int indexedBlocks = nIndexedBlocks;
-
-        Evaluator evaluator = new Evaluator(indexedBlocks);
-        ConciseSet r = evaluator.evaluate(filter);
-
-        // add blocks that have not been indexed
-        for (int i = indexedBlocks; i < totalBlocks; i++) {
-            r.add(i);
-        }
-
-        return r;
-    }
-
-    /**
-     * Evaluates a filter tree against a fixed snapshot of the indexed block count.
-     * Every evaluation method returns a non-null set owned by the caller.
-     */
-    private class Evaluator {
-        private final int indexedBlocks;
-
-        Evaluator(int indexedBlocks) {
-            this.indexedBlocks = indexedBlocks;
-        }
-
-        /**
-         * Dispatches on the filter type. A null filter, or a filter that is neither
-         * logical nor a comparison, selects all indexed blocks.
-         */
-        public ConciseSet evaluate(TupleFilter filter) {
-            if (filter == null) {
-                return all();
-            }
-
-            if (filter instanceof LogicalTupleFilter)
-                return evalLogical((LogicalTupleFilter) filter);
-
-            if (filter instanceof CompareTupleFilter)
-                return evalCompare((CompareTupleFilter) filter);
-
-            // unable to evaluate
-            return all();
-        }
-
-        /**
-         * Evaluates a comparison on one column. Columns without an index, and the
-         * negative operators (ISNOTNULL, NEQ, NOTIN), select all indexed blocks.
-         */
-        @SuppressWarnings("unchecked")
-        private ConciseSet evalCompare(CompareTupleFilter filter) {
-            int col = col(filter);
-            if (index[col] == null)
-                return all();
-
-            switch (filter.getOperator()) {
-            case ISNULL:
-                return index[col].getNull();
-            case ISNOTNULL:
-                return all();
-            case EQ:
-                return index[col].getEquals((ByteArray) filter.getFirstValue());
-            case NEQ:
-                return all();
-            case IN:
-                return index[col].getIn((Iterable<ByteArray>) filter.getValues());
-            case NOTIN:
-                return all();
-            case LT:
-                return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), false);
-            case LTE:
-                return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), true);
-            case GT:
-                return index[col].getRange((ByteArray) filter.getFirstValue(), false, null, false);
-            case GTE:
-                return index[col].getRange((ByteArray) filter.getFirstValue(), true, null, false);
-            default:
-                throw new IllegalStateException("Unsupported operator " + filter.getOperator());
-            }
-        }
-
-        /** Evaluates an AND / OR / NOT node; any other operator is rejected. */
-        private ConciseSet evalLogical(LogicalTupleFilter filter) {
-            List<? extends TupleFilter> children = filter.getChildren();
-
-            switch (filter.getOperator()) {
-            case AND:
-                return evalLogicalAnd(children);
-            case OR:
-                return evalLogicalOr(children);
-            case NOT:
-                return evalLogicalNot(children);
-            default:
-                throw new IllegalStateException("Unsupported operator " + filter.getOperator());
-            }
-        }
-
-        /** Intersects the results of all children, starting from all indexed blocks. */
-        private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) {
-            ConciseSet set = all();
-
-            for (TupleFilter c : children) {
-                // evaluate() never returns null, so no null guard is needed
-                set.retainAll(evaluate(c));
-            }
-            return set;
-        }
-
-        /** Unions the results of all children, starting from the empty set. */
-        private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) {
-            ConciseSet set = new ConciseSet();
-
-            for (TupleFilter c : children) {
-                // evaluate() never returns null, so no null guard is needed
-                set.addAll(evaluate(c));
-            }
-            return set;
-        }
-
-        /** NOT cannot be answered from the index, so all indexed blocks are selected. */
-        private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) {
-            return all();
-        }
-
-        /** Returns a new set holding every block id below the indexed block count. */
-        private ConciseSet all() {
-            return not(new ConciseSet());
-        }
-
-        /**
-         * Complements the given set in place. The indexed block count is added first
-         * as an upper marker; this relies on ConciseSet.complement() only flipping
-         * elements below the current maximum, which drops the marker again.
-         */
-        private ConciseSet not(ConciseSet set) {
-            set.add(indexedBlocks);
-            set.complement();
-            return set;
-        }
-
-        /** Zero-based index of the column the comparison refers to. */
-        private int col(CompareTupleFilter filter) {
-            return filter.getColumn().getColumnDesc().getZeroBasedIndex();
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java
deleted file mode 100644
index 10a944c..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-import java.util.NavigableMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.kylin.common.util.ByteArray;
-
-import com.google.common.collect.Maps;
-
-/**
- * Inverted index of a single column. Each non-null value maps to the ids of the
- * blocks that contain it; blocks containing a null value are tracked in a separate
- * set. Every public method runs under the read or write side of one lock, and every
- * getter returns a set that is independent of the index's internal sets.
- */
-public class GTInvertedIndexOfColumn {
-
-    private final IGTComparator comparator;
-    private final ReentrantReadWriteLock rwLock;
-
-    private int nBlocks;
-    private NavigableMap<ByteArray, ConciseSet> rangeIndex;
-    private ConciseSet nullIndex;
-
-    /**
-     * Creates an empty index whose values are ordered by the given comparator.
-     *
-     * @param comparator orders the values and decides which of them are null
-     */
-    public GTInvertedIndexOfColumn(IGTComparator comparator) {
-        this.comparator = comparator;
-        this.rwLock = new ReentrantReadWriteLock();
-        this.rangeIndex = Maps.newTreeMap(comparator);
-        this.nullIndex = new ConciseSet();
-    }
-
-    /**
-     * Registers the given values as present in a block.
-     *
-     * @param codes the values found in the block
-     * @param blockId id of the block
-     */
-    public void add(Iterable<ByteArray> codes, int blockId) {
-        rwLock.writeLock().lock();
-        try {
-            for (ByteArray value : codes) {
-                if (!comparator.isNull(value)) {
-                    ConciseSet blocksOfValue = rangeIndex.get(value);
-                    if (blocksOfValue == null) {
-                        blocksOfValue = new ConciseSet();
-                        // the key is a copy, so the map does not hold on to the caller's object
-                        rangeIndex.put(value.copy(), blocksOfValue);
-                    }
-                    blocksOfValue.add(blockId);
-                } else {
-                    nullIndex.add(blockId);
-                }
-            }
-
-            // keep nBlocks one past the highest block id seen
-            if (nBlocks <= blockId) {
-                nBlocks = blockId + 1;
-            }
-
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * @return a copy of the ids of the blocks that contain a null value
-     */
-    public ConciseSet getNull() {
-        rwLock.readLock().lock();
-        try {
-            return nullIndex.clone();
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * @param code the value to look up
-     * @return a copy of the ids of the blocks containing the value, empty if the value is unknown
-     */
-    public ConciseSet getEquals(ByteArray code) {
-        rwLock.readLock().lock();
-        try {
-            ConciseSet hit = rangeIndex.get(code);
-            return hit != null ? hit.clone() : new ConciseSet();
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * @param codes the values to look up
-     * @return the union of the block ids of all given values; unknown values contribute nothing
-     */
-    public ConciseSet getIn(Iterable<ByteArray> codes) {
-        rwLock.readLock().lock();
-        try {
-            ConciseSet union = new ConciseSet();
-            for (ByteArray value : codes) {
-                ConciseSet hit = rangeIndex.get(value);
-                if (hit == null) {
-                    continue;
-                }
-                union.addAll(hit);
-            }
-            return union;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Returns the ids of the blocks containing at least one value within the range.
-     * A null bound leaves that side open; with both bounds null every block id below
-     * the current block count is returned.
-     *
-     * @param from lower bound, or null for none
-     * @param fromInclusive whether the lower bound itself is included
-     * @param to upper bound, or null for none
-     * @param toInclusive whether the upper bound itself is included
-     * @return a new set of block ids
-     */
-    public ConciseSet getRange(ByteArray from, boolean fromInclusive, ByteArray to, boolean toInclusive) {
-        rwLock.readLock().lock();
-        try {
-            ConciseSet result = new ConciseSet();
-
-            boolean unbounded = (from == null) && (to == null);
-            if (unbounded) {
-                // marker at nBlocks, then complement, yields the ids below nBlocks
-                result.add(nBlocks);
-                result.complement();
-                return result;
-            }
-
-            NavigableMap<ByteArray, ConciseSet> window;
-            if (from != null && to != null) {
-                window = rangeIndex.subMap(from, fromInclusive, to, toInclusive);
-            } else if (from != null) {
-                window = rangeIndex.tailMap(from, fromInclusive);
-            } else {
-                window = rangeIndex.headMap(to, toInclusive);
-            }
-
-            for (ConciseSet blocksOfValue : window.values()) {
-                result.addAll(blocksOfValue);
-            }
-            return result;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-}