You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/15 08:58:38 UTC
incubator-kylin git commit: KYLIN-655, merge from local branch, compile pass
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 25acaf926 -> 62cf7c032
KYLIN-655, merge from local branch, compile pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/62cf7c03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/62cf7c03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/62cf7c03
Branch: refs/heads/0.8.0
Commit: 62cf7c032d6e7243772156966b3fe6994c86cd53
Parents: 25acaf9
Author: Li, Yang <ya...@ebay.com>
Authored: Fri May 15 14:54:45 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri May 15 14:58:20 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/ByteArray.java | 56 ++++-
.../job/hadoop/cubev2/InMemCubeBuilder.java | 52 ++--
.../kylin/storage/cube/CubeCodeSystem.java | 6 +-
.../kylin/storage/cube/CubeGridTable.java | 29 ++-
.../storage/cube/CubeHBaseReadonlyStore.java | 14 +-
.../apache/kylin/storage/cube/CubeScanner.java | 6 +-
.../kylin/storage/cube/CubeStorageEngine.java | 81 ++++---
.../kylin/storage/cube/CubeTupleConverter.java | 239 +++++++++++++++++++
.../cube/SequentialCubeTupleIterator.java | 111 +++++++++
.../kylin/storage/gridtable/GTComboStore.java | 19 +-
.../apache/kylin/storage/gridtable/GTInfo.java | 16 +-
.../kylin/storage/gridtable/GTRecord.java | 12 +
.../storage/gridtable/GTSampleCodeSystem.java | 1 -
.../storage/gridtable/GTScanRangePlanner.java | 17 +-
.../kylin/storage/gridtable/IGTStore.java | 4 +-
.../gridtable/diskstore/GTDiskStore.java | 24 +-
.../gridtable/memstore/GTSimpleMemStore.java | 21 +-
17 files changed, 599 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
index e63f904..460ca69 100644
--- a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
+++ b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
@@ -29,6 +29,8 @@ import org.apache.kylin.common.util.Bytes;
public class ByteArray implements Comparable<ByteArray>, Serializable {
private static final long serialVersionUID = 1L;
+
+ public static final ByteArray EMPTY = new ImmutableByteArray();
public static ByteArray allocate(int length) {
return new ByteArray(new byte[length]);
@@ -47,19 +49,21 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
private int length;
public ByteArray() {
- set(null, 0, 0);
+ this(null, 0, 0);
}
public ByteArray(int capacity) {
- set(new byte[capacity], 0, capacity);
+ this(new byte[capacity], 0, capacity);
}
public ByteArray(byte[] data) {
- set(data, 0, data == null ? 0 : data.length);
+ this(data, 0, data == null ? 0 : data.length);
}
public ByteArray(byte[] data, int offset, int length) {
- set(data, offset, length);
+ this.data = data;
+ this.offset = offset;
+ this.length = length;
}
public byte[] array() {
@@ -156,5 +160,49 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
else
return Bytes.toStringBinary(data, offset, length);
}
+
+ // ============================================================================
+
+ public static class ImmutableByteArray extends ByteArray {
+
+ private static final long serialVersionUID = 1L;
+
+ public ImmutableByteArray() {
+ super();
+ }
+
+ public ImmutableByteArray(byte[] data, int offset, int length) {
+ super(data, offset, length);
+ }
+
+ public ImmutableByteArray(byte[] data) {
+ super(data);
+ }
+
+ @Override
+ public void set(byte[] array) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void set(byte[] array, int offset, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void set(ByteArray o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setLength(int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void copyFrom(ByteArray other) {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
index 83e4a79..413b22c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
@@ -33,15 +33,24 @@
*/
package org.apache.kylin.job.hadoop.cubev2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -53,19 +62,23 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
+import org.apache.kylin.storage.gridtable.GTAggregateScanner;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTComboStore;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
import org.apache.kylin.storage.util.SizeOfUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
*/
-@SuppressWarnings("rawtypes")
public class InMemCubeBuilder implements Runnable {
//estimation of (size of aggregation cache) / (size of mem store)
@@ -361,7 +374,7 @@ public class InMemCubeBuilder implements Runnable {
createNDCuboidGT(tree, baseCuboidId, childId);
}
}
- baseCuboidGT.getStore().drop();
+ dropStore(baseCuboidGT);
} catch (IOException e) {
logger.error("Fail to build cube", e);
@@ -396,8 +409,7 @@ public class InMemCubeBuilder implements Runnable {
return true;
} else {
logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables));
- final IGTStore store = gridTable.data.getStore();
- assert store instanceof GTComboStore;
+ final GTComboStore store = (GTComboStore) gridTable.data.getStore();
if (store.memoryUsage() > 0) {
final long storeSize = SizeOfUtil.deepSizeOf(store);
memoryLeft += storeSize;
@@ -420,10 +432,11 @@ public class InMemCubeBuilder implements Runnable {
private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
long startTime = System.currentTimeMillis();
- assert parentNode.data.getStore() instanceof GTComboStore;
- if (parentNode.data.getStore().memoryUsage() <= 0) {
+
+ GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+ if (parentStore.memoryUsage() <= 0) {
long t = System.currentTimeMillis();
- ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+ parentStore.switchToMemStore();
logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
}
@@ -448,12 +461,17 @@ public class InMemCubeBuilder implements Runnable {
startTime = System.currentTimeMillis();
//output the grid table
outputGT(cuboidId, currentCuboid);
- currentCuboid.getStore().drop();
+ dropStore(currentCuboid);
parentNode.children.remove(node);
logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
+ private void dropStore(GridTable gt) throws IOException {
+ ((GTComboStore) gt.getStore()).drop();
+ }
+
+
private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
IGTScanner scanner = gridTable.scan(req);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 4ca8e78..786f154 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -6,9 +6,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dict.Dictionary;
@@ -21,6 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * Created by shaoshi on 3/23/15.
* This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check
* its data type to serialize/deserialize it;
*/
@@ -146,7 +146,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
@Override
public Object decodeColumnValue(int col, ByteBuffer buf) {
- return serializers[col].deserialize(buf);
+ return serializers[col].deserialize(buf);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
index de3c6a8..6ed07b6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.cube;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -8,6 +9,8 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -16,6 +19,7 @@ import org.apache.kylin.storage.gridtable.GTInfo;
import com.google.common.collect.Maps;
+@SuppressWarnings("rawtypes")
public class CubeGridTable {
public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
@@ -38,17 +42,17 @@ public class CubeGridTable {
Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap);
}
-
- @SuppressWarnings("rawtypes")
+
public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) {
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
List<TblColRef> dimCols = cuboid.getColumns();
int nColumns = dimCols.size() + cubeDesc.getMeasures().size();
+ ArrayList<BitSet> colBlocks = new ArrayList<BitSet>();
BitSet dimensions = new BitSet();
dimensions.set(0, dimCols.size());
- BitSet metrics = new BitSet();
- metrics.set(dimCols.size(), nColumns);
+ colBlocks.add(dimensions);
+
DataType[] dataTypes = new DataType[nColumns];
Map<Integer, Dictionary> dictionaryByColIdx = Maps.newHashMap();
Map<Integer, Integer> fixLenByColIdx = Maps.newHashMap();
@@ -72,16 +76,23 @@ public class CubeGridTable {
colIndex++;
}
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- dataTypes[colIndex] = measure.getFunction().getReturnDataType();
- colIndex++;
+ for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ BitSet colBlock = new BitSet();
+ for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+ dataTypes[colIndex] = measure.getFunction().getReturnDataType();
+ colBlock.set(colIndex);
+ colIndex++;
+ }
+ colBlocks.add(colBlock);
+ }
}
-
+
GTInfo.Builder builder = GTInfo.builder();
builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx));
builder.setColumns(dataTypes);
builder.setPrimaryKey(dimensions);
- builder.enableColumnBlock(new BitSet[] { dimensions, metrics });
+ builder.enableColumnBlock((BitSet[]) colBlocks.toArray(new BitSet[colBlocks.size()]));
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index 0784587..8e5dab3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -12,8 +12,8 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.cube.CubeSegment;
@@ -49,8 +49,8 @@ public class CubeHBaseReadonlyStore implements IGTStore {
}
@Override
- public long memoryUsage() {
- return 0;
+ public GTInfo getInfo() {
+ return info;
}
@Override
@@ -122,11 +122,6 @@ public class CubeHBaseReadonlyStore implements IGTStore {
};
}
- @Override
- public void drop() throws IOException {
- throw new UnsupportedOperationException();
- }
-
private Scan buildScan(ByteArray pkStart, ByteArray pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
Scan scan = new Scan();
scan.setCaching(SCAN_CACHE);
@@ -141,6 +136,7 @@ public class CubeHBaseReadonlyStore implements IGTStore {
scan.setStartRow(makeRowKeyToScan(pkStart, (byte) 0x00));
scan.setStopRow(makeRowKeyToScan(pkEnd, (byte) 0xff));
+ //TODO fuzzy match
return scan;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index 09e0d76..2bf9a85 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -30,6 +30,7 @@ public class CubeScanner implements IGTScanner {
private static final int MAX_SCAN_RANGES = 200;
+ final CubeSegment cubeSeg;
final GTInfo info;
final CubeHBaseReadonlyStore store;
final List<GTScanRequest> scanRequests;
@@ -37,8 +38,9 @@ public class CubeScanner implements IGTScanner {
public CubeScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
Collection<FunctionDesc> metrics, TupleFilter filter) {
- info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
- store = new CubeHBaseReadonlyStore(info, cubeSeg, cuboid);
+ this.cubeSeg = cubeSeg;
+ this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
+ this.store = new CubeHBaseReadonlyStore(info, cubeSeg, cuboid);
TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, cuboid.getColumns(), groups);
BitSet gtDimensions = makeGridTableColumns(cuboid, dimensions);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
index 5fbcf98..5535853 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
@@ -1,8 +1,11 @@
package org.apache.kylin.storage.cube;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -17,20 +20,22 @@ import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.IStorageEngine;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
-public class CubeStorageEngine implements ICachableStorageEngine {
+public class CubeStorageEngine implements IStorageEngine {
private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
@@ -38,12 +43,10 @@ public class CubeStorageEngine implements ICachableStorageEngine {
private final CubeInstance cubeInstance;
private final CubeDesc cubeDesc;
- private final String uuid;
public CubeStorageEngine(CubeInstance cube) {
this.cubeInstance = cube;
this.cubeDesc = cube.getDescriptor();
- this.uuid = cube.getUuid();
}
@Override
@@ -52,8 +55,8 @@ public class CubeStorageEngine implements ICachableStorageEngine {
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
- Collection<TblColRef> dimensions = new HashSet<TblColRef>();
- Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
+ Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+ Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
// all dimensions = groups + filter dimensions
@@ -68,7 +71,7 @@ public class CubeStorageEngine implements ICachableStorageEngine {
derivedPostAggregation.removeAll(groups);
// identify cuboid
- Set<TblColRef> dimensionsD = Sets.newHashSet();
+ Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
dimensionsD.addAll(groupsD);
dimensionsD.addAll(filterDimsD);
Cuboid cuboid = identifyCuboid(dimensionsD);
@@ -78,6 +81,10 @@ public class CubeStorageEngine implements ICachableStorageEngine {
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
context.setExactAggregation(isExactAggregation);
+
+ if (isExactAggregation) {
+ metrics = replaceHolisticCountDistinct(metrics);
+ }
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
@@ -92,28 +99,17 @@ public class CubeStorageEngine implements ICachableStorageEngine {
scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
}
- return new SerializedCubeTupleIterator(scanners);
- }
-
- @Override
- public Range<Long> getVolatilePeriod() {
- return null;
- }
-
- @Override
- public String getStorageUUID() {
- return this.uuid;
- }
-
- @Override
- public boolean isDynamic() {
- return false;
+ if (scanners.isEmpty())
+ return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+
+ return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
}
private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
for (FunctionDesc func : sqlDigest.aggregations) {
if (!func.isDimensionAsMetric()) {
- metrics.add(func);
+ // prefer the FunctionDesc from the cube desc where possible; it carries more info, such as HLLC precision
+ metrics.add(findAggrFuncFromCubeDesc(func));
}
}
@@ -125,6 +121,14 @@ public class CubeStorageEngine implements ICachableStorageEngine {
dimensions.add(column);
}
}
+
+ private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ if (measure.getFunction().equals(aggrFunc))
+ return measure.getFunction();
+ }
+ return aggrFunc;
+ }
private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
Set<TblColRef> expanded = Sets.newHashSet();
@@ -219,6 +223,27 @@ public class CubeStorageEngine implements ICachableStorageEngine {
return exact;
}
+ private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
+ // for count distinct, try to use its holistic version if possible
+ Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
+ for (FunctionDesc metric : metrics) {
+ if (metric.isCountDistinct() == false) {
+ result.add(metric);
+ continue;
+ }
+
+ FunctionDesc holisticVersion = null;
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ FunctionDesc measureFunc = measure.getFunction();
+ if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
+ holisticVersion = measureFunc;
+ }
+ }
+ result.add(holisticVersion == null ? metric : holisticVersion);
+ }
+ return result;
+ }
+
@SuppressWarnings("unchecked")
private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
if (filter == null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
new file mode 100644
index 0000000..7e64975
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.cube;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import com.google.common.collect.Lists;
+
+public class CubeTupleConverter {
+
+ final CubeSegment cubeSeg;
+ final Cuboid cuboid;
+ final TupleInfo tupleInfo;
+ final List<IDerivedColumnFiller> derivedColFillers;
+
+ final int[] gtColIdx;
+ final int[] tupleIdx;
+ final Object[] tmpValues;
+ final int nSelectedDims;
+
+ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ this.cubeSeg = cubeSeg;
+ this.cuboid = cuboid;
+ this.tupleInfo = returnTupleInfo;
+ this.derivedColFillers = Lists.newArrayList();
+
+ List<TblColRef> cuboidDims = cuboid.getColumns();
+ List<MeasureDesc> cuboidMeasures = cuboid.getCube().getMeasures();
+
+ nSelectedDims = selectedDimensions.size();
+ gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
+ tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
+ tmpValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
+
+ int iii = 0;
+
+ // pre-calculate dimension index mapping to tuple
+ for (int i = 0; i < cuboidDims.size(); i++) {
+ TblColRef col = cuboidDims.get(i);
+ if (selectedDimensions.contains(col)) {
+ gtColIdx[iii] = i;
+ tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+ iii++;
+ }
+ }
+
+ // pre-calculate metrics index mapping to tuple
+ for (int i = 0; i < cuboidMeasures.size(); i++) {
+ FunctionDesc aggrFunc = cuboidMeasures.get(i).getFunction();
+ if (contains(selectedMetrics, aggrFunc)) {
+ gtColIdx[iii] = cuboidDims.size() + i;
+ // a rewrite metric is identified by its rewrite field name
+ if (aggrFunc.needRewrite()) {
+ String rewriteFieldName = aggrFunc.getRewriteFieldName();
+ tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
+ }
+ // a non-rewrite metric (like sum, or a dimension playing as a metric) is looked up like a dimension column
+ else {
+ TblColRef col = aggrFunc.getParameter().getColRefs().get(0);
+ tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+ }
+ iii++;
+ }
+ }
+
+ // prepare derived columns and filler
+ Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(cuboidDims, null);
+ for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
+ TblColRef[] hostCols = entry.getKey().data;
+ for (DeriveInfo deriveInfo : entry.getValue()) {
+ IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo);
+ if (filler != null) {
+ derivedColFillers.add(filler);
+ }
+ }
+ }
+ }
+
+ private boolean contains(Set<FunctionDesc> selectedMetrics, FunctionDesc aggrFunc) {
+ if (aggrFunc.isCountDistinct()) {
+ // count distinct and its holistic version need special attention: they are equals() but not exactly the same
+ for (FunctionDesc selected : selectedMetrics) {
+ if (selected.equals(aggrFunc) && selected.isHolisticCountDistinct() == aggrFunc.isHolisticCountDistinct())
+ return true;
+ }
+ return false;
+ } else
+ return selectedMetrics.contains(aggrFunc);
+ }
+
+ public void translateResult(GTRecord record, Tuple tuple) {
+
+ record.getValues(gtColIdx, tmpValues);
+
+ // dimensions
+ for (int i = 0; i < nSelectedDims; i++) {
+ int ti = tupleIdx[i];
+ if (ti >= 0) {
+ tuple.setDimensionValue(ti, toString(tmpValues[i]));
+ }
+ }
+
+ // measures
+ for (int i = nSelectedDims; i < gtColIdx.length; i++) {
+ int ti = tupleIdx[i];
+ if (ti >= 0) {
+ tuple.setMeasureValue(ti, tmpValues[i]);
+ }
+ }
+
+ // derived
+ for (IDerivedColumnFiller filler : derivedColFillers) {
+ filler.fillDerivedColumns(tmpValues, tuple);
+ }
+ }
+
+ private interface IDerivedColumnFiller {
+ public void fillDerivedColumns(Object[] tmpValues, Tuple tuple);
+ }
+
+ private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) {
+ boolean allHostsPresent = true;
+ final int[] hostTmpIdx = new int[hostCols.length];
+ for (int i = 0; i < hostCols.length; i++) {
+ hostTmpIdx[i] = indexOnTheTmpValues(hostCols[i]);
+ allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0;
+ }
+
+ boolean needCopyDerived = false;
+ final int[] derivedTupleIdx = new int[deriveInfo.columns.length];
+ for (int i = 0; i < deriveInfo.columns.length; i++) {
+ TblColRef col = deriveInfo.columns[i];
+ derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+ needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0;
+ }
+
+ if ((allHostsPresent && needCopyDerived) == false)
+ return null;
+
+ switch (deriveInfo.type) {
+ case LOOKUP:
+ return new IDerivedColumnFiller() {
+ CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+ LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+ int[] derivedColIdx = initDerivedColIdx();
+ Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+ private int[] initDerivedColIdx() {
+ int[] idx = new int[deriveInfo.columns.length];
+ for (int i = 0; i < idx.length; i++) {
+ idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+ }
+ return idx;
+ }
+
+ @Override
+ public void fillDerivedColumns(Object[] tmpValues, Tuple tuple) {
+ for (int i = 0; i < hostTmpIdx.length; i++) {
+ lookupKey.data[i] = CubeTupleConverter.toString(tmpValues[hostTmpIdx[i]]);
+ }
+
+ String[] lookupRow = lookupTable.getRow(lookupKey);
+
+ if (lookupRow != null) {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ String value = lookupRow[derivedColIdx[i]];
+ tuple.setDimensionValue(derivedTupleIdx[i], value);
+ }
+ }
+ } else {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ tuple.setDimensionValue(derivedTupleIdx[i], null);
+ }
+ }
+ }
+ }
+ };
+ case PK_FK:
+ return new IDerivedColumnFiller() {
+ @Override
+ public void fillDerivedColumns(Object[] tmpValues, Tuple tuple) {
+ // composite keys are split, so copying only [0] is enough; see CubeDesc.initDimensionColumns()
+ tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(tmpValues[hostTmpIdx[0]]));
+ }
+ };
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private int indexOnTheTmpValues(TblColRef col) {
+ List<TblColRef> cuboidDims = cuboid.getColumns();
+ int cuboidIdx = cuboidDims.indexOf(col);
+ for (int i = 0; i < gtColIdx.length; i++) {
+ if (gtColIdx[i] == cuboidIdx)
+ return i;
+ }
+ return -1;
+ }
+
+ private static String toString(Object o) {
+ return o == null ? null : o.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
new file mode 100644
index 0000000..10ff129
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
@@ -0,0 +1,111 @@
+package org.apache.kylin.storage.cube;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ITupleIterator} that drains a list of {@link CubeScanner}s one after another,
+ * translating every {@link GTRecord} into a tuple through a {@link CubeTupleConverter}
+ * created for the scanner currently being read.
+ * <p>
+ * The same {@link Tuple} instance is refilled and handed out for every element, so a caller
+ * that wants to keep a tuple beyond the following call to {@link #next()} has to copy it.
+ */
+public class SequentialCubeTupleIterator implements ITupleIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
+
+    private final Cuboid cuboid;
+    private final Set<TblColRef> selectedDimensions;
+    private final Set<FunctionDesc> selectedMetrics;
+    private final TupleInfo tupleInfo;
+    private final Tuple tuple;
+    private final Iterator<CubeScanner> scannerIterator;
+
+    // state of the scanner currently being drained; all three are null between scanners
+    private CubeScanner curScanner;
+    private Iterator<GTRecord> curRecordIterator;
+    private CubeTupleConverter curTupleConverter;
+    // look-ahead element filled by hasNext() and consumed by next(); null when nothing is buffered
+    private Tuple next;
+
+    /**
+     * @param scanners           scanners to read, in the order they should be drained
+     * @param cuboid             passed to every {@link CubeTupleConverter} created by this iterator
+     * @param selectedDimensions passed to every {@link CubeTupleConverter} created by this iterator
+     * @param selectedMetrics    passed to every {@link CubeTupleConverter} created by this iterator
+     * @param returnTupleInfo    layout of the tuples handed out
+     */
+    public SequentialCubeTupleIterator(List<CubeScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+        this.cuboid = cuboid;
+        this.selectedDimensions = selectedDimensions;
+        this.selectedMetrics = selectedMetrics;
+        this.tupleInfo = returnTupleInfo;
+        this.tuple = new Tuple(returnTupleInfo);
+        this.scannerIterator = scanners.iterator();
+    }
+
+    /**
+     * Buffers the next tuple if there is one. Scanners that run dry are closed on the way.
+     */
+    @Override
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+
+        // iterate rather than recurse, so a long run of empty scanners cannot exhaust the stack
+        while (true) {
+            if (curScanner == null) {
+                if (!scannerIterator.hasNext())
+                    return false;
+
+                curScanner = scannerIterator.next();
+                curRecordIterator = curScanner.iterator();
+                curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+            }
+
+            if (curRecordIterator.hasNext()) {
+                curTupleConverter.translateResult(curRecordIterator.next(), tuple);
+                next = tuple;
+                return true;
+            }
+
+            // current scanner is exhausted, move on to the following one
+            closeCurrentScanner();
+        }
+    }
+
+    /**
+     * @return the next tuple (the shared instance, refilled)
+     * @throws NoSuchElementException when all scanners are exhausted
+     */
+    @Override
+    public ITuple next() {
+        // fetch next record
+        if (next == null) {
+            hasNext();
+            if (next == null)
+                throw new NoSuchElementException();
+        }
+
+        ITuple result = next;
+        next = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Closes the scanner being read, if any, and every scanner not yet started.
+     * The current-scanner state is cleared so that calling this again, or calling
+     * {@link #hasNext()} afterwards, does not touch an already closed scanner.
+     */
+    @Override
+    public void close() {
+        if (curScanner != null)
+            closeCurrentScanner();
+
+        while (scannerIterator.hasNext()) {
+            close(scannerIterator.next());
+        }
+    }
+
+    /** Closes the current scanner and forgets it together with its record iterator and converter. */
+    private void closeCurrentScanner() {
+        close(curScanner);
+        curScanner = null;
+        curRecordIterator = null;
+        curTupleConverter = null;
+    }
+
+    /** Closes one scanner; an IOException is logged and not propagated. */
+    private void close(CubeScanner scanner) {
+        try {
+            scanner.close();
+        } catch (IOException e) {
+            logger.error("Exception when close CubeScanner", e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
index f07b516..c0d56c3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
@@ -40,6 +40,11 @@ public class GTComboStore implements IGTStore {
this.gtDiskStore = new GTDiskStore(gtInfo);
}
}
+
+ /** Returns the {@link GTInfo} held in this store's gtInfo field. */
+ @Override
+ public GTInfo getInfo() {
+ return gtInfo;
+ }
private IGTStore getCurrent() {
if (gtSimpleMemStore != null) {
@@ -48,6 +53,14 @@ public class GTComboStore implements IGTStore {
return gtDiskStore;
}
}
+
+ /**
+  * Reports the memory usage of the memory store when one is present,
+  * otherwise that of the disk store.
+  */
+ public long memoryUsage() {
+     return (gtSimpleMemStore != null) ? gtSimpleMemStore.memoryUsage() : gtDiskStore.memoryUsage();
+ }
public void switchToMemStore() {
try {
@@ -78,11 +91,6 @@ public class GTComboStore implements IGTStore {
}
@Override
- public long memoryUsage() {
- return getCurrent().memoryUsage();
- }
-
- @Override
public IGTStoreWriter rebuild(int shard) throws IOException {
return getCurrent().rebuild(shard);
}
@@ -97,7 +105,6 @@ public class GTComboStore implements IGTStore {
return getCurrent().scan(pkStart, pkEnd, selectedColBlocks, additionalPushDown);
}
- @Override
public void drop() throws IOException {
if (gtSimpleMemStore != null) {
gtSimpleMemStore.drop();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index fdabb60..6c9968f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -17,6 +17,7 @@ public class GTInfo {
IGTCodeSystem codeSystem;
// column schema
+ int nColumns;
DataType[] colTypes;
BitSet colAll;
BitSet colPreferIndex;
@@ -34,9 +35,13 @@ public class GTInfo {
// must create from builder
private GTInfo() {
}
+
+ /** Returns the {@link IGTCodeSystem} stored in the codeSystem field. */
+ public IGTCodeSystem getCodeSystem() {
+ return codeSystem;
+ }
public int getColumnCount() {
- return colTypes.length;
+ return nColumns;
}
public DataType getColumnType(int i) {
@@ -73,7 +78,7 @@ public class GTInfo {
public int getMaxColumnLength() {
int max = 0;
- for (int i = 0; i < colTypes.length; i++)
+ for (int i = 0; i < nColumns; i++)
max = Math.max(max, codeSystem.maxCodeLength(i));
return max;
}
@@ -94,7 +99,7 @@ public class GTInfo {
public TblColRef colRef(int i) {
if (colRefs == null) {
- colRefs = new TblColRef[colTypes.length];
+ colRefs = new TblColRef[nColumns];
}
if (colRefs[i] == null) {
colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
@@ -104,7 +109,7 @@ public class GTInfo {
public void validateColRef(TblColRef ref) {
TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
- if (expected != ref)
+ if (expected.equals(ref) == false)
throw new IllegalArgumentException();
}
@@ -123,7 +128,7 @@ public class GTInfo {
private void validateColumnBlocks() {
colAll = new BitSet();
- colAll.flip(0, colTypes.length);
+ colAll.flip(0, nColumns);
if (colBlocks == null) {
colBlocks = new BitSet[2];
@@ -184,6 +189,7 @@ public class GTInfo {
/** required */
public Builder setColumns(DataType... colTypes) {
+ info.nColumns = colTypes.length;
info.colTypes = colTypes;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
index 2a38731..61ff25d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
@@ -70,6 +70,18 @@ public class GTRecord implements Comparable<GTRecord> {
return result;
}
+ /**
+  * Decodes the given columns of this record into a caller-supplied array.
+  * Slot {@code i} of {@code result} receives the decoded value of column
+  * {@code selectedColumns[i]}, or null when that column has no backing array.
+  *
+  * @param selectedColumns indexes of the columns to decode
+  * @param result          destination array, at least as long as {@code selectedColumns}
+  * @return {@code result}, for convenience
+  */
+ public Object[] getValues(int[] selectedColumns, Object[] result) {
+     assert selectedColumns.length <= result.length;
+     int slot = 0;
+     for (int c : selectedColumns) {
+         result[slot++] = (cols[c].array() == null) ? null : info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+     }
+     return result;
+ }
+
public GTRecord copy() {
return copy(info.colAll);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
index 9c758fa..ceeeb84 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
@@ -3,7 +3,6 @@ package org.apache.kylin.storage.gridtable;
import java.nio.ByteBuffer;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.filter.IFilterCodeSystem;
import org.apache.kylin.metadata.measure.MeasureAggregator;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
index d3418f7..9fddffe 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
@@ -269,8 +269,8 @@ public class GTScanRangePlanner {
private class ColumnRange {
private TblColRef column;
- private ByteArray begin = new ByteArray();
- private ByteArray end = new ByteArray();
+ private ByteArray begin = ByteArray.EMPTY;
+ private ByteArray end = ByteArray.EMPTY;
private Set<ByteArray> equals;
public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
@@ -309,8 +309,13 @@ public class GTScanRangePlanner {
}
private void refreshBeginEndFromEquals() {
- this.begin = byteUnknownIsSmaller.min(this.equals);
- this.end = byteUnknownIsBigger.max(this.equals);
+ if (equals.isEmpty()) {
+ begin = ByteArray.EMPTY;
+ end = ByteArray.EMPTY;
+ } else {
+ begin = byteUnknownIsSmaller.min(equals);
+ end = byteUnknownIsBigger.max(equals);
+ }
}
public boolean satisfyAll() {
@@ -383,7 +388,7 @@ public class GTScanRangePlanner {
public static abstract class ComparatorEx<T> implements Comparator<T> {
public T min(Collection<T> v) {
- if (v.size() < 0) {
+ if (v.size() <= 0) {
return null;
}
@@ -396,7 +401,7 @@ public class GTScanRangePlanner {
}
public T max(Collection<T> v) {
- if (v.size() < 0) {
+ if (v.size() <= 0) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
index 0152571..4defa24 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
@@ -9,7 +9,7 @@ import org.apache.kylin.common.util.ByteArray;
public interface IGTStore {
- long memoryUsage();
+ GTInfo getInfo();
IGTStoreWriter rebuild(int shard) throws IOException;
@@ -17,8 +17,6 @@ public interface IGTStore {
IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
- void drop() throws IOException;
-
interface IGTStoreWriter extends Closeable {
void write(GTRowBlock block) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
index 25d66f0..9684d8d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
@@ -1,8 +1,16 @@
package org.apache.kylin.storage.gridtable.diskstore;
-import com.google.common.base.Preconditions;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.UUID;
+
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.storage.gridtable.GTInfo;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
@@ -10,10 +18,7 @@ import org.apache.kylin.storage.gridtable.IGTStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.UUID;
+import com.google.common.base.Preconditions;
/**
*/
@@ -35,6 +40,11 @@ public class GTDiskStore implements IGTStore {
deleteTmpFilesOnExit();
}
+ /** Returns the {@link GTInfo} held in this store's gtInfo field. */
+ @Override
+ public GTInfo getInfo() {
+ return gtInfo;
+ }
+
private String generateIdentifier(FileSystem fs) {
while (true) {
String identifier = UUID.randomUUID().toString();
@@ -75,7 +85,6 @@ public class GTDiskStore implements IGTStore {
}
}
- @Override
public long memoryUsage() {
return 0;
}
@@ -146,7 +155,6 @@ public class GTDiskStore implements IGTStore {
return new DiskStoreScanner(fileSystem.getReader(getRowBlockFile(identifier)));
}
- @Override
public void drop() throws IOException {
try {
writer.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/62cf7c03/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index 329c048..c59ee04 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -1,29 +1,35 @@
package org.apache.kylin.storage.gridtable.memstore;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.storage.gridtable.GTInfo;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-
public class GTSimpleMemStore implements IGTStore {
+ final GTInfo info;
final List<GTRowBlock> rowBlockList;
public GTSimpleMemStore(GTInfo info) {
- this.rowBlockList = Lists.newLinkedList();
+ this.info = info;
+ this.rowBlockList = new ArrayList<GTRowBlock>();
if (info.isShardingEnabled())
throw new UnsupportedOperationException();
}
@Override
+ public GTInfo getInfo() {
+ // hands back the GTInfo stored in the info field
+ return info;
+ }
+
public long memoryUsage() {
if (rowBlockList.size() == 0) {
return 0;
@@ -98,7 +104,6 @@ public class GTSimpleMemStore implements IGTStore {
};
}
- @Override
public void drop() throws IOException {
//will there be any concurrent issue? If yes, ArrayList should be replaced
rowBlockList.clear();