You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@kylin.apache.org by li...@apache.org on 2015/04/11 00:48:00 UTC
[23/34] incubator-kylin git commit: KYLIN-655, milestone, IGTScanner done, pending transform to tuple
KYLIN-655, milestone, IGTScanner done, pending transform to tuple
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5e633b79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5e633b79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5e633b79
Branch: refs/heads/streaming-localdict
Commit: 5e633b791b7d13887e0fdbbc140f22bb467be4f7
Parents: 84183f9
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Apr 9 11:10:05 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu Apr 9 11:10:05 2015 +0800
----------------------------------------------------------------------
.../storage/cube/CubeHBaseReadonlyStore.java | 48 ++++--
.../apache/kylin/storage/cube/CubeScanner.java | 145 +++++++++++++++----
.../storage/gridtable/GTAggregateScanner.java | 3 +
.../storage/gridtable/GTFilterScanner.java | 113 +++------------
.../storage/gridtable/GTInvertedIndex.java | 2 +-
.../kylin/storage/gridtable/GTRawScanner.java | 136 +++++++++++++++++
.../kylin/storage/gridtable/GTRowBlock.java | 18 ++-
.../kylin/storage/gridtable/GridTable.java | 6 +-
.../gridtable/memstore/GTSimpleMemStore.java | 2 +-
9 files changed, 332 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index e8d5d23..6af9b99 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -5,6 +5,8 @@ import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
@@ -16,7 +18,7 @@ import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
@@ -25,7 +27,6 @@ import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTRowBlock.Writer;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.hbase.CubeSegmentTupleIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,18 +69,23 @@ public class CubeHBaseReadonlyStore implements IGTStore {
}
@Override
- public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+ public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColumnBlocks, GTScanRequest additionalPushDown) throws IOException {
// TODO enable coprocessor
+ // primary key (also the 0th column block) is always selected
+ final BitSet selectedColBlocks = (BitSet) selectedColumnBlocks.clone();
+ selectedColBlocks.set(0);
+
// globally shared connection, does not require close
HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-
+
final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
Scan hbaseScan = buildScan(pkStart, pkEnd, hbaseColumns);
final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
final Iterator<Result> iterator = scanner.iterator();
+ final GTRowBlock oneBlock = new GTRowBlock(info); // avoid object creation
return new IGTStoreScanner() {
@@ -90,12 +96,22 @@ public class CubeHBaseReadonlyStore implements IGTStore {
@Override
public GTRowBlock next() {
+ // row block is always disabled for cubes, row block contains only one record
Result result = iterator.next();
- result.getRow();
- for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) {
- result.getColumnLatestCell(hbaseColumn.getFirst(), hbaseColumn.getSecond());
+
+ // dimensions, set to primary key, also the 0th column block
+ byte[] rowkey = result.getRow();
+ oneBlock.getPrimaryKey().set(rowkey, RowConstants.ROWKEY_CUBOIDID_LEN, rowkey.length - RowConstants.ROWKEY_CUBOIDID_LEN);
+ oneBlock.getCellBlock(0).set(rowkey, RowConstants.ROWKEY_CUBOIDID_LEN, rowkey.length - RowConstants.ROWKEY_CUBOIDID_LEN);
+
+ // metrics
+ int hbaseColIdx = 0;
+ for (int colBlockIdx = selectedColBlocks.nextSetBit(1); colBlockIdx >= 0; colBlockIdx = selectedColBlocks.nextSetBit(colBlockIdx + 1)) {
+ Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(hbaseColIdx++);
+ Cell cell = result.getColumnLatestCell(hbaseColumn.getFirst(), hbaseColumn.getSecond());
+ oneBlock.getCellBlock(colBlockIdx).set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
- return null;
+ return oneBlock;
}
@Override
@@ -123,15 +139,25 @@ public class CubeHBaseReadonlyStore implements IGTStore {
scan.addColumn(byteFamily, byteQualifier);
}
- scan.setStartRow(pkStart.copy().array());
- scan.setStopRow(pkEnd.copy().array());
+ scan.setStartRow(makeRowKeyToScan(pkStart));
+ scan.setStopRow(makeRowKeyToScan(pkEnd));
return scan;
}
+ private byte[] makeRowKeyToScan(ByteArray pk) {
+ if (pk == null || pk.array() == null)
+ return HConstants.EMPTY_BYTE_ARRAY; // from the very beginning, or to the end
+
+ byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_CUBOIDID_LEN];
+ System.arraycopy(cuboid.getBytes(), 0, buf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+ System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_CUBOIDID_LEN, pk.length());
+ return buf;
+ }
+
private List<Pair<byte[], byte[]>> makeHBaseColumns(BitSet selectedColBlocks) {
List<Pair<byte[], byte[]>> result = Lists.newArrayList();
- int colBlockIdx = 0;
+ int colBlockIdx = 1; // start from 1; the 0th column block is primary key which maps to rowkey
HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
byte[] byteFamily = Bytes.toBytes(familyDesc.getName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index a8a0547..ea5526d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -5,6 +5,7 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.kylin.cube.CubeSegment;
@@ -15,6 +16,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRawScanner;
import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTScanRange;
import org.apache.kylin.storage.gridtable.GTScanRangePlanner;
@@ -24,33 +26,38 @@ import org.apache.kylin.storage.gridtable.IGTScanner;
import com.google.common.collect.Lists;
-public class CubeScanner {
+public class CubeScanner implements IGTScanner {
private static final int MAX_SCAN_RANGES = 200;
-
+
final GTInfo info;
+ final CubeHBaseReadonlyStore store;
final List<GTScanRequest> scanRequests;
+ final Scanner scanner;
public CubeScanner(CubeSegment cubeSeg, Set<TblColRef> dimensions, Set<TblColRef> groups, //
Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
Cuboid cuboid = context.getCuboid();
info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-
+ store = new CubeHBaseReadonlyStore(info, cubeSeg, cuboid);
+
TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, cuboid.getColumns(), groups);
BitSet gtDimensions = makeGridTableColumns(cuboid, dimensions);
BitSet gtAggrGroups = makeGridTableColumns(cuboid, groups);
BitSet gtAggrMetrics = makeGridTableColumns(cubeSeg.getCubeDesc(), cuboid, metrics);
String[] gtAggrFuncs = makeAggrFuncs(metrics);
-
+
GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info);
List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-
+
scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
for (GTScanRange range : scanRanges) {
scanRequests.add(new GTScanRequest(info, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter));
}
+
+ scanner = new Scanner();
}
-
+
private BitSet makeGridTableColumns(Cuboid cuboid, Set<TblColRef> dimensions) {
BitSet result = new BitSet();
List<TblColRef> dimCols = cuboid.getColumns();
@@ -69,7 +76,7 @@ public class CubeScanner {
int index = cubeDesc.getMeasures().indexOf(metric);
if (index < 0)
throw new IllegalStateException(metric + " not found in " + cubeDesc);
-
+
result.set(metricsIndexStart + index);
}
return result;
@@ -84,44 +91,118 @@ public class CubeScanner {
return result;
}
- public IGTScanner scan() {
- return new Scanner();
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return scanner.iterator();
}
-
- private class Scanner implements IGTScanner {
- int curRequestIndex = 0;
-
- // TODO hbase metrics
- int scannedRowCount = 0;
- int scannedRowBlockCount = 0;
- @Override
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return scanner.getScannedRowCount();
+ }
+
+ @Override
+ public int getScannedRowBlockCount() {
+ return scanner.getScannedRowBlockCount();
+ }
+
+ private class Scanner {
+ final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
+ int cur = 0;
+ Iterator<GTRecord> curIterator = null;
+ GTRecord next = null;
+
public Iterator<GTRecord> iterator() {
- // TODO Auto-generated method stub
- return null;
- }
+ return new Iterator<GTRecord>() {
- @Override
- public void close() throws IOException {
- // TODO Auto-generated method stub
-
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ if (curIterator == null) {
+ if (cur >= scanRequests.size())
+ return false;
+
+ try {
+ inputScanners[cur] = new GTRawScanner(info, store, scanRequests.get(cur));
+ curIterator = inputScanners[cur].iterator();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (curIterator.hasNext() == false) {
+ curIterator = null;
+ cur++;
+ return hasNext();
+ }
+
+ next = curIterator.next();
+ return true;
+ }
+
+ @Override
+ public GTRecord next() {
+ // fetch next record
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+
+ GTRecord result = next;
+ next = null;
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
}
- @Override
- public GTInfo getInfo() {
- return info;
+ public void close() throws IOException {
+ for (int i = 0; i < inputScanners.length; i++) {
+ if (inputScanners[i] != null) {
+ inputScanners[i].close();
+ }
+ }
}
- @Override
public int getScannedRowCount() {
- return scannedRowCount;
+ int result = 0;
+ for (int i = 0; i < inputScanners.length; i++) {
+ if (inputScanners[i] == null)
+ break;
+
+ result += inputScanners[i].getScannedRowCount();
+ }
+ return result;
}
- @Override
public int getScannedRowBlockCount() {
- return scannedRowBlockCount;
+ int result = 0;
+ for (int i = 0; i < inputScanners.length; i++) {
+ if (inputScanners[i] == null)
+ break;
+
+ result += inputScanners[i].getScannedRowBlockCount();
+ }
+ return result;
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index 7db8eed..14a3efa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -21,6 +21,9 @@ public class GTAggregateScanner implements IGTScanner {
final IGTScanner inputScanner;
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+ if (req.hasAggregation() == false)
+ throw new IllegalStateException();
+
this.info = inputScanner.getInfo();
this.dimensions = (BitSet) req.getColumns().clone();
this.dimensions.andNot(req.getAggrMetrics());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
index 63090e6..ebb6ce9 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
@@ -1,7 +1,6 @@
package org.apache.kylin.storage.gridtable;
import java.io.IOException;
-import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -10,123 +9,73 @@ import org.apache.kylin.metadata.filter.IFilterCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreScanner;
public class GTFilterScanner implements IGTScanner {
- final GTInfo info;
- final IGTStoreScanner storeScanner;
- final TupleFilter filter;
- final BitSet selectedColBlocks;
+ final private IGTScanner inputScanner;
+ final private TupleFilter filter;
+ final private IEvaluatableTuple oneTuple; // avoid instance creation
+
+ private GTRecord next = null;
- private GTRowBlock.Reader curBlockReader;
- private GTRecord next;
- final private GTRecord oneRecord; // avoid instance creation
- final private TupleAdapter oneTuple; // avoid instance creation
-
- private int scannedRowCount = 0;
- private int scannedRowBlockCount = 0;
-
- public GTFilterScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
- this.info = info;
+ public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
+ this.inputScanner = inputScanner;
this.filter = req.getFilterPushDown();
+ this.oneTuple = new IEvaluatableTuple() {
+ @Override
+ public Object getValue(TblColRef col) {
+ return next.get(col.getColumn().getZeroBasedIndex());
+ }
+ };
if (TupleFilter.isEvaluableRecursively(filter) == false)
throw new IllegalArgumentException();
-
- ByteArray start = makeScanKey(req.getPkStart());
- ByteArray end = makeScanKey(req.getPkEnd());
- this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
-
- this.storeScanner = store.scan(start, end, selectedColBlocks, req);
- this.oneRecord = new GTRecord(info);
- this.oneTuple = new TupleAdapter(oneRecord);
- }
-
- private ByteArray makeScanKey(GTRecord rec) {
- int firstPKCol = info.primaryKey.nextSetBit(0);
- if (rec == null || rec.cols[firstPKCol].array() == null)
- return null;
-
- BitSet selectedColumns = new BitSet();
- int len = 0;
- for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
- if (rec.cols[i].array() == null) {
- break;
- }
- selectedColumns.set(i);
- len += rec.cols[i].length();
- }
-
- ByteArray buf = ByteArray.allocate(len);
- rec.exportColumns(selectedColumns, buf);
- return buf;
}
@Override
public GTInfo getInfo() {
- return info;
+ return inputScanner.getInfo();
}
@Override
public int getScannedRowCount() {
- return scannedRowCount;
+ return inputScanner.getScannedRowCount();
}
@Override
public int getScannedRowBlockCount() {
- return scannedRowBlockCount;
+ return inputScanner.getScannedRowBlockCount();
}
@Override
public void close() throws IOException {
- storeScanner.close();
+ inputScanner.close();
}
@Override
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
+
+ private Iterator<GTRecord> inputIterator = inputScanner.iterator();
@Override
public boolean hasNext() {
if (next != null)
return true;
- IFilterCodeSystem<ByteArray> filterCodeSystem = info.codeSystem.getFilterCodeSystem();
+ IFilterCodeSystem<ByteArray> filterCodeSystem = getInfo().codeSystem.getFilterCodeSystem();
- while (fetchNext()) {
+ while (inputIterator.hasNext()) {
+ next = inputIterator.next();
if (filter != null && filter.evaluate(oneTuple, filterCodeSystem) == false) {
continue;
}
- next = oneRecord;
return true;
}
+ next = null;
return false;
}
- private boolean fetchNext() {
- while (true) {
- // get a block
- if (curBlockReader == null) {
- if (storeScanner.hasNext()) {
- curBlockReader = storeScanner.next().getReader(selectedColBlocks);
- scannedRowBlockCount++;
- } else {
- return false;
- }
- }
- // if block exhausted, try next block
- if (curBlockReader.hasNext() == false) {
- curBlockReader = null;
- continue;
- }
- // fetch a row
- curBlockReader.fetchNext(oneRecord);
- scannedRowCount++;
- return true;
- }
- }
-
@Override
public GTRecord next() {
// fetch next record
@@ -148,20 +97,4 @@ public class GTFilterScanner implements IGTScanner {
};
}
-
- private static class TupleAdapter implements IEvaluatableTuple {
-
- private GTRecord r;
-
- private TupleAdapter(GTRecord r) {
- this.r = r;
- }
-
- @Override
- public Object getValue(TblColRef col) {
- return r.get(col.getColumn().getZeroBasedIndex());
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
index 3c8d862..449b174 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
@@ -58,7 +58,7 @@ public class GTInvertedIndex {
}
for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) {
- index[i].add(distinctValues[i], block.sequenceId());
+ index[i].add(distinctValues[i], block.getSequenceId());
}
nIndexedBlocks = Math.max(nIndexedBlocks, block.seqId + 1);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
new file mode 100644
index 0000000..895ccf3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
@@ -0,0 +1,136 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreScanner;
+
+public class GTRawScanner implements IGTScanner {
+
+ final GTInfo info;
+ final IGTStoreScanner storeScanner;
+ final BitSet selectedColBlocks;
+
+ private GTRowBlock.Reader curBlockReader;
+ private GTRecord next;
+ final private GTRecord oneRecord; // avoid instance creation
+
+ private int scannedRowCount = 0;
+ private int scannedRowBlockCount = 0;
+
+ public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
+ this.info = info;
+
+ ByteArray start = makeScanKey(req.getPkStart());
+ ByteArray end = makeScanKey(req.getPkEnd());
+ this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
+
+ this.storeScanner = store.scan(start, end, selectedColBlocks, req);
+ this.oneRecord = new GTRecord(info);
+ }
+
+ private ByteArray makeScanKey(GTRecord rec) {
+ int firstPKCol = info.primaryKey.nextSetBit(0);
+ if (rec == null || rec.cols[firstPKCol].array() == null)
+ return null;
+
+ BitSet selectedColumns = new BitSet();
+ int len = 0;
+ for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
+ if (rec.cols[i].array() == null) {
+ break;
+ }
+ selectedColumns.set(i);
+ len += rec.cols[i].length();
+ }
+
+ ByteArray buf = ByteArray.allocate(len);
+ rec.exportColumns(selectedColumns, buf);
+ return buf;
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return scannedRowCount;
+ }
+
+ @Override
+ public int getScannedRowBlockCount() {
+ return scannedRowBlockCount;
+ }
+
+ @Override
+ public void close() throws IOException {
+ storeScanner.close();
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ if (fetchOneRecord()) {
+ next = oneRecord;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean fetchOneRecord() {
+ while (true) {
+ // get a block
+ if (curBlockReader == null) {
+ if (storeScanner.hasNext()) {
+ curBlockReader = storeScanner.next().getReader(selectedColBlocks);
+ scannedRowBlockCount++;
+ } else {
+ return false;
+ }
+ }
+ // if block exhausted, try next block
+ if (curBlockReader.hasNext() == false) {
+ curBlockReader = null;
+ continue;
+ }
+ // fetch a row
+ curBlockReader.fetchNext(oneRecord);
+ scannedRowCount++;
+ return true;
+ }
+ }
+
+ @Override
+ public GTRecord next() {
+ // fetch next record
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+
+ GTRecord result = next;
+ next = null;
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 62da0b4..4732142 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -32,17 +32,25 @@ public class GTRowBlock {
/** create a row block that has no underlying space */
public GTRowBlock(GTInfo info) {
this.info = info;
- primaryKey = new ByteArray();
- cellBlocks = new ByteArray[info.colBlocks.length];
- for (int i = 0; i < cellBlocks.length; i++) {
- cellBlocks[i] = new ByteArray();
+ this.primaryKey = new ByteArray();
+ this.cellBlocks = new ByteArray[info.colBlocks.length];
+ for (int i = 0; i < this.cellBlocks.length; i++) {
+ this.cellBlocks[i] = new ByteArray();
}
}
- public int sequenceId() {
+ public int getSequenceId() {
return seqId;
}
+ public ByteArray getPrimaryKey() {
+ return primaryKey;
+ }
+
+ public ByteArray getCellBlock(int i) {
+ return cellBlocks[i];
+ }
+
public Writer getWriter() {
return new Writer();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
index f2f775b..20b543a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
@@ -32,7 +32,11 @@ public class GridTable {
}
public IGTScanner scan(GTScanRequest req) throws IOException {
- IGTScanner result = new GTFilterScanner(info, store, req);
+ IGTScanner result = new GTRawScanner(info, store, req);
+
+ if (req.hasFilterPushDown()) {
+ result = new GTFilterScanner(result, req);
+ }
if (req.hasAggregation()) {
result = new GTAggregateScanner(result, req);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index ba92f8d..32c7f36 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -58,7 +58,7 @@ public class GTSimpleMemStore implements IGTStore {
@Override
public void write(GTRowBlock block) throws IOException {
GTRowBlock copy = block.copy();
- int id = block.sequenceId();
+ int id = block.getSequenceId();
if (id < rowBlockList.size()) {
rowBlockList.set(id, copy);
} else {