You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:32 UTC
[24/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java
new file mode 100644
index 0000000..3e34bf4
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java
@@ -0,0 +1,111 @@
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.IGTStore.IGTStoreScanner;
+
+public class GTRawScanner implements IGTScanner {
+
+ final GTInfo info;
+ final IGTStoreScanner storeScanner;
+ final ImmutableBitSet selectedColBlocks;
+
+ private GTRowBlock.Reader curBlockReader;
+ private GTRecord next;
+ final private GTRecord oneRecord; // avoid instance creation
+
+ private int scannedRowCount = 0;
+ private int scannedRowBlockCount = 0;
+
+ public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
+ this.info = info;
+ this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
+ this.storeScanner = store.scan(req.getPkStart(), req.getPkEnd(), selectedColBlocks, req);
+ this.oneRecord = new GTRecord(info);
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return scannedRowCount;
+ }
+
+ @Override
+ public int getScannedRowBlockCount() {
+ return scannedRowBlockCount;
+ }
+
+ @Override
+ public void close() throws IOException {
+ storeScanner.close();
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ if (fetchOneRecord()) {
+ next = oneRecord;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean fetchOneRecord() {
+ // Loads the next row into the shared oneRecord instance.
+ // Returns false once the store scanner has no more blocks.
+ for (;;) {
+ // current block still has rows: fetch one and stop
+ if (curBlockReader != null && curBlockReader.hasNext()) {
+ curBlockReader.fetchNext(oneRecord);
+ scannedRowCount++;
+ return true;
+ }
+ // no block open, or the open one is exhausted
+ curBlockReader = null;
+ // nothing left in the store: the scan is finished
+ if (!storeScanner.hasNext()) {
+ return false;
+ }
+ // open the next block and count it,
+ // then loop again to read its first row (or skip it if it is empty)
+ curBlockReader = storeScanner.next().getReader(selectedColBlocks);
+ scannedRowBlockCount++;
+ }
+ }
+
+ @Override
+ public GTRecord next() {
+ // buffer a record if none is pending; hasNext() leaves 'next' null only when the scan is exhausted
+ if (next == null && !hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ // hand the pending record out and clear the slot,
+ // so the following call fetches a fresh row
+ GTRecord pending = next;
+ next = null;
+ return pending;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
new file mode 100644
index 0000000..ef2efe0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -0,0 +1,285 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+
+public class GTRecord implements Comparable<GTRecord> {
+
+ final GTInfo info;
+ final ByteArray[] cols;
+
+ private ImmutableBitSet maskForEqualHashComp;
+
+ public GTRecord(GTInfo info, ImmutableBitSet maskForEqualHashComp) {
+ this.info = info;
+ this.cols = new ByteArray[info.getColumnCount()];
+ for (int i = 0; i < cols.length; i++) {
+ if (maskForEqualHashComp.get(i)) {
+ this.cols[i] = new ByteArray();
+ }
+ }
+ this.maskForEqualHashComp = maskForEqualHashComp;
+ }
+
+ public GTRecord(GTInfo info) {
+ this(info, info.colAll);
+ }
+
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ public ByteArray get(int i) {
+ return cols[i];
+ }
+
+ public void set(int i, ByteArray data) {
+ cols[i].set(data.array(), data.offset(), data.length());
+ }
+
+ /** set record to the codes of specified values, new space allocated to hold the codes */
+ public GTRecord setValues(Object... values) {
+ setValues(info.colAll, new ByteArray(info.getMaxRecordLength()), values);
+ return this;
+ }
+
+ /** set record to the codes of specified values, reuse given space to hold the codes */
+ public GTRecord setValues(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
+ assert selectedCols.cardinality() == values.length;
+
+ ByteBuffer buf = space.asBuffer();
+ int pos = buf.position();
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ info.codeSystem.encodeColumnValue(c, values[i], buf);
+ int newPos = buf.position();
+ cols[c].set(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+ pos = newPos;
+ }
+ return this;
+ }
+
+ /** decode and return the values of this record */
+ public Object[] getValues() {
+ return getValues(info.colAll, new Object[info.getColumnCount()]);
+ }
+
+ /** decode and return the values of this record */
+ public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
+ assert selectedCols.cardinality() == result.length;
+
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ if (cols[c] == null || cols[c].array() == null) {
+ result[i] = null;
+ } else {
+ result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+ }
+ }
+ return result;
+ }
+
+ /** decode the given columns into result, result[i] holding the value of column selectedColumns[i] */
+ public Object[] getValues(int[] selectedColumns, Object[] result) {
+ assert selectedColumns.length <= result.length;
+ for (int i = 0; i < selectedColumns.length; i++) {
+ int c = selectedColumns[i];
+ // cols[c] is null for a column left out of the mask at construction;
+ // treat that like a column without data, as getValues(ImmutableBitSet, Object[]) does
+ boolean absent = cols[c] == null || cols[c].array() == null;
+ result[i] = absent ? null : info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+ }
+ return result;
+ }
+
+ public GTRecord copy() {
+ return copy(info.colAll);
+ }
+
+ public GTRecord copy(ImmutableBitSet selectedCols) { // copies only selectedCols; the new record keeps this record's mask
+ int len = 0;
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) { // first pass: total bytes of the selected columns
+ int c = selectedCols.trueBitAt(i);
+ len += cols[c].length();
+ }
+
+ byte[] space = new byte[len]; // one array backs all copied columns
+
+ GTRecord copy = new GTRecord(info, this.maskForEqualHashComp); // allocates cols only for columns in the mask
+ int pos = 0;
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) { // second pass: copy the bytes and point the new columns at them
+ int c = selectedCols.trueBitAt(i);
+ System.arraycopy(cols[c].array(), cols[c].offset(), space, pos, cols[c].length());
+ copy.cols[c].set(space, pos, cols[c].length()); // NOTE(review): copy.cols[c] is null if c is outside the mask - callers must keep selectedCols within it
+ pos += cols[c].length();
+ }
+
+ return copy;
+ }
+
+ public ImmutableBitSet maskForEqualHashComp() {
+ return maskForEqualHashComp;
+ }
+
+ public void maskForEqualHashComp(ImmutableBitSet set) {
+ this.maskForEqualHashComp = set;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || obj.getClass() != getClass()) {
+ return false;
+ }
+ GTRecord that = (GTRecord) obj;
+ // info and mask are matched by reference, not by content
+ if (that.info != info || that.maskForEqualHashComp != maskForEqualHashComp) {
+ return false;
+ }
+ // only columns selected by the mask take part in the comparison
+ for (int k = 0; k < maskForEqualHashComp.trueBitCount(); k++) {
+ int col = maskForEqualHashComp.trueBitAt(k);
+ if (!cols[col].equals(that.cols[col])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ // fold in the hash of every column selected by the mask, in mask order
+ int acc = 1;
+ for (int k = 0, n = maskForEqualHashComp.trueBitCount(); k < n; k++) {
+ acc = 31 * acc + cols[maskForEqualHashComp.trueBitAt(k)].hashCode();
+ }
+ return acc;
+ }
+
+ @Override
+ public int compareTo(GTRecord o) {
+ assert this.info == o.info;
+ assert this.maskForEqualHashComp == o.maskForEqualHashComp; // reference equal for performance
+ IGTComparator comparator = info.codeSystem.getComparator();
+
+ int comp = 0;
+ for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
+ int c = maskForEqualHashComp.trueBitAt(i);
+ comp = comparator.compare(cols[c], o.cols[c]);
+ if (comp != 0)
+ return comp;
+ }
+ return comp;
+ }
+
+ @Override
+ public String toString() {
+ return toString(maskForEqualHashComp);
+ }
+
+ public String toString(ImmutableBitSet selectedColumns) {
+ Object[] values = new Object[selectedColumns.cardinality()];
+ getValues(selectedColumns, values);
+ return Arrays.toString(values);
+ }
+
+ // ============================================================================
+
+ /** export the selected columns, concatenated in bit order, into a newly allocated buffer of exactly their total length */
+ public ByteArray exportColumns(ImmutableBitSet selectedCols) {
+ int len = 0;
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ len += cols[selectedCols.trueBitAt(i)].length();
+ }
+
+ ByteArray buf = ByteArray.allocate(len);
+ exportColumns(selectedCols, buf); // export the same columns len was computed from (was info.primaryKey)
+ return buf;
+ }
+
+ /** write data to given buffer, like serialize */
+ public void exportColumns(ImmutableBitSet selectedCols, ByteArray buf) {
+ int pos = 0;
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ System.arraycopy(cols[c].array(), cols[c].offset(), buf.array(), buf.offset() + pos, cols[c].length());
+ pos += cols[c].length();
+ }
+ buf.setLength(pos);
+ }
+
+ /** write data to given buffer, like serialize */
+ public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
+ }
+ }
+
+ public void exportColumns(int[] fieldIndex, ByteBuffer buf) {
+ for (int i : fieldIndex) {
+ buf.put(cols[i].array(), cols[i].offset(), cols[i].length());
+ }
+ }
+
+
+ /** write data to given buffer, like serialize */
+ public void exportColumnBlock(int c, ByteBuffer buf) {
+ exportColumns(info.colBlocks[c], buf);
+ }
+
+ /** change pointers to point to data in given buffer, UNLIKE deserialize */
+ public void loadPrimaryKey(ByteBuffer buf) {
+ loadColumns(info.primaryKey, buf);
+ }
+
+ /** change pointers to point to data in given buffer, UNLIKE deserialize */
+ public void loadCellBlock(int c, ByteBuffer buf) {
+ loadColumns(info.colBlocks[c], buf);
+ }
+
+ /** change pointers to point to data in given buffer, UNLIKE deserialize */
+ public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
+ int pos = buf.position();
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ int len = info.codeSystem.codeLength(c, buf);
+ cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
+ pos += len;
+ buf.position(pos);
+ }
+ }
+
+ /** export the leading primary key columns of rec, stopping at the first one without data; null if rec is null or no leading PK column is set */
+ public static ByteArray exportScanKey(GTRecord rec) {
+ if (rec == null)
+ return null;
+
+ GTInfo info = rec.getInfo();
+
+ BitSet selectedColumns = new BitSet(); // PK columns that will be exported
+ int len = 0; // total byte length of those columns
+ for (int i = 0; i < info.primaryKey.trueBitCount(); i++) {
+ int c = info.primaryKey.trueBitAt(i);
+ if (rec.cols[c].array() == null) { // first PK column with no backing array ends the key prefix
+ break;
+ }
+ selectedColumns.set(c);
+ len += rec.cols[c].length();
+ }
+
+ if (selectedColumns.cardinality() == 0) // nothing to export, reported as null rather than an empty array
+ return null;
+
+ ByteArray buf = ByteArray.allocate(len);
+ rec.exportColumns(new ImmutableBitSet(selectedColumns), buf);
+ return buf;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java
new file mode 100644
index 0000000..2c5111b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java
@@ -0,0 +1,259 @@
+package org.apache.kylin.gridtable;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+
+public class GTRowBlock {
+
+ /** create a row block, allocate memory, get ready for writing */
+ public static GTRowBlock allocate(GTInfo info) {
+ GTRowBlock b = new GTRowBlock(info);
+
+ byte[] array = new byte[info.getMaxColumnLength(info.primaryKey)];
+ b.primaryKey.set(array);
+
+ int maxRows = info.isRowBlockEnabled() ? info.rowBlockSize : 1;
+ for (int i = 0; i < b.cellBlocks.length; i++) {
+ array = new byte[info.getMaxColumnLength(info.colBlocks[i]) * maxRows];
+ b.cellBlocks[i].set(array);
+ }
+ return b;
+ }
+
+ final GTInfo info;
+
+ int seqId; // 0, 1, 2...
+ int nRows;
+ ByteArray primaryKey; // the primary key of the first (smallest) row
+ ByteArray[] cellBlocks; // cells for each column block
+
+ /** create a row block that has no underlying space */
+ public GTRowBlock(GTInfo info) {
+ this.info = info;
+ this.primaryKey = new ByteArray();
+ this.cellBlocks = new ByteArray[info.colBlocks.length];
+ for (int i = 0; i < this.cellBlocks.length; i++) {
+ this.cellBlocks[i] = new ByteArray();
+ }
+ }
+
+ public int getSequenceId() {
+ return seqId;
+ }
+
+ public ByteArray getPrimaryKey() {
+ return primaryKey;
+ }
+
+ public ByteArray getCellBlock(int i) {
+ return cellBlocks[i];
+ }
+
+ public Writer getWriter() {
+ return new Writer();
+ }
+
+ public class Writer {
+ ByteBuffer[] cellBlockBuffers;
+
+ Writer() {
+ cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
+ for (int i = 0; i < cellBlockBuffers.length; i++) {
+ cellBlockBuffers[i] = cellBlocks[i].asBuffer();
+ }
+ }
+
+ public void copyFrom(GTRowBlock other) {
+ assert info == other.info;
+
+ seqId = other.seqId;
+ nRows = other.nRows;
+ primaryKey.copyFrom(other.primaryKey);
+ for (int i = 0; i < info.colBlocks.length; i++) {
+ cellBlockBuffers[i].clear();
+ cellBlockBuffers[i].put(other.cellBlocks[i].array(), other.cellBlocks[i].offset(), other.cellBlocks[i].length());
+ }
+ }
+
+ public void append(GTRecord r) {
+ // add record to block
+ if (isEmpty()) {
+ r.exportColumns(info.primaryKey, primaryKey);
+ }
+ for (int i = 0; i < info.colBlocks.length; i++) {
+ r.exportColumnBlock(i, cellBlockBuffers[i]);
+ }
+ nRows++;
+ }
+
+ public void readyForFlush() {
+ for (int i = 0; i < cellBlocks.length; i++) {
+ cellBlocks[i].setLength(cellBlockBuffers[i].position());
+ }
+ }
+
+ public void clearForNext() {
+ seqId++;
+ nRows = 0;
+ for (int i = 0; i < cellBlockBuffers.length; i++) {
+ cellBlockBuffers[i].clear();
+ }
+ }
+ }
+
+ public Reader getReader() {
+ return new Reader(info.colBlocksAll);
+ }
+
+ public Reader getReader(ImmutableBitSet selectedColBlocks) {
+ return new Reader(selectedColBlocks);
+ }
+
+ public class Reader {
+ int cur;
+ ByteBuffer primaryKeyBuffer;
+ ByteBuffer[] cellBlockBuffers;
+ ImmutableBitSet selectedColBlocks;
+
+ Reader(ImmutableBitSet selectedColBlocks) {
+ primaryKeyBuffer = primaryKey.asBuffer();
+ cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
+ for (int i = 0; i < cellBlockBuffers.length; i++) {
+ cellBlockBuffers[i] = cellBlocks[i].asBuffer();
+ }
+ this.selectedColBlocks = selectedColBlocks;
+ }
+
+ public boolean hasNext() {
+ return cur < nRows;
+ }
+
+ public void fetchNext(GTRecord result) {
+ if (hasNext() == false)
+ throw new IllegalArgumentException();
+
+ for (int i = 0; i < selectedColBlocks.trueBitCount(); i++) {
+ int c = selectedColBlocks.trueBitAt(i);
+ result.loadCellBlock(c, cellBlockBuffers[c]);
+ }
+ cur++;
+ }
+ }
+
+ public GTRowBlock copy() { // copies this block by exporting it to a fresh buffer and loading that buffer back
+ GTRowBlock copy = new GTRowBlock(info); // starts with no underlying space
+
+ ByteBuffer buf = ByteBuffer.allocate(this.exportLength()); // sized to hold exactly what export(buf) writes
+ this.export(buf);
+ buf.clear(); // rewinds position to 0 for reading; clear() does not erase the written bytes
+ copy.load(buf); // load() points the copy's arrays into buf, so buf becomes the copy's storage
+
+ return copy;
+ }
+
+ public boolean isEmpty() {
+ return nRows == 0;
+ }
+
+ public boolean isFull() {
+ if (info.isRowBlockEnabled())
+ return nRows >= info.rowBlockSize;
+ else
+ return nRows > 0;
+ }
+
+ public int getNumberOfRows() {
+ return nRows;
+ }
+
+ public void setNumberOfRows(int nRows) {
+ this.nRows = nRows;
+ }
+
+ // ============================================================================
+
+ public int exportLength() {
+ int len = 4; // seq Id
+ if (info.isRowBlockEnabled())
+ len += 4; // nRows
+ len += 4 + primaryKey.length(); // PK byte array
+ for (ByteArray array : cellBlocks) {
+ len += 4 + array.length(); // cell block byte array
+ }
+ return len;
+ }
+
+ /** write data to given output stream, like serialize */
+ public void export(DataOutputStream out) throws IOException {
+ out.writeInt(seqId);
+ if (info.isRowBlockEnabled())
+ out.writeInt(nRows);
+ export(out, primaryKey);
+ for (ByteArray cb : cellBlocks) {
+ export(out, cb);
+ }
+ }
+
+ private void export(DataOutputStream out, ByteArray array) throws IOException {
+ out.writeInt(array.length());
+ out.write(array.array(), array.offset(), array.length());
+ }
+
+ /** write data to given buffer, like serialize */
+ public void export(ByteBuffer buf) {
+ buf.putInt(seqId);
+ if (info.isRowBlockEnabled())
+ buf.putInt(nRows);
+ export(primaryKey, buf);
+ for (ByteArray cb : cellBlocks) {
+ export(cb, buf);
+ }
+ }
+
+ private void export(ByteArray array, ByteBuffer buf) {
+ buf.putInt(array.length());
+ buf.put(array.array(), array.offset(), array.length());
+ }
+
+ /** read data from given input stream, like deserialize */
+ public void importFrom(DataInputStream in) throws IOException {
+ seqId = in.readInt();
+ nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
+ importFrom(in, primaryKey);
+ for (int i = 0; i < info.colBlocks.length; i++) {
+ ByteArray cb = cellBlocks[i];
+ importFrom(in, cb);
+ }
+ }
+
+ private void importFrom(DataInputStream in, ByteArray result) throws IOException {
+ byte[] data = result.array(); // read into the array result already holds, starting at index 0
+ int len = in.readInt(); // length prefix, as written by export(out, array)
+ in.readFully(data, 0, len); // read() may return fewer than len bytes; readFully reads all of them or throws EOFException
+ result.set(data, 0, len);
+ }
+
+ /** change pointers to point to data in given buffer, UNLIKE deserialize */
+ public void load(ByteBuffer buf) {
+ seqId = buf.getInt();
+ nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
+ load(primaryKey, buf);
+ for (int i = 0; i < info.colBlocks.length; i++) {
+ ByteArray cb = cellBlocks[i];
+ load(cb, buf);
+ }
+ }
+
+ private void load(ByteArray array, ByteBuffer buf) {
+ int len = buf.getInt();
+ int pos = buf.position();
+ array.set(buf.array(), buf.arrayOffset() + pos, len);
+ buf.position(pos + len);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
new file mode 100644
index 0000000..e6e00b7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -0,0 +1,101 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+/**
+ * This is just for example and is INCORRECT when numbers are encoded to bytes and compared in filter.
+ *
+ * A correct implementation must ensure dimension values preserve order after encoded, e.g. by using an
+ * order preserving dictionary.
+ *
+ * @author yangli9
+ */
+public class GTSampleCodeSystem implements IGTCodeSystem {
+
+ private GTInfo info;
+ private DataTypeSerializer[] serializers;
+ private IGTComparator comparator;
+
+ public GTSampleCodeSystem() {
+ }
+
+ @Override
+ public void init(GTInfo info) {
+ this.info = info;
+
+ this.serializers = new DataTypeSerializer[info.getColumnCount()];
+ for (int i = 0; i < info.getColumnCount(); i++) {
+ this.serializers[i] = DataTypeSerializer.create(info.colTypes[i]);
+ }
+
+ this.comparator = new IGTComparator() {
+ @Override
+ public boolean isNull(ByteArray code) {
+ // all 0xff is null
+ byte[] array = code.array();
+ for (int i = 0, j = code.offset(), n = code.length(); i < n; i++, j++) {
+ if (array[j] != (byte) 0xff)
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int compare(ByteArray code1, ByteArray code2) {
+ return code1.compareTo(code2);
+ }
+ };
+ }
+
+ @Override
+ public int codeLength(int col, ByteBuffer buf) {
+ return serializers[col].peekLength(buf);
+ }
+
+ @Override
+ public int maxCodeLength(int col) {
+ return serializers[col].maxLength();
+ }
+
+ @Override
+ public IGTComparator getComparator() {
+ return comparator;
+ }
+
+ // ============================================================================
+
+ @Override
+ public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
+ assert columns.trueBitCount() == aggrFunctions.length;
+
+ MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
+ for (int i = 0; i < result.length; i++) {
+ int col = columns.trueBitAt(i);
+ result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
+ }
+ return result;
+ }
+
+ @Override
+ public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
+ serializers[col].serialize(value, buf);
+ }
+
+ @Override
+ public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
+ // ignore rounding
+ encodeColumnValue(col, value, buf);
+ }
+
+ @Override
+ public Object decodeColumnValue(int col, ByteBuffer buf) {
+ return serializers[col].deserialize(buf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
new file mode 100644
index 0000000..197fde4
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GTScanRange {
+
+ final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
+ final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
+ final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
+
+ public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
+ this(pkStart, pkEnd, null);
+ }
+
+ public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
+ GTInfo info = pkStart.info;
+ assert info == pkEnd.info;
+
+ validateRangeKey(pkStart);
+ validateRangeKey(pkEnd);
+
+ this.pkStart = pkStart;
+ this.pkEnd = pkEnd;
+ this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? Collections.<GTRecord> emptyList() : hbaseFuzzyKeys;
+ }
+
+ private void validateRangeKey(GTRecord pk) {
+ pk.maskForEqualHashComp(pk.info.primaryKey); // range keys are compared on primary key columns only
+ boolean seenUnset = false;
+ for (int k = 0; k < pk.info.primaryKey.trueBitCount(); k++) {
+ int col = pk.info.primaryKey.trueBitAt(k);
+ if (!seenUnset) {
+ seenUnset = pk.cols[col].array() == null; // remember the first PK column without data
+ continue;
+ }
+ pk.cols[col].set(null, 0, 0); // every PK column after it is cleared as well
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((hbaseFuzzyKeys == null) ? 0 : hbaseFuzzyKeys.hashCode());
+ result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode());
+ result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ GTScanRange other = (GTScanRange) obj;
+ if (hbaseFuzzyKeys == null) {
+ if (other.hbaseFuzzyKeys != null)
+ return false;
+ } else if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys))
+ return false;
+ if (pkEnd == null) {
+ if (other.pkEnd != null)
+ return false;
+ } else if (!pkEnd.equals(other.pkEnd))
+ return false;
+ if (pkStart == null) {
+ if (other.pkStart != null)
+ return false;
+ } else if (!pkStart.equals(other.pkStart))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return (pkStart == null ? "null" : pkStart.toString(pkStart.info.primaryKey)) //
+ + "-" + (pkEnd == null ? "null" : pkEnd.toString(pkEnd.info.primaryKey));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
new file mode 100644
index 0000000..c09ecf0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -0,0 +1,486 @@
+package org.apache.kylin.gridtable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class GTScanRangePlanner {
+
+ private static final int MAX_HBASE_FUZZY_KEYS = 100;
+
+ final private GTInfo info;
+ final private ComparatorEx<ByteArray> byteUnknownIsSmaller;
+ final private ComparatorEx<ByteArray> byteUnknownIsBigger;
+ final private ComparatorEx<GTRecord> recordUnknownIsSmaller;
+ final private ComparatorEx<GTRecord> recordUnknownIsBigger;
+
+ public GTScanRangePlanner(GTInfo info) {
+ this.info = info;
+
+ IGTComparator comp = info.codeSystem.getComparator();
+ this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp);
+ this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp);
+ this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp);
+ this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(comp);
+ }
+
+ /**
+ * Plans the scan ranges for a filter without limiting how many ranges may be returned.
+ *
+ * @return the planned ranges; an empty list means the filter is always false
+ */
+ public List<GTScanRange> planScanRanges(TupleFilter filter) {
+ final int noLimit = Integer.MAX_VALUE;
+ return planScanRanges(filter, noLimit);
+ }
+
+ /**
+ * Plans the scan ranges for a filter: flattens it to OR-AND form, turns every AND
+ * branch into one scan range, merges overlapping ranges and finally collapses the
+ * result if it still holds more than {@code maxRanges} ranges.
+ *
+ * @return the planned ranges; an empty list means the filter is always false
+ */
+ public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) {
+ // one collection of per-column ranges for every AND branch of the OR-AND filter
+ List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flattenToOrAndFilter(filter));
+
+ List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
+ Iterator<Collection<ColumnRange>> branches = orAndDimRanges.iterator();
+ while (branches.hasNext()) {
+ scanRanges.add(newScanRange(branches.next()));
+ }
+
+ return mergeTooManyRanges(mergeOverlapRanges(scanRanges), maxRanges);
+ }
+
+ /**
+ * Builds one scan range from the column ranges of a single AND branch. Only primary
+ * key columns contribute: their begin/end values go into the start/end records, and
+ * every value of an equals-set becomes a fuzzy key masked to that single column.
+ */
+ private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
+ GTRecord startKey = new GTRecord(info);
+ GTRecord endKey = new GTRecord(info);
+ List<GTRecord> fuzzyKeys = Lists.newArrayList();
+
+ for (ColumnRange range : andDimRanges) {
+ int col = range.column.getColumnDesc().getZeroBasedIndex();
+ // columns outside the primary key cannot narrow the key range
+ if (!info.primaryKey.get(col))
+ continue;
+
+ startKey.set(col, range.begin);
+ endKey.set(col, range.end);
+
+ if (range.equals == null)
+ continue;
+
+ ImmutableBitSet fuzzyMask = new ImmutableBitSet(col);
+ for (ByteArray value : range.equals) {
+ GTRecord fuzzyKey = new GTRecord(info);
+ fuzzyKey.set(col, value);
+ fuzzyKey.maskForEqualHashComp(fuzzyMask);
+ fuzzyKeys.add(fuzzyKey);
+ }
+ }
+
+ return new GTScanRange(startKey, endKey, fuzzyKeys);
+ }
+
+ /**
+ * Flattens the filter and normalizes it so that the root is an OR node; a flat
+ * filter whose root is an AND is wrapped into a single-child OR.
+ *
+ * @return the OR-rooted filter, or null when {@code filter} is null
+ * @throws IllegalStateException if the normalized root is still not an OR
+ */
+ private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+ if (filter == null)
+ return null;
+
+ TupleFilter result = filter.flatFilter();
+
+ boolean rootIsAnd = result.getOperator() == FilterOperatorEnum.AND;
+ if (rootIsAnd) {
+ // normalize to OR-AND filter
+ LogicalTupleFilter orWrapper = new LogicalTupleFilter(FilterOperatorEnum.OR);
+ orWrapper.addChild(result);
+ result = orWrapper;
+ }
+
+ boolean rootIsOr = result.getOperator() == FilterOperatorEnum.OR;
+ if (!rootIsOr)
+ throw new IllegalStateException();
+
+ return result;
+ }
+
+ /**
+ * Translates an OR-AND filter into a list (OR) of column-range collections (AND).
+ * A null filter yields a single empty AND branch; otherwise the branches are passed
+ * through {@code preEvaluateConstantConditions} before being returned.
+ *
+ * @throws IllegalStateException if a child of the OR root is not an AND filter
+ */
+ private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
+ List<Collection<ColumnRange>> orBranches = Lists.newArrayList();
+
+ // no filter at all: one AND branch without any condition
+ if (flatFilter == null) {
+ orBranches.add(Collections.<ColumnRange> emptyList());
+ return orBranches;
+ }
+
+ for (TupleFilter child : flatFilter.getChildren()) {
+ boolean isAnd = child.getOperator() == FilterOperatorEnum.AND;
+ if (!isAnd)
+ throw new IllegalStateException("Filter should be AND instead of " + child);
+
+ orBranches.add(translateToAndDimRanges(child.getChildren()));
+ }
+
+ return preEvaluateConstantConditions(orBranches);
+ }
+
+ /**
+ * Collects one {@code ColumnRange} per column from the children of an AND filter.
+ * Children that are not compare filters, or that compare no column, are ignored;
+ * several conditions on the same column are AND-merged into a single range.
+ */
+ private Collection<ColumnRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters) {
+ Map<TblColRef, ColumnRange> rangeByColumn = new HashMap<TblColRef, ColumnRange>();
+ for (TupleFilter filter : andFilters) {
+ // only comparisons against a column can be turned into a range
+ if (!(filter instanceof CompareTupleFilter))
+ continue;
+
+ CompareTupleFilter compare = (CompareTupleFilter) filter;
+ if (compare.getColumn() == null)
+ continue;
+
+ @SuppressWarnings("unchecked")
+ ColumnRange candidate = new ColumnRange(compare.getColumn(), (Set<ByteArray>) compare.getValues(), compare.getOperator());
+ ColumnRange known = rangeByColumn.get(candidate.column);
+ if (known != null) {
+ known.andMerge(candidate);
+ } else {
+ rangeByColumn.put(candidate.column, candidate);
+ }
+ }
+ return rangeByColumn.values();
+ }
+
+ /**
+ * Simplifies the OR-AND ranges in place: ranges that satisfy everything are dropped
+ * from their AND branch, and AND branches containing a range that satisfies nothing
+ * are dropped from the OR list.
+ *
+ * @return the same list instance; an empty list means the filter is globally false,
+ * a list holding one empty AND collection means it is globally true
+ */
+ private List<Collection<ColumnRange>> preEvaluateConstantConditions(List<Collection<ColumnRange>> orAndRanges) {
+ boolean alwaysTrue = false;
+
+ Iterator<Collection<ColumnRange>> orIt = orAndRanges.iterator();
+ while (orIt.hasNext()) {
+ Collection<ColumnRange> branch = orIt.next();
+
+ boolean branchAlwaysFalse = false;
+ for (Iterator<ColumnRange> andIt = branch.iterator(); andIt.hasNext();) {
+ ColumnRange range = andIt.next();
+ if (range.satisfyAll()) {
+ andIt.remove();
+ } else if (range.satisfyNone()) {
+ branchAlwaysFalse = true;
+ }
+ }
+
+ if (branchAlwaysFalse) {
+ orIt.remove();
+ continue;
+ }
+ // an AND branch without any remaining condition matches everything
+ if (branch.isEmpty()) {
+ alwaysTrue = true;
+ break;
+ }
+ }
+
+ if (!alwaysTrue)
+ return orAndRanges;
+
+ // globally true: a single empty AND collection inside the OR list
+ orAndRanges.clear();
+ orAndRanges.add(Collections.<ColumnRange> emptyList());
+ return orAndRanges;
+ }
+
+ /**
+ * Sorts the ranges by start key (in place) and merges every run of overlapping
+ * ranges into a single range via {@code mergeKeyRange}.
+ *
+ * @param ranges the ranges to merge; the list is re-ordered by this call
+ * @return {@code ranges} itself when it holds at most one element, otherwise a new
+ * list with one merged range per group of overlapping ranges
+ */
+ private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
+ if (ranges.size() <= 1) {
+ return ranges;
+ }
+
+ // sort ranges by start key
+ Collections.sort(ranges, new Comparator<GTScanRange>() {
+ @Override
+ public int compare(GTScanRange a, GTScanRange b) {
+ return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart);
+ }
+ });
+
+ // merge the overlap range
+ List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
+ int mergeBeginIndex = 0;
+ GTRecord mergeEnd = ranges.get(0).pkEnd;
+ // ranges.get(0) seeds the first group, so comparison starts at the second range;
+ // this keeps subList(mergeBeginIndex, index) below from ever being empty
+ for (int index = 1; index < ranges.size(); index++) {
+ GTScanRange range = ranges.get(index);
+
+ // if overlap, swallow it
+ // (the start is not after the current end, under either treatment of unknown values)
+ if (recordUnknownIsSmaller.min(range.pkStart, mergeEnd) == range.pkStart //
+ || recordUnknownIsBigger.max(mergeEnd, range.pkStart) == mergeEnd) {
+ mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+ continue;
+ }
+
+ // not overlap, split here
+ GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index));
+ mergedRanges.add(mergedRange);
+
+ // start new split
+ mergeBeginIndex = index;
+ mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+ }
+
+ // don't miss the last range
+ GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size()));
+ mergedRanges.add(mergedRange);
+
+ return mergedRanges;
+ }
+
+ /**
+ * Merges a non-empty list of ranges into one range that starts at the first range's
+ * start key and ends at the biggest end key. Fuzzy keys of all ranges are combined,
+ * unless one of the ranges has none or the combined count exceeds
+ * {@code MAX_HBASE_FUZZY_KEYS}, in which case the merged range has no fuzzy keys.
+ * A single-element list returns that element unchanged.
+ */
+ private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
+ GTScanRange head = ranges.get(0);
+ if (ranges.size() == 1)
+ return head;
+
+ GTRecord mergedEnd = head.pkEnd;
+ List<GTRecord> mergedFuzzyKeys = new ArrayList<GTRecord>();
+ boolean sawNonFuzzyRange = false;
+
+ for (GTScanRange range : ranges) {
+ if (!sawNonFuzzyRange && range.hbaseFuzzyKeys.isEmpty())
+ sawNonFuzzyRange = true;
+ mergedFuzzyKeys.addAll(range.hbaseFuzzyKeys);
+ mergedEnd = recordUnknownIsBigger.max(mergedEnd, range.pkEnd);
+ }
+
+ // if any range is non-fuzzy, then all fuzzy keys must be cleared
+ // also too many fuzzy keys will slow down HBase scan
+ boolean tooManyFuzzyKeys = mergedFuzzyKeys.size() > MAX_HBASE_FUZZY_KEYS;
+ if (sawNonFuzzyRange || tooManyFuzzyKeys) {
+ mergedFuzzyKeys.clear();
+ }
+
+ return new GTScanRange(head.pkStart, mergedEnd, mergedFuzzyKeys);
+ }
+
+ /**
+ * Enforces the range limit: when there are more than {@code maxRanges} ranges they
+ * are all merged into one; otherwise the input list is returned as is.
+ */
+ private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
+ boolean withinLimit = ranges.size() <= maxRanges;
+ if (withinLimit)
+ return ranges;
+
+ // TODO: check the distance between range and merge the large distance range
+ List<GTScanRange> single = new ArrayList<GTScanRange>(1);
+ single.add(mergeKeyRange(ranges));
+ return single;
+ }
+
+ /**
+ * The values one AND branch of the filter allows for a single column: a begin/end
+ * pair and, for EQ / IN conditions, the explicit set of allowed values.
+ * A begin or end whose {@code array()} is null is treated as unknown, i.e. that side
+ * is unbounded (presumably this is what {@code ByteArray.EMPTY} holds -- confirm).
+ */
+ private class ColumnRange {
+ private TblColRef column;
+ private ByteArray begin = ByteArray.EMPTY;
+ private ByteArray end = ByteArray.EMPTY;
+ // allowed values of an EQ / IN condition; null when the range is not value based
+ private Set<ByteArray> equals;
+
+ /**
+ * Builds the range of a single comparison.
+ *
+ * @param column the compared column
+ * @param values the constant(s) of the comparison
+ * @param op the comparison operator
+ * @throws UnsupportedOperationException for operators not listed in the switch
+ */
+ public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
+ this.column = column;
+
+ switch (op) {
+ case EQ:
+ case IN:
+ equals = new HashSet<ByteArray>(values);
+ refreshBeginEndFromEquals();
+ break;
+ case LT:
+ case LTE:
+ end = byteUnknownIsBigger.max(values);
+ break;
+ case GT:
+ case GTE:
+ begin = byteUnknownIsSmaller.min(values);
+ break;
+ case NEQ:
+ case NOTIN:
+ case ISNULL:
+ case ISNOTNULL:
+ // let Optiq filter it!
+ // begin and end stay unknown, so this range satisfies everything
+ break;
+ default:
+ throw new UnsupportedOperationException(op.name());
+ }
+ }
+
+ /** Overwrites all fields of this range with the given values. */
+ void copy(TblColRef column, ByteArray beginValue, ByteArray endValue, Set<ByteArray> equalValues) {
+ this.column = column;
+ this.begin = beginValue;
+ this.end = endValue;
+ this.equals = equalValues;
+ }
+
+ /** Recomputes begin / end as the min / max of the equals-set (both EMPTY when the set is empty). */
+ private void refreshBeginEndFromEquals() {
+ if (equals.isEmpty()) {
+ begin = ByteArray.EMPTY;
+ end = ByteArray.EMPTY;
+ } else {
+ begin = byteUnknownIsSmaller.min(equals);
+ end = byteUnknownIsBigger.max(equals);
+ }
+ }
+
+ /** @return true when neither begin nor end is known, i.e. the range does not restrict the column */
+ public boolean satisfyAll() {
+ return begin.array() == null && end.array() == null; // the NEQ case
+ }
+
+ /** @return true when no value can match: an empty equals-set, or a known begin bigger than a known end */
+ public boolean satisfyNone() {
+ if (equals != null) {
+ return equals.isEmpty();
+ } else if (begin.array() != null && end.array() != null) {
+ return info.codeSystem.getComparator().compare(begin, end) > 0;
+ } else {
+ return false;
+ }
+ }
+
+ /** Narrows this range to the intersection of itself and {@code another} (same column). */
+ public void andMerge(ColumnRange another) {
+ assert this.column.equals(another.column);
+
+ if (another.satisfyAll()) {
+ return;
+ }
+
+ if (this.satisfyAll()) {
+ copy(another.column, another.begin, another.end, another.equals);
+ return;
+ }
+
+ // both value based: intersect the sets
+ if (this.equals != null && another.equals != null) {
+ this.equals.retainAll(another.equals);
+ refreshBeginEndFromEquals();
+ return;
+ }
+
+ // one value based, one begin/end based: keep the values inside the begin/end range
+ if (this.equals != null) {
+ this.equals = filter(this.equals, another.begin, another.end);
+ refreshBeginEndFromEquals();
+ return;
+ }
+
+ if (another.equals != null) {
+ this.equals = filter(another.equals, this.begin, this.end);
+ refreshBeginEndFromEquals();
+ return;
+ }
+
+ // both begin/end based: take the bigger begin and the smaller end
+ this.begin = byteUnknownIsSmaller.max(this.begin, another.begin);
+ this.end = byteUnknownIsBigger.min(this.end, another.end);
+ }
+
+ /**
+ * Returns a new set holding the values of {@code equalValues} that lie within
+ * [beginValue, endValue]; an unknown bound does not restrict that side.
+ * The input set is not modified.
+ */
+ private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) {
+ Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size());
+ for (ByteArray v : equalValues) {
+ if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) {
+ result.add(v);
+ }
+ }
+ // return the filtered copy; returning equalValues here would make the filtering a no-op
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ if (equals == null) {
+ return column.getName() + " between " + begin + " and " + end;
+ } else {
+ return column.getName() + " in " + equals;
+ }
+ }
+ }
+
+ public static abstract class ComparatorEx<T> implements Comparator<T> {
+
+ public T min(Collection<T> v) {
+ if (v.size() <= 0) {
+ return null;
+ }
+
+ Iterator<T> iterator = v.iterator();
+ T min = iterator.next();
+ while (iterator.hasNext()) {
+ min = min(min, iterator.next());
+ }
+ return min;
+ }
+
+ public T max(Collection<T> v) {
+ if (v.size() <= 0) {
+ return null;
+ }
+
+ Iterator<T> iterator = v.iterator();
+ T max = iterator.next();
+ while (iterator.hasNext()) {
+ max = max(max, iterator.next());
+ }
+ return max;
+ }
+
+ public T min(T a, T b) {
+ return compare(a, b) <= 0 ? a : b;
+ }
+
+ public T max(T a, T b) {
+ return compare(a, b) >= 0 ? a : b;
+ }
+
+ public boolean between(T v, T start, T end) {
+ return compare(start, v) <= 0 && compare(v, end) <= 0;
+ }
+ }
+
+ public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IGTComparator comp) {
+ return new ComparatorEx<ByteArray>() {
+ @Override
+ public int compare(ByteArray a, ByteArray b) {
+ if (a.array() == null)
+ return -1;
+ else if (b.array() == null)
+ return 1;
+ else
+ return comp.compare(a, b);
+ }
+ };
+ }
+
+ public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IGTComparator comp) {
+ return new ComparatorEx<ByteArray>() {
+ @Override
+ public int compare(ByteArray a, ByteArray b) {
+ if (a.array() == null)
+ return 1;
+ else if (b.array() == null)
+ return -1;
+ else
+ return comp.compare(a, b);
+ }
+ };
+ }
+
+ /** Record comparator that compares column by column, treating an unknown column value as smaller. */
+ public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IGTComparator comp) {
+ ComparatorEx<ByteArray> perColumn = byteComparatorTreatsUnknownSmaller(comp);
+ return new RecordComparator(perColumn);
+ }
+
+ /** Record comparator that compares column by column, treating an unknown column value as bigger. */
+ public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IGTComparator comp) {
+ ComparatorEx<ByteArray> perColumn = byteComparatorTreatsUnknownBigger(comp);
+ return new RecordComparator(perColumn);
+ }
+
+ /**
+ * Compares two records column by column, over the columns of their (shared)
+ * equal/hash comparison mask, using the given byte-level comparator.
+ */
+ private static class RecordComparator extends ComparatorEx<GTRecord> {
+ final ComparatorEx<ByteArray> comparator;
+
+ RecordComparator(ComparatorEx<ByteArray> byteComparator) {
+ this.comparator = byteComparator;
+ }
+
+ /** @return the result of the first column that differs, or 0 when all masked columns are equal */
+ @Override
+ public int compare(GTRecord a, GTRecord b) {
+ assert a.info == b.info;
+ assert a.maskForEqualHashComp() == b.maskForEqualHashComp();
+ ImmutableBitSet mask = a.maskForEqualHashComp();
+
+ final int columnCount = mask.trueBitCount();
+ for (int i = 0; i < columnCount; i++) {
+ int col = mask.trueBitAt(i);
+ int columnOrder = comparator.compare(a.cols[col], b.cols[col]);
+ if (columnOrder != 0)
+ return columnOrder;
+ }
+ return 0; // equals
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
new file mode 100644
index 0000000..c5443b6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -0,0 +1,155 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Sets;
+
+public class GTScanRequest {
+
+ // basic
+ private GTInfo info;
+ private GTScanRange range;
+ private ImmutableBitSet columns;
+
+ // optional filtering
+ private TupleFilter filterPushDown;
+
+ // optional aggregation
+ private ImmutableBitSet aggrGroupBy;
+ private ImmutableBitSet aggrMetrics;
+ private String[] aggrMetricsFuncs;
+
+ public GTScanRequest(GTInfo info) {
+ this(info, null, null, null);
+ }
+
+ public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) {
+ this.info = info;
+ this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+ this.columns = columns;
+ this.filterPushDown = filterPushDown;
+ validate();
+ }
+
+ public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet aggrGroupBy, ImmutableBitSet aggrMetrics, //
+ String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
+ this(info, range, null, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown);
+ }
+
+ public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+ ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
+ this.info = info;
+ this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+ this.columns = dimensions;
+ this.filterPushDown = filterPushDown;
+
+ this.aggrGroupBy = aggrGroupBy;
+ this.aggrMetrics = aggrMetrics;
+ this.aggrMetricsFuncs = aggrMetricsFuncs;
+
+ validate();
+ }
+
+ private void validate() {
+ if (range == null)
+ range = new GTScanRange(null, null);
+
+ if (hasAggregation()) {
+ if (aggrGroupBy.intersects(aggrMetrics))
+ throw new IllegalStateException();
+ if (aggrMetrics.cardinality() != aggrMetricsFuncs.length)
+ throw new IllegalStateException();
+
+ if (columns == null)
+ columns = ImmutableBitSet.EMPTY;
+ columns = columns.or(aggrGroupBy);
+ columns = columns.or(aggrMetrics);
+ }
+
+ if (columns == null)
+ columns = info.colAll;
+
+ if (hasFilterPushDown()) {
+ validateFilterPushDown();
+ }
+ }
+
+ private void validateFilterPushDown() {
+ if (hasFilterPushDown() == false)
+ return;
+
+ Set<TblColRef> filterColumns = Sets.newHashSet();
+ TupleFilter.collectColumns(filterPushDown, filterColumns);
+
+ for (TblColRef col : filterColumns) {
+ // filter columns must belong to the table
+ info.validateColRef(col);
+ // filter columns must be returned to satisfy upper layer evaluation (calcite)
+ columns = columns.set(col.getColumnDesc().getZeroBasedIndex());
+ }
+
+ // un-evaluatable filter must be removed
+ if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) {
+ Set<TblColRef> unevaluableColumns = Sets.newHashSet();
+ filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns);
+
+ // columns in un-evaluatable filter must be returned without loss so upper layer can do final evaluation
+ if (hasAggregation()) {
+ for (TblColRef col : unevaluableColumns) {
+ aggrGroupBy = aggrGroupBy.set(col.getColumnDesc().getZeroBasedIndex());
+ }
+ }
+ }
+ }
+
+ public boolean hasFilterPushDown() {
+ return filterPushDown != null;
+ }
+
+ public boolean hasAggregation() {
+ return aggrGroupBy != null && aggrMetrics != null && aggrMetricsFuncs != null;
+ }
+
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ public GTRecord getPkStart() {
+ return range.pkStart;
+ }
+
+ public GTRecord getPkEnd() {
+ return range.pkEnd;
+ }
+
+ public ImmutableBitSet getColumns() {
+ return columns;
+ }
+
+ public TupleFilter getFilterPushDown() {
+ return filterPushDown;
+ }
+
+ public ImmutableBitSet getAggrGroupBy() {
+ return aggrGroupBy;
+ }
+
+ public ImmutableBitSet getAggrMetrics() {
+ return aggrMetrics;
+ }
+
+ public String[] getAggrMetricsFuncs() {
+ return aggrMetricsFuncs;
+ }
+
+ @Override
+ public String toString() {
+ return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
new file mode 100644
index 0000000..94ac755
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -0,0 +1,221 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilterSerializer;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Sets;
+
+public class GTUtil {
+
+ static final TableDesc MOCKUP_TABLE = TableDesc.mockup("GT_MOCKUP_TABLE");
+
+ /**
+ * Creates a column reference on the mockup table for grid table column {@code col};
+ * the column is named after its index and gets the id {@code col + 1}.
+ */
+ static TblColRef tblColRef(int col, String datatype) {
+ String columnName = "" + col;
+ int mockupId = col + 1;
+ return new TblColRef(ColumnDesc.mockup(MOCKUP_TABLE, mockupId, columnName, datatype));
+ }
+
+ /**
+ * Converts the filter without column mapping and without encoding constants; the
+ * columns of the parts that cannot be evaluated are added to the given collector.
+ */
+ public static TupleFilter convertFilterUnevaluatable(TupleFilter rootFilter, GTInfo info, //
+ Set<TblColRef> unevaluatableColumnCollector) {
+ final List<TblColRef> noColumnMapping = null;
+ final boolean encodeConstants = false;
+ return convertFilter(rootFilter, info, noColumnMapping, encodeConstants, unevaluatableColumnCollector);
+ }
+
+ /**
+ * Converts the filter by encoding its constants only: no column mapping is applied
+ * and no collector for un-evaluatable columns is passed.
+ */
+ public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) {
+ final List<TblColRef> noColumnMapping = null;
+ final Set<TblColRef> noCollector = null;
+ final boolean encodeConstants = true;
+ return convertFilter(rootFilter, info, noColumnMapping, encodeConstants, noCollector);
+ }
+
+ /**
+ * Converts the filter by mapping its columns onto the grid table through
+ * {@code colMapping} and encoding its constants; the columns of the parts that
+ * cannot be evaluated are added to the given collector.
+ */
+ public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
+ List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
+ final boolean encodeConstants = true;
+ return convertFilter(rootFilter, info, colMapping, encodeConstants, unevaluatableColumnCollector);
+ }
+
+ /**
+ * Converts a filter for use on the grid table by serializing it through a decorator
+ * that rewrites nodes on the way, then deserializing the result. The decorator
+ * converts TblColRef to GridTable column, encodes constants and drops unEvaluatable parts.
+ *
+ * @param rootFilter the filter to convert
+ * @param info the grid table the filter is converted for
+ * @param colMapping external columns in grid table column order, or null to use the
+ * columns' own zero based index
+ * @param encodeConstants whether constants of compare filters are translated into codes
+ * @param unevaluatableColumnCollector receives the columns of dropped parts; some callers
+ * in this class pass null (NOTE(review): presumably tolerated by
+ * TupleFilter.collectColumns -- confirm)
+ * @return the converted filter, as deserialized from the decorated serialization
+ */
+ private static TupleFilter convertFilter(TupleFilter rootFilter, final GTInfo info, //
+ final List<TblColRef> colMapping, final boolean encodeConstants, //
+ final Set<TblColRef> unevaluatableColumnCollector) {
+
+ IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator());
+
+ byte[] bytes = TupleFilterSerializer.serialize(rootFilter, new TupleFilterSerializer.Decorator() {
+ @Override
+ public TupleFilter onSerialize(TupleFilter filter) {
+ if (filter == null)
+ return null;
+
+ // In case of NOT(unEvaluatableFilter), we should immediately replace it as TRUE,
+ // Otherwise, unEvaluatableFilter will later be replace with TRUE and NOT(unEvaluatableFilter)
+ // will always return FALSE.
+ if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT && !TupleFilter.isEvaluableRecursively(filter)) {
+ TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
+ return ConstantTupleFilter.TRUE;
+ }
+
+ // shortcut for unEvaluatable filter
+ if (filter.isEvaluable() == false) {
+ TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
+ return ConstantTupleFilter.TRUE;
+ }
+
+ // map to column onto grid table
+ // NOTE(review): indexOf returns -1 for a column missing from colMapping -- confirm callers guarantee presence
+ if (colMapping != null && filter instanceof ColumnTupleFilter) {
+ ColumnTupleFilter colFilter = (ColumnTupleFilter) filter;
+ int gtColIdx = colMapping.indexOf(colFilter.getColumn());
+ return new ColumnTupleFilter(info.colRef(gtColIdx));
+ }
+
+ // encode constants
+ if (encodeConstants && filter instanceof CompareTupleFilter) {
+ return encodeConstants((CompareTupleFilter) filter);
+ }
+
+ return filter;
+ }
+
+ // Rebuilds a compare filter with its constants translated into codes. When a constant
+ // cannot be translated (translate returns null), the comparison is replaced by a constant
+ // TRUE / FALSE filter, or the value is left out of an EQ / IN set.
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private TupleFilter encodeConstants(CompareTupleFilter oldCompareFilter) {
+ // extract ColumnFilter & ConstantFilter
+ TblColRef externalCol = oldCompareFilter.getColumn();
+
+ if (externalCol == null) {
+ return oldCompareFilter;
+ }
+
+ Collection constValues = oldCompareFilter.getValues();
+ if (constValues == null || constValues.isEmpty()) {
+ return oldCompareFilter;
+ }
+
+ CompareTupleFilter newCompareFilter = new CompareTupleFilter(oldCompareFilter.getOperator());
+ newCompareFilter.addChild(new ColumnTupleFilter(externalCol));
+
+ Object firstValue = constValues.iterator().next();
+ int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : colMapping.indexOf(externalCol);
+
+ TupleFilter result;
+ ByteArray code;
+
+ // translate constant into code
+ // third argument of translate is the rounding flag: 0 = exact, 1 = ceiling, -1 = floor
+ switch (newCompareFilter.getOperator()) {
+ case EQ:
+ case IN:
+ // values that have no code are dropped; no value left means nothing can match
+ Set newValues = Sets.newHashSet();
+ for (Object value : constValues) {
+ code = translate(col, value, 0);
+ if (code != null)
+ newValues.add(code);
+ }
+ if (newValues.isEmpty()) {
+ result = ConstantTupleFilter.FALSE;
+ } else {
+ newCompareFilter.addChild(new ConstantTupleFilter(newValues));
+ result = newCompareFilter;
+ }
+ break;
+ case NEQ:
+ // a value without code cannot equal any stored value
+ code = translate(col, firstValue, 0);
+ if (code == null) {
+ result = ConstantTupleFilter.TRUE;
+ } else {
+ newCompareFilter.addChild(new ConstantTupleFilter(code));
+ result = newCompareFilter;
+ }
+ break;
+ case LT:
+ // ceiling failed: no equal or bigger value exists, so every value is smaller
+ code = translate(col, firstValue, 1);
+ if (code == null) {
+ result = ConstantTupleFilter.TRUE;
+ } else {
+ newCompareFilter.addChild(new ConstantTupleFilter(code));
+ result = newCompareFilter;
+ }
+ break;
+ case LTE:
+ // floor failed: no equal or smaller value exists, so nothing matches
+ code = translate(col, firstValue, -1);
+ if (code == null) {
+ result = ConstantTupleFilter.FALSE;
+ } else {
+ newCompareFilter.addChild(new ConstantTupleFilter(code));
+ result = newCompareFilter;
+ }
+ break;
+ case GT:
+ // floor failed: no equal or smaller value exists, so every value is bigger
+ code = translate(col, firstValue, -1);
+ if (code == null) {
+ result = ConstantTupleFilter.TRUE;
+ } else {
+ newCompareFilter.addChild(new ConstantTupleFilter(code));
+ result = newCompareFilter;
+ }
+ break;
+ case GTE:
+ // ceiling failed: no equal or bigger value exists, so nothing matches
+ code = translate(col, firstValue, 1);
+ if (code == null) {
+ result = ConstantTupleFilter.FALSE;
+ } else {
+ newCompareFilter.addChild(new ConstantTupleFilter(code));
+ result = newCompareFilter;
+ }
+ break;
+ default:
+ throw new IllegalStateException("Cannot handle operator " + newCompareFilter.getOperator());
+ }
+ return result;
+ }
+
+ // scratch buffer reused by translate for every encoded value
+ transient ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+
+ // Encodes one value of the given column into a code (copied out of the scratch buffer);
+ // returns null when the code system rejects the value with IllegalArgumentException.
+ private ByteArray translate(int col, Object value, int roundingFlag) {
+ try {
+ buf.clear();
+ info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
+ return ByteArray.copyOf(buf.array(), 0, buf.position());
+ } catch (IllegalArgumentException ex) {
+ return null;
+ }
+ }
+ }, filterCodeSystem);
+
+ return TupleFilterSerializer.deserialize(bytes, filterCodeSystem);
+ }
+
+ public static IFilterCodeSystem<ByteArray> wrap(final IGTComparator comp) {
+ return new IFilterCodeSystem<ByteArray>() {
+
+ @Override
+ public int compare(ByteArray o1, ByteArray o2) {
+ return comp.compare(o1, o2);
+ }
+
+ @Override
+ public boolean isNull(ByteArray code) {
+ return comp.isNull(code);
+ }
+
+ @Override
+ public void serialize(ByteArray code, ByteBuffer buffer) {
+ if (code == null)
+ BytesUtil.writeByteArray(null, 0, 0, buffer);
+ else
+ BytesUtil.writeByteArray(code.array(), code.offset(), code.length(), buffer);
+ }
+
+ @Override
+ public ByteArray deserialize(ByteBuffer buffer) {
+ return new ByteArray(BytesUtil.readByteArray(buffer));
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
new file mode 100644
index 0000000..b8e59f6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
@@ -0,0 +1,61 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Entry point to a grid table: pairs the table's {@link GTInfo} with the store that
+ * holds its data and creates builders and scanners on top of them.
+ */
+public class GridTable implements Closeable {
+
+ final GTInfo info;
+ final IGTStore store;
+
+ public GridTable(GTInfo info, IGTStore store) {
+ this.info = info;
+ this.store = store;
+ }
+
+ /** Creates a rebuilding builder for a table without sharding (shard -1). */
+ public GTBuilder rebuild() throws IOException {
+ assert !info.isShardingEnabled();
+ return rebuild(-1);
+ }
+
+ /** Creates a rebuilding builder for the given shard. */
+ public GTBuilder rebuild(int shard) throws IOException {
+ assert shard < info.nShards;
+ GTBuilder builder = new GTBuilder(info, shard, store);
+ return builder;
+ }
+
+ /** Creates an appending builder for a table without sharding (shard -1). */
+ public GTBuilder append() throws IOException {
+ assert !info.isShardingEnabled();
+ return append(-1);
+ }
+
+ /** Creates an appending builder for the given shard. */
+ public GTBuilder append(int shard) throws IOException {
+ GTBuilder builder = new GTBuilder(info, shard, store, true);
+ return builder;
+ }
+
+ /**
+ * Creates the scanner for a request: a raw scanner, wrapped by a filter scanner when
+ * the request has a push-down filter, wrapped by an aggregate scanner when it has
+ * an aggregation.
+ */
+ public IGTScanner scan(GTScanRequest req) throws IOException {
+ IGTScanner scanner = new GTRawScanner(info, store, req);
+ if (req.hasFilterPushDown())
+ scanner = new GTFilterScanner(scanner, req);
+ if (req.hasAggregation())
+ scanner = new GTAggregateScanner(scanner, req);
+ return scanner;
+ }
+
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ public IGTStore getStore() {
+ return store;
+ }
+
+ /** Closes the underlying store if it is {@link Closeable}; otherwise does nothing. */
+ @Override
+ public void close() throws IOException {
+ if (!(store instanceof Closeable))
+ return;
+ ((Closeable) store).close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
new file mode 100644
index 0000000..bb715b2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -0,0 +1,43 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+
+/**
+ * Defines how column values of a grid table are turned into codes (bytes in a
+ * {@link ByteBuffer}) and back, how codes are compared, and which aggregators are
+ * used for metric columns.
+ */
+public interface IGTCodeSystem {
+
+ /** Initializes the code system with the grid table it is used for. */
+ void init(GTInfo info);
+
+ /** Return the comparator used to compare codes. */
+ IGTComparator getComparator();
+
+ /** Return the length of code starting at the specified buffer, buffer position must not change after return */
+ int codeLength(int col, ByteBuffer buf);
+
+ /** Return the max possible length of a column */
+ int maxCodeLength(int col);
+
+ /**
+ * Encode a value into code.
+ *
+ * @throws IllegalArgumentException if the value is not in dictionary
+ */
+ void encodeColumnValue(int col, Object value, ByteBuffer buf) throws IllegalArgumentException;
+
+ /**
+ * Encode a value into code, with option to floor rounding -1, no rounding 0, or ceiling rounding 1
+ *
+ * @throws IllegalArgumentException
+ * - if rounding=0 and the value is not in dictionary
+ * - if rounding=-1 and there's no equal or smaller value in dictionary
+ * - if rounding=1 and there's no equal or bigger value in dictionary
+ */
+ void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) throws IllegalArgumentException;
+
+ /** Decode a code into value */
+ Object decodeColumnValue(int col, ByteBuffer buf);
+
+ /** Return aggregators for metrics */
+ MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java
new file mode 100644
index 0000000..4edc4db
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.ByteArray;
+
+/**
+ * Comparator over value codes ({@link ByteArray}) that can also tell whether a code
+ * represents the NULL value. Comparing two values by their codes is the
+ * {@code compare} method inherited from {@link Comparator}.
+ */
+public interface IGTComparator extends Comparator<ByteArray> {
+
+ /** if given code represents the NULL value */
+ boolean isNull(ByteArray code);
+
+ // compare two values by their codes -- inherited from Comparator<ByteArray>:
+ // int compare(T code1, T code2);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java
new file mode 100644
index 0000000..42e2dec
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java
@@ -0,0 +1,13 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+
+/**
+ * A closeable scan over grid table records that also reports scan statistics.
+ */
+public interface IGTScanner extends Iterable<GTRecord>, Closeable {
+
+ /** Return the info of the grid table being scanned. */
+ GTInfo getInfo();
+
+ /** Return the scanned row count (presumably rows read so far -- confirm against implementations). */
+ int getScannedRowCount();
+
+ /** Return the scanned row block count (presumably blocks read so far -- confirm against implementations). */
+ int getScannedRowBlockCount();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
new file mode 100644
index 0000000..70620b3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
@@ -0,0 +1,26 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+
+/**
+ * Storage abstraction used by the grid table: hands out writers that accept
+ * {@link GTRowBlock}s and scanners that iterate over {@link GTRowBlock}s.
+ */
+public interface IGTStore {
+
+ /** Returns the {@link GTInfo} of this store. */
+ GTInfo getInfo();
+
+ /**
+ * Obtains a writer for the given shard.
+ * NOTE(review): the name suggests existing content is replaced (GTSimpleMemStore clears its
+ * block list first) -- confirm for other stores.
+ */
+ IGTStoreWriter rebuild(int shard) throws IOException;
+
+ /**
+ * Obtains a writer for the given shard that adds to existing content.
+ * NOTE(review): in GTSimpleMemStore the last stored block, if any, is copied into
+ * {@code fillLast} before the writer is returned -- confirm this is the general contract.
+ */
+ IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
+
+ /**
+ * Opens a scanner over stored row blocks.
+ * NOTE(review): presumably pkStart/pkEnd bound the primary key range and selectedColBlocks
+ * picks the column blocks to load -- confirm; GTRawScanner passes the whole scan request as
+ * {@code additionalPushDown}, and GTSimpleMemStore ignores all four arguments.
+ */
+ IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
+
+ /** A closeable sink of row blocks. */
+ interface IGTStoreWriter extends Closeable {
+ /** Writes one row block to the store. */
+ void write(GTRowBlock block) throws IOException;
+ }
+
+ /** A closeable iterator over row blocks. */
+ interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
new file mode 100644
index 0000000..c6af8e5
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Fixtures for grid table unit tests: ready-made {@link GTInfo} definitions and mock-up records.
+ */
+public class UnitTestSupport {
+
+    /** Builds the sample info straight from the shared builder, with no extra options. */
+    public static GTInfo basicInfo() {
+        return infoBuilder().build();
+    }
+
+    /** Builds the sample info with column blocks {0}, {1,2}, {3,4} and {@code enableRowBlock(4)}. */
+    public static GTInfo advancedInfo() {
+        Builder advanced = infoBuilder();
+        ImmutableBitSet[] columnBlocks = { setOf(0), setOf(1, 2), setOf(3, 4) };
+        advanced.enableColumnBlock(columnBlocks);
+        advanced.enableRowBlock(4);
+        return advanced.build();
+    }
+
+    /** Shared builder: five columns (three varchar(10), bigint, decimal), column 0 as primary key. */
+    private static Builder infoBuilder() {
+        Builder shared = GTInfo.builder();
+        shared.setCodeSystem(new GTSampleCodeSystem());
+        shared.setColumns( //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal") //
+        );
+        ImmutableBitSet firstColumn = setOf(0);
+        shared.setPrimaryKey(firstColumn);
+        shared.setColumnPreferIndex(setOf(0));
+        return shared;
+    }
+
+    /**
+     * Produces {@code nRows / 10} rounds of ten records each; round {@code i} shifts the four
+     * base dates by {@code i * 4} days. Any remainder of {@code nRows} below ten is not generated.
+     */
+    public static List<GTRecord> mockupData(GTInfo info, int nRows) {
+        final String[] startDates = { "2015-01-14", "2015-01-15", "2015-01-16", "2015-01-17" };
+        final String[] customers = { "Yang", "Luke", "Xu", "Dong", "Jason", "Mahone", "Shaofeng", "Qianhao", "George", "Kejia" };
+        // for each customer above, the index of the date (within the round) its record uses
+        final int[] dateOf = { 0, 0, 1, 1, 1, 2, 2, 2, 2, 3 };
+
+        List<GTRecord> records = new ArrayList<GTRecord>(nRows);
+        final int rounds = nRows / 10;
+        for (int round = 0; round < rounds; round++) {
+            String[] dates = new String[startDates.length];
+            for (int d = 0; d < dates.length; d++) {
+                dates[d] = datePlus(startDates[d], round * 4);
+            }
+            for (int c = 0; c < customers.length; c++) {
+                records.add(newRec(info, dates[dateOf[c]], customers[c], "Food", new LongMutable(10), new BigDecimal("10.5")));
+            }
+        }
+        return records;
+    }
+
+    /** Returns the given date string moved forward by {@code plusDays} days. */
+    private static String datePlus(String date, int plusDays) {
+        final long millisPerDay = 1000L * 3600L * 24L;
+        long shifted = DateFormat.stringToMillis(date) + millisPerDay * plusDays;
+        return DateFormat.formatToDateStr(shifted);
+    }
+
+    /** Creates a record on {@code info} holding the five given values. */
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
+        return new GTRecord(info).setValues(date, name, category, amount, price);
+    }
+
+    /** Wraps the given bit positions into an {@link ImmutableBitSet}. */
+    private static ImmutableBitSet setOf(int... values) {
+        BitSet bits = new BitSet();
+        for (int idx = 0; idx < values.length; idx++) {
+            bits.set(values[idx]);
+        }
+        return new ImmutableBitSet(bits);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
new file mode 100644
index 0000000..0011d83
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
@@ -0,0 +1,112 @@
+package org.apache.kylin.gridtable.memstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+
+public class GTSimpleMemStore implements IGTStore {
+
+ final GTInfo info;
+ final List<GTRowBlock> rowBlockList;
+
+ public GTSimpleMemStore(GTInfo info) {
+ this.info = info;
+ this.rowBlockList = new ArrayList<GTRowBlock>();
+
+ if (info.isShardingEnabled())
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ public long memoryUsage() {
+ if (rowBlockList.size() == 0) {
+ return 0;
+ } else {
+ return rowBlockList.get(0).exportLength() * Long.valueOf(rowBlockList.size());
+ }
+ }
+
+ @Override
+ public IGTStoreWriter rebuild(int shard) {
+ rowBlockList.clear();
+ return new Writer(rowBlockList);
+ }
+
+ @Override
+ public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) {
+ if (rowBlockList.size() > 0) {
+ GTRowBlock last = rowBlockList.get(rowBlockList.size() - 1);
+ fillLast.copyFrom(last);
+ }
+ return new Writer(rowBlockList);
+ }
+
+ private static class Writer implements IGTStoreWriter {
+
+ private final List<GTRowBlock> rowBlockList;
+
+ Writer(List<GTRowBlock> rowBlockList) {
+ this.rowBlockList = rowBlockList;
+ }
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void write(GTRowBlock block) throws IOException {
+ GTRowBlock copy = block.copy();
+ int id = block.getSequenceId();
+ if (id < rowBlockList.size()) {
+ rowBlockList.set(id, copy);
+ } else {
+ assert id == rowBlockList.size();
+ rowBlockList.add(copy);
+ }
+ }
+ }
+
+ @Override
+ public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) {
+
+ return new IGTStoreScanner() {
+ Iterator<GTRowBlock> it = rowBlockList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public GTRowBlock next() {
+ return it.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ };
+ }
+
+ public void drop() throws IOException {
+ //will there be any concurrent issue? If yes, ArrayList should be replaced
+ rowBlockList.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
new file mode 100644
index 0000000..0b4a3c6
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes the mock-up data into a grid table backed by a {@link ConcurrentDiskStore} and reads it
+ * back from one or many threads, expecting every reader to see exactly the written records.
+ */
+public class ConcurrentDiskStoreTest {
+
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(1);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(20);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead(int readThreads) throws IOException, InterruptedException {
+        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table, readThreads);
+    }
+
+    /**
+     * Rebuilds {@code table} from {@link #data}, then scans it concurrently from
+     * {@code readThreads} threads. The first failure raised inside any reader thread is
+     * rethrown on the calling thread, so that it actually fails the test. The store is
+     * closed in all cases.
+     */
+    private void verifyWriteAndRead(final GridTable table, int readThreads) throws IOException, InterruptedException {
+        try {
+            GTBuilder builder = table.rebuild();
+            for (GTRecord r : data) {
+                builder.write(r);
+            }
+            builder.close();
+
+            // assertion failures in reader threads would otherwise die silently with their thread
+            final java.util.concurrent.atomic.AtomicReference<Throwable> firstFailure = new java.util.concurrent.atomic.AtomicReference<Throwable>();
+
+            Thread[] readers = new Thread[readThreads];
+            for (int i = 0; i < readers.length; i++) {
+                readers[i] = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
+                            try {
+                                int row = 0;
+                                for (GTRecord r : scanner) {
+                                    assertEquals(data.get(row++), r);
+                                }
+                                // a scan that stops early must not pass unnoticed
+                                assertEquals(data.size(), row);
+                            } finally {
+                                scanner.close();
+                            }
+                        } catch (Throwable ex) {
+                            // Throwable, not Exception: AssertionError is an Error
+                            firstFailure.compareAndSet(null, ex);
+                        }
+                    }
+                };
+                readers[i].start();
+            }
+            for (int i = 0; i < readers.length; i++) {
+                readers[i].join();
+            }
+
+            Throwable failure = firstFailure.get();
+            if (failure != null) {
+                AssertionError error = new AssertionError("Reader thread failed: " + failure);
+                error.initCause(failure);
+                throw error;
+            }
+        } finally {
+            ((ConcurrentDiskStore) table.getStore()).close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
new file mode 100644
index 0000000..de949ba
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stress test that feeds {@code INPUT_ROWS} rows into a {@link DoggedCubeBuilder} configured with
+ * {@code THREADS} concurrent threads and discards the output; it passes when the build
+ * completes without throwing.
+ */
+public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderStressTest.class);
+
+    // CI sandbox memory is no more than 512MB, this many input should hit memory threshold
+    private static final int INPUT_ROWS = 200000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Runs the builder on a single background thread while this thread feeds the input queue,
+     * then waits for the build to finish. The executor is always shut down so its worker
+     * thread does not outlive the test.
+     */
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            long randSeed = System.currentTimeMillis();
+
+            DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+            doggedBuilder.setConcurrentThreads(THREADS);
+
+            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
+            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+        } finally {
+            // no task is left after a normal run; after a failure this interrupts a builder
+            // that may still be waiting on the queue
+            executorService.shutdownNow();
+        }
+    }
+
+    /** Cuboid writer that drops every record. */
+    class NoopWriter implements ICuboidWriter {
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+        }
+    }
+}
\ No newline at end of file