You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:23 UTC
[21/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job
module into 'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
deleted file mode 100644
index 0313026..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreScanner;
-
-/**
- * Scans raw rows out of an {@link IGTStore}: walks the row blocks produced by the store scanner and
- * decodes every row of the selected column blocks. This class itself applies no filtering; the scan
- * request is only passed on to {@link IGTStore#scan}.
- * <p>
- * The iterator hands out the SAME {@link GTRecord} instance for every row (to avoid allocation), so a
- * caller that wants to keep a row must copy it. Iteration state is kept in the scanner's fields, so all
- * iterators returned by {@link #iterator()} share it.
- */
-public class GTRawScanner implements IGTScanner {
-
-    final GTInfo info;
-    final IGTStoreScanner storeScanner;
-    final ImmutableBitSet selectedColBlocks;
-
-    /** reader over the row block currently being consumed; null when a new block must be fetched */
-    private GTRowBlock.Reader blockReader;
-    /** row fetched by hasNext() but not yet returned by next(); null when none is buffered */
-    private GTRecord pending;
-    /** the single record every row is decoded into */
-    final private GTRecord reusableRecord;
-
-    private int scannedRowCount = 0;
-    private int scannedRowBlockCount = 0;
-
-    /**
-     * Opens a store scan covering the request's primary key range and column blocks.
-     *
-     * @throws IOException if the store fails to open the scan
-     */
-    public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
-        this.info = info;
-        this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
-        this.storeScanner = store.scan(req.getPkStart(), req.getPkEnd(), selectedColBlocks, req);
-        this.reusableRecord = new GTRecord(info);
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    /** Number of rows decoded so far. */
-    @Override
-    public int getScannedRowCount() {
-        return scannedRowCount;
-    }
-
-    /** Number of row blocks pulled from the store scanner so far. */
-    @Override
-    public int getScannedRowBlockCount() {
-        return scannedRowBlockCount;
-    }
-
-    /** Closes the underlying store scanner. */
-    @Override
-    public void close() throws IOException {
-        storeScanner.close();
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        return new Iterator<GTRecord>() {
-
-            @Override
-            public boolean hasNext() {
-                // buffer one row if nothing is pending yet
-                if (pending == null && advance()) {
-                    pending = reusableRecord;
-                }
-                return pending != null;
-            }
-
-            /** Decodes the next row into the reusable record; returns false once every block is consumed. */
-            private boolean advance() {
-                while (true) {
-                    if (blockReader == null) {
-                        if (!storeScanner.hasNext()) {
-                            return false;
-                        }
-                        blockReader = storeScanner.next().getReader(selectedColBlocks);
-                        scannedRowBlockCount++;
-                    } else if (blockReader.hasNext()) {
-                        blockReader.fetchNext(reusableRecord);
-                        scannedRowCount++;
-                        return true;
-                    } else {
-                        // current block exhausted, move on to the next one
-                        blockReader = null;
-                    }
-                }
-            }
-
-            @Override
-            public GTRecord next() {
-                if (!hasNext()) {
-                    throw new NoSuchElementException();
-                }
-                GTRecord row = pending;
-                pending = null;
-                return row;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
deleted file mode 100644
index fbb6171..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ /dev/null
@@ -1,285 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.BitSet;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-
-public class GTRecord implements Comparable<GTRecord> {
-
- final GTInfo info;
- final ByteArray[] cols;
-
- private ImmutableBitSet maskForEqualHashComp;
-
- public GTRecord(GTInfo info, ImmutableBitSet maskForEqualHashComp) {
- this.info = info;
- this.cols = new ByteArray[info.getColumnCount()];
- for (int i = 0; i < cols.length; i++) {
- if (maskForEqualHashComp.get(i)) {
- this.cols[i] = new ByteArray();
- }
- }
- this.maskForEqualHashComp = maskForEqualHashComp;
- }
-
- public GTRecord(GTInfo info) {
- this(info, info.colAll);
- }
-
- public GTInfo getInfo() {
- return info;
- }
-
- public ByteArray get(int i) {
- return cols[i];
- }
-
- public void set(int i, ByteArray data) {
- cols[i].set(data.array(), data.offset(), data.length());
- }
-
- /** set record to the codes of specified values, new space allocated to hold the codes */
- public GTRecord setValues(Object... values) {
- setValues(info.colAll, new ByteArray(info.getMaxRecordLength()), values);
- return this;
- }
-
- /** set record to the codes of specified values, reuse given space to hold the codes */
- public GTRecord setValues(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
- assert selectedCols.cardinality() == values.length;
-
- ByteBuffer buf = space.asBuffer();
- int pos = buf.position();
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- info.codeSystem.encodeColumnValue(c, values[i], buf);
- int newPos = buf.position();
- cols[c].set(buf.array(), buf.arrayOffset() + pos, newPos - pos);
- pos = newPos;
- }
- return this;
- }
-
- /** decode and return the values of this record */
- public Object[] getValues() {
- return getValues(info.colAll, new Object[info.getColumnCount()]);
- }
-
- /** decode and return the values of this record */
- public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
- assert selectedCols.cardinality() == result.length;
-
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- if (cols[c] == null || cols[c].array() == null) {
- result[i] = null;
- } else {
- result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
- }
- return result;
- }
-
- public Object[] getValues(int[] selectedColumns, Object[] result) {
- assert selectedColumns.length <= result.length;
- for (int i = 0; i < selectedColumns.length; i++) {
- int c = selectedColumns[i];
- if (cols[c].array() == null) {
- result[i] = null;
- } else {
- result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
- }
- return result;
- }
-
- public GTRecord copy() {
- return copy(info.colAll);
- }
-
- public GTRecord copy(ImmutableBitSet selectedCols) {
- int len = 0;
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- len += cols[c].length();
- }
-
- byte[] space = new byte[len];
-
- GTRecord copy = new GTRecord(info, this.maskForEqualHashComp);
- int pos = 0;
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- System.arraycopy(cols[c].array(), cols[c].offset(), space, pos, cols[c].length());
- copy.cols[c].set(space, pos, cols[c].length());
- pos += cols[c].length();
- }
-
- return copy;
- }
-
- public ImmutableBitSet maskForEqualHashComp() {
- return maskForEqualHashComp;
- }
-
- public void maskForEqualHashComp(ImmutableBitSet set) {
- this.maskForEqualHashComp = set;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
-
- GTRecord o = (GTRecord) obj;
- if (this.info != o.info)
- return false;
- if (this.maskForEqualHashComp != o.maskForEqualHashComp)
- return false;
- for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
- int c = maskForEqualHashComp.trueBitAt(i);
- if (this.cols[c].equals(o.cols[c]) == false) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- int hash = 1;
- for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
- int c = maskForEqualHashComp.trueBitAt(i);
- hash = (31 * hash) + cols[c].hashCode();
- }
- return hash;
- }
-
- @Override
- public int compareTo(GTRecord o) {
- assert this.info == o.info;
- assert this.maskForEqualHashComp == o.maskForEqualHashComp; // reference equal for performance
- IGTComparator comparator = info.codeSystem.getComparator();
-
- int comp = 0;
- for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
- int c = maskForEqualHashComp.trueBitAt(i);
- comp = comparator.compare(cols[c], o.cols[c]);
- if (comp != 0)
- return comp;
- }
- return comp;
- }
-
- @Override
- public String toString() {
- return toString(maskForEqualHashComp);
- }
-
- public String toString(ImmutableBitSet selectedColumns) {
- Object[] values = new Object[selectedColumns.cardinality()];
- getValues(selectedColumns, values);
- return Arrays.toString(values);
- }
-
- // ============================================================================
-
- public ByteArray exportColumns(ImmutableBitSet selectedCols) {
- int len = 0;
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- len += cols[c].length();
- }
-
- ByteArray buf = ByteArray.allocate(len);
- exportColumns(info.primaryKey, buf);
- return buf;
- }
-
- /** write data to given buffer, like serialize */
- public void exportColumns(ImmutableBitSet selectedCols, ByteArray buf) {
- int pos = 0;
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- System.arraycopy(cols[c].array(), cols[c].offset(), buf.array(), buf.offset() + pos, cols[c].length());
- pos += cols[c].length();
- }
- buf.setLength(pos);
- }
-
- /** write data to given buffer, like serialize */
- public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
- }
- }
-
- public void exportColumns(int[] fieldIndex, ByteBuffer buf) {
- for (int i : fieldIndex) {
- buf.put(cols[i].array(), cols[i].offset(), cols[i].length());
- }
- }
-
-
- /** write data to given buffer, like serialize */
- public void exportColumnBlock(int c, ByteBuffer buf) {
- exportColumns(info.colBlocks[c], buf);
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadPrimaryKey(ByteBuffer buf) {
- loadColumns(info.primaryKey, buf);
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadCellBlock(int c, ByteBuffer buf) {
- loadColumns(info.colBlocks[c], buf);
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
- int pos = buf.position();
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- int len = info.codeSystem.codeLength(c, buf);
- cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
- pos += len;
- buf.position(pos);
- }
- }
-
- /** similar to export(primaryKey) but will stop at the first null value */
- public static ByteArray exportScanKey(GTRecord rec) {
- if (rec == null)
- return null;
-
- GTInfo info = rec.getInfo();
-
- BitSet selectedColumns = new BitSet();
- int len = 0;
- for (int i = 0; i < info.primaryKey.trueBitCount(); i++) {
- int c = info.primaryKey.trueBitAt(i);
- if (rec.cols[c].array() == null) {
- break;
- }
- selectedColumns.set(c);
- len += rec.cols[c].length();
- }
-
- if (selectedColumns.cardinality() == 0)
- return null;
-
- ByteArray buf = ByteArray.allocate(len);
- rec.exportColumns(new ImmutableBitSet(selectedColumns), buf);
- return buf;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
deleted file mode 100644
index 6878ef1..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ /dev/null
@@ -1,259 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-
-/**
- * A block of consecutive rows of a grid table, stored column-block-wise: the primary key of the first
- * row plus one byte array per column block holding that block's cells for every row. When row blocks
- * are disabled in {@link GTInfo}, a block holds exactly one row.
- */
-public class GTRowBlock {
-
-    /** create a row block, allocate memory, get ready for writing */
-    public static GTRowBlock allocate(GTInfo info) {
-        GTRowBlock b = new GTRowBlock(info);
-
-        byte[] array = new byte[info.getMaxColumnLength(info.primaryKey)];
-        b.primaryKey.set(array);
-
-        // each cell block must fit the widest possible cells of a full block of rows
-        int maxRows = info.isRowBlockEnabled() ? info.rowBlockSize : 1;
-        for (int i = 0; i < b.cellBlocks.length; i++) {
-            array = new byte[info.getMaxColumnLength(info.colBlocks[i]) * maxRows];
-            b.cellBlocks[i].set(array);
-        }
-        return b;
-    }
-
-    final GTInfo info;
-
-    int seqId; // 0, 1, 2...
-    int nRows;
-    ByteArray primaryKey; // the primary key of the first (smallest) row
-    ByteArray[] cellBlocks; // cells for each column block
-
-    /** create a row block that has no underlying space */
-    public GTRowBlock(GTInfo info) {
-        this.info = info;
-        this.primaryKey = new ByteArray();
-        this.cellBlocks = new ByteArray[info.colBlocks.length];
-        for (int i = 0; i < this.cellBlocks.length; i++) {
-            this.cellBlocks[i] = new ByteArray();
-        }
-    }
-
-    public int getSequenceId() {
-        return seqId;
-    }
-
-    public ByteArray getPrimaryKey() {
-        return primaryKey;
-    }
-
-    public ByteArray getCellBlock(int i) {
-        return cellBlocks[i];
-    }
-
-    /** Returns a writer over this block's current space (normally obtained from {@link #allocate}). */
-    public Writer getWriter() {
-        return new Writer();
-    }
-
-    /**
-     * Appends records into the block's cell buffers. The cell blocks' lengths are only updated by
-     * {@link #readyForFlush()}.
-     */
-    public class Writer {
-        ByteBuffer[] cellBlockBuffers;
-
-        Writer() {
-            cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
-            for (int i = 0; i < cellBlockBuffers.length; i++) {
-                cellBlockBuffers[i] = cellBlocks[i].asBuffer();
-            }
-        }
-
-        /** Overwrites this block with the sequence id, row count, primary key and cell bytes of {@code other}. */
-        public void copyFrom(GTRowBlock other) {
-            assert info == other.info;
-
-            seqId = other.seqId;
-            nRows = other.nRows;
-            primaryKey.copyFrom(other.primaryKey);
-            for (int i = 0; i < info.colBlocks.length; i++) {
-                cellBlockBuffers[i].clear();
-                cellBlockBuffers[i].put(other.cellBlocks[i].array(), other.cellBlocks[i].offset(), other.cellBlocks[i].length());
-            }
-        }
-
-        /** Appends one row; the first row of a block also provides the block's primary key. */
-        public void append(GTRecord r) {
-            // add record to block
-            if (isEmpty()) {
-                r.exportColumns(info.primaryKey, primaryKey);
-            }
-            for (int i = 0; i < info.colBlocks.length; i++) {
-                r.exportColumnBlock(i, cellBlockBuffers[i]);
-            }
-            nRows++;
-        }
-
-        /** Sets each cell block's length to the number of bytes written into it. */
-        public void readyForFlush() {
-            for (int i = 0; i < cellBlocks.length; i++) {
-                cellBlocks[i].setLength(cellBlockBuffers[i].position());
-            }
-        }
-
-        /** Advances the sequence id and empties the block for the next batch of rows. */
-        public void clearForNext() {
-            seqId++;
-            nRows = 0;
-            for (int i = 0; i < cellBlockBuffers.length; i++) {
-                cellBlockBuffers[i].clear();
-            }
-        }
-    }
-
-    /** Returns a reader over all column blocks. */
-    public Reader getReader() {
-        return new Reader(info.colBlocksAll);
-    }
-
-    /** Returns a reader that only loads the given column blocks. */
-    public Reader getReader(ImmutableBitSet selectedColBlocks) {
-        return new Reader(selectedColBlocks);
-    }
-
-    /**
-     * Iterates the rows of this block. Each {@link #fetchNext} repoints the result record's columns into
-     * this block's cell bytes rather than copying them.
-     */
-    public class Reader {
-        int cur;
-        ByteBuffer primaryKeyBuffer;
-        ByteBuffer[] cellBlockBuffers;
-        ImmutableBitSet selectedColBlocks;
-
-        Reader(ImmutableBitSet selectedColBlocks) {
-            primaryKeyBuffer = primaryKey.asBuffer();
-            cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
-            for (int i = 0; i < cellBlockBuffers.length; i++) {
-                cellBlockBuffers[i] = cellBlocks[i].asBuffer();
-            }
-            this.selectedColBlocks = selectedColBlocks;
-        }
-
-        public boolean hasNext() {
-            return cur < nRows;
-        }
-
-        /**
-         * Loads the next row's selected column blocks into {@code result}.
-         *
-         * @throws IllegalArgumentException if every row has already been fetched
-         */
-        public void fetchNext(GTRecord result) {
-            if (hasNext() == false)
-                throw new IllegalArgumentException();
-
-            for (int i = 0; i < selectedColBlocks.trueBitCount(); i++) {
-                int c = selectedColBlocks.trueBitAt(i);
-                result.loadCellBlock(c, cellBlockBuffers[c]);
-            }
-            cur++;
-        }
-    }
-
-    /** Returns a deep copy of this block, backed by a freshly allocated buffer. */
-    public GTRowBlock copy() {
-        GTRowBlock copy = new GTRowBlock(info);
-
-        ByteBuffer buf = ByteBuffer.allocate(this.exportLength());
-        this.export(buf);
-        buf.clear();
-        copy.load(buf);
-
-        return copy;
-    }
-
-    public boolean isEmpty() {
-        return nRows == 0;
-    }
-
-    /** A block is full at {@code rowBlockSize} rows, or at one row when row blocks are disabled. */
-    public boolean isFull() {
-        if (info.isRowBlockEnabled())
-            return nRows >= info.rowBlockSize;
-        else
-            return nRows > 0;
-    }
-
-    public int getNumberOfRows() {
-        return nRows;
-    }
-
-    public void setNumberOfRows(int nRows) {
-        this.nRows = nRows;
-    }
-
-    // ============================================================================
-
-    /** Number of bytes written by {@link #export(ByteBuffer)} / {@link #export(DataOutputStream)}. */
-    public int exportLength() {
-        int len = 4; // seq Id
-        if (info.isRowBlockEnabled())
-            len += 4; // nRows
-        len += 4 + primaryKey.length(); // PK byte array
-        for (ByteArray array : cellBlocks) {
-            len += 4 + array.length(); // cell block byte array
-        }
-        return len;
-    }
-
-    /** write data to given output stream, like serialize */
-    public void export(DataOutputStream out) throws IOException {
-        out.writeInt(seqId);
-        if (info.isRowBlockEnabled())
-            out.writeInt(nRows);
-        export(out, primaryKey);
-        for (ByteArray cb : cellBlocks) {
-            export(out, cb);
-        }
-    }
-
-    /** Writes a length-prefixed byte array. */
-    private void export(DataOutputStream out, ByteArray array) throws IOException {
-        out.writeInt(array.length());
-        out.write(array.array(), array.offset(), array.length());
-    }
-
-    /** write data to given buffer, like serialize */
-    public void export(ByteBuffer buf) {
-        buf.putInt(seqId);
-        if (info.isRowBlockEnabled())
-            buf.putInt(nRows);
-        export(primaryKey, buf);
-        for (ByteArray cb : cellBlocks) {
-            export(cb, buf);
-        }
-    }
-
-    /** Writes a length-prefixed byte array. */
-    private void export(ByteArray array, ByteBuffer buf) {
-        buf.putInt(array.length());
-        buf.put(array.array(), array.offset(), array.length());
-    }
-
-    /** read data from given input stream, like deserialize */
-    public void importFrom(DataInputStream in) throws IOException {
-        seqId = in.readInt();
-        nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
-        importFrom(in, primaryKey);
-        for (int i = 0; i < info.colBlocks.length; i++) {
-            ByteArray cb = cellBlocks[i];
-            importFrom(in, cb);
-        }
-    }
-
-    /**
-     * Reads a length-prefixed byte array into {@code result}. The existing array is reused (written from
-     * index 0) when it is large enough; otherwise a new one is allocated.
-     *
-     * @throws java.io.EOFException if the stream ends before the announced number of bytes
-     */
-    private void importFrom(DataInputStream in, ByteArray result) throws IOException {
-        int len = in.readInt();
-        byte[] data = result.array();
-        if (data == null || data.length < len) {
-            // a block created without space (or too small for this payload) needs a fresh array
-            data = new byte[len];
-        }
-        // read() may return fewer bytes than requested; readFully() keeps reading until len bytes arrive
-        in.readFully(data, 0, len);
-        result.set(data, 0, len);
-    }
-
-    /**
-     * change pointers to point to data in given buffer, UNLIKE deserialize
-     * <p>
-     * {@code buf} must be array-backed, since {@link ByteBuffer#array()} is used.
-     */
-    public void load(ByteBuffer buf) {
-        seqId = buf.getInt();
-        nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
-        load(primaryKey, buf);
-        for (int i = 0; i < info.colBlocks.length; i++) {
-            ByteArray cb = cellBlocks[i];
-            load(cb, buf);
-        }
-    }
-
-    /** Points {@code array} at a length-prefixed byte run inside {@code buf} and skips past it. */
-    private void load(ByteArray array, ByteBuffer buf) {
-        int len = buf.getInt();
-        int pos = buf.position();
-        array.set(buf.array(), buf.arrayOffset() + pos, len);
-        buf.position(pos + len);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
deleted file mode 100644
index d3fa42d..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-
-/**
- * A sample {@link IGTCodeSystem} that encodes every column with the {@link DataTypeSerializer} of the
- * column's type and compares codes byte-wise.
- * <p>
- * This is just for example and is INCORRECT when numbers are encoded to bytes and compared in filter.
- * A correct implementation must ensure dimension values preserve order after encoded, e.g. by using an
- * order preserving dictionary.
- *
- * @author yangli9
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class GTSampleCodeSystem implements IGTCodeSystem {
-
-    private GTInfo info;
-    private DataTypeSerializer[] serializers;
-    private IGTComparator comparator;
-
-    /** Creates an uninitialized code system; {@link #init(GTInfo)} must be called before use. */
-    public GTSampleCodeSystem() {
-    }
-
-    /** Builds one serializer per column and the byte-wise comparator. */
-    @Override
-    public void init(GTInfo info) {
-        this.info = info;
-
-        this.serializers = new DataTypeSerializer[info.getColumnCount()];
-        for (int col = 0; col < serializers.length; col++) {
-            serializers[col] = DataTypeSerializer.create(info.colTypes[col]);
-        }
-
-        this.comparator = new IGTComparator() {
-            /** A code counts as null when every one of its bytes is 0xff (an empty code is null too). */
-            @Override
-            public boolean isNull(ByteArray code) {
-                final byte[] bytes = code.array();
-                final int end = code.offset() + code.length();
-                for (int k = code.offset(); k < end; k++) {
-                    if (bytes[k] != (byte) 0xff) {
-                        return false;
-                    }
-                }
-                return true;
-            }
-
-            /** Plain byte-array ordering, which is why numeric order is not preserved (see class doc). */
-            @Override
-            public int compare(ByteArray code1, ByteArray code2) {
-                return code1.compareTo(code2);
-            }
-        };
-    }
-
-    @Override
-    public int codeLength(int col, ByteBuffer buf) {
-        return serializers[col].peekLength(buf);
-    }
-
-    @Override
-    public int maxCodeLength(int col) {
-        return serializers[col].maxLength();
-    }
-
-    @Override
-    public IGTComparator getComparator() {
-        return comparator;
-    }
-
-    // ============================================================================
-
-    /**
-     * Creates one aggregator per (column, function) pair; the i-th function applies to the i-th set bit
-     * of {@code columns}, and the aggregator is chosen by the column's data type.
-     */
-    @Override
-    public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
-        assert columns.trueBitCount() == aggrFunctions.length;
-
-        MeasureAggregator<?>[] aggregators = new MeasureAggregator[aggrFunctions.length];
-        int idx = 0;
-        for (String func : aggrFunctions) {
-            String colType = info.getColumnType(columns.trueBitAt(idx)).toString();
-            aggregators[idx++] = MeasureAggregator.create(func, colType);
-        }
-        return aggregators;
-    }
-
-    @Override
-    public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
-        serializers[col].serialize(value, buf);
-    }
-
-    /** Same as {@link #encodeColumnValue(int, Object, ByteBuffer)}; the rounding flag is ignored. */
-    @Override
-    public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
-        encodeColumnValue(col, value, buf);
-    }
-
-    @Override
-    public Object decodeColumnValue(int col, ByteBuffer buf) {
-        return serializers[col].deserialize(buf);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java
deleted file mode 100644
index 916fb11..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * An inclusive primary-key range to scan, optionally with fuzzy keys for partial key matching.
- */
-public class GTScanRange {
-
-    final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
-    final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
-    final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
-
-    /** Creates a range without fuzzy keys. */
-    public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
-        this(pkStart, pkEnd, null);
-    }
-
-    /**
-     * Creates a range. NOTE: both keys are modified in place: their mask becomes the primary key and every
-     * primary key column after the first null one is cleared to null.
-     *
-     * @param hbaseFuzzyKeys fuzzy keys, or null for none (stored as an empty list)
-     */
-    public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
-        GTInfo info = pkStart.info;
-        assert info == pkEnd.info;
-
-        validateRangeKey(pkStart);
-        validateRangeKey(pkEnd);
-
-        this.pkStart = pkStart;
-        this.pkEnd = pkEnd;
-        if (hbaseFuzzyKeys == null) {
-            this.hbaseFuzzyKeys = Collections.<GTRecord> emptyList();
-        } else {
-            this.hbaseFuzzyKeys = hbaseFuzzyKeys;
-        }
-    }
-
-    /** Masks {@code pk} to the primary key and nulls out every key column after its first null one. */
-    private void validateRangeKey(GTRecord pk) {
-        pk.maskForEqualHashComp(pk.info.primaryKey);
-        int keyCount = pk.info.primaryKey.trueBitCount();
-
-        // locate the first unbounded (null) key column...
-        int firstNull = 0;
-        while (firstNull < keyCount && pk.cols[pk.info.primaryKey.trueBitAt(firstNull)].array() != null) {
-            firstNull++;
-        }
-        // ...and make everything after it unbounded as well
-        for (int k = firstNull + 1; k < keyCount; k++) {
-            pk.cols[pk.info.primaryKey.trueBitAt(k)].set(null, 0, 0);
-        }
-    }
-
-    /** Null-tolerant hash of a single field. */
-    private static int hashOf(Object o) {
-        return o == null ? 0 : o.hashCode();
-    }
-
-    /** Null-tolerant equality of a single field. */
-    private static boolean same(Object a, Object b) {
-        return a == null ? b == null : a.equals(b);
-    }
-
-    @Override
-    public int hashCode() {
-        int h = 1;
-        h = 31 * h + hashOf(hbaseFuzzyKeys);
-        h = 31 * h + hashOf(pkEnd);
-        h = 31 * h + hashOf(pkStart);
-        return h;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        GTScanRange that = (GTScanRange) obj;
-        return same(hbaseFuzzyKeys, that.hbaseFuzzyKeys) //
-                && same(pkEnd, that.pkEnd) //
-                && same(pkStart, that.pkStart);
-    }
-
-    /** Renders the decoded primary key columns of both ends as {@code start-end}. */
-    @Override
-    public String toString() {
-        String start = pkStart == null ? "null" : pkStart.toString(pkStart.info.primaryKey);
-        String end = pkEnd == null ? "null" : pkEnd.toString(pkEnd.info.primaryKey);
-        return start + "-" + end;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
deleted file mode 100644
index 234d0c3..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
+++ /dev/null
@@ -1,486 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Plans primary-key scan ranges of a grid table from a {@link TupleFilter}.
- * <p>
- * The filter is flattened into OR-of-AND form; every AND branch becomes one {@link GTScanRange}
- * (start key, end key and optional HBase fuzzy keys). Overlapping ranges are then merged and, if more
- * than the caller's limit remain, everything is collapsed into one range.
- * <p>
- * An unknown bound is a {@link ByteArray} whose backing array is {@code null}; the comparators below
- * differ only in whether such an unknown sorts before ("smaller") or after ("bigger") every known value.
- */
-public class GTScanRangePlanner {
-
-    /** A merged range carrying more fuzzy keys than this drops them all: too many slow down the HBase scan. */
-    private static final int MAX_HBASE_FUZZY_KEYS = 100;
-
-    final private GTInfo info;
-    final private ComparatorEx<ByteArray> byteUnknownIsSmaller;
-    final private ComparatorEx<ByteArray> byteUnknownIsBigger;
-    final private ComparatorEx<GTRecord> recordUnknownIsSmaller;
-    final private ComparatorEx<GTRecord> recordUnknownIsBigger;
-
-    public GTScanRangePlanner(GTInfo info) {
-        this.info = info;
-
-        IGTComparator comp = info.codeSystem.getComparator();
-        this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp);
-        this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp);
-        this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp);
-        this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(comp);
-    }
-
-    /**
-     * Plans scan ranges with no limit on how many are returned.
-     *
-     * @return the scan ranges; an empty list means the filter is always false
-     */
-    public List<GTScanRange> planScanRanges(TupleFilter filter) {
-        return planScanRanges(filter, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Plans scan ranges for {@code filter}. A {@code null} filter yields a single range built from no
-     * column constraints.
-     *
-     * @param filter    the filter to translate, may be {@code null}
-     * @param maxRanges when more merged ranges than this remain, they are collapsed into one
-     * @return the scan ranges; an empty list means the filter is always false
-     */
-    public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) {
-
-        TupleFilter flatFilter = flattenToOrAndFilter(filter);
-
-        List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);
-
-        List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
-        for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
-            GTScanRange scanRange = newScanRange(andDimRanges);
-            scanRanges.add(scanRange);
-        }
-
-        List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
-        mergedRanges = mergeTooManyRanges(mergedRanges, maxRanges);
-
-        return mergedRanges;
-    }
-
-    /**
-     * Builds one scan range from the column ranges of an AND branch. Only primary-key columns contribute;
-     * each EQ/IN value on a key column also yields one fuzzy key masked to that single column.
-     */
-    private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
-        GTRecord pkStart = new GTRecord(info);
-        GTRecord pkEnd = new GTRecord(info);
-        List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList();
-
-        for (ColumnRange range : andDimRanges) {
-            int col = range.column.getColumnDesc().getZeroBasedIndex();
-            if (info.primaryKey.get(col) == false)
-                continue;
-
-            pkStart.set(col, range.begin);
-            pkEnd.set(col, range.end);
-
-            if (range.equals != null) {
-                ImmutableBitSet fuzzyMask = new ImmutableBitSet(col);
-                for (ByteArray v : range.equals) {
-                    GTRecord fuzzy = new GTRecord(info);
-                    fuzzy.set(col, v);
-                    fuzzy.maskForEqualHashComp(fuzzyMask);
-                    hbaseFuzzyKeys.add(fuzzy);
-                }
-            }
-        }
-
-        return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys);
-    }
-
-    /**
-     * Flattens {@code filter} and normalizes it to an OR at the root (a bare AND is wrapped in an OR).
-     *
-     * @return {@code null} for a {@code null} filter
-     * @throws IllegalStateException if the flattened root is neither AND nor OR
-     */
-    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
-        if (filter == null)
-            return null;
-
-        TupleFilter flatFilter = filter.flatFilter();
-
-        // normalize to OR-AND filter
-        if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
-            LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
-            f.addChild(flatFilter);
-            flatFilter = f;
-        }
-
-        if (flatFilter.getOperator() != FilterOperatorEnum.OR)
-            throw new IllegalStateException();
-
-        return flatFilter;
-    }
-
-    /** Turns an OR-of-AND filter into one collection of column ranges per AND branch, then pre-evaluates them. */
-    private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
-        List<Collection<ColumnRange>> result = Lists.newArrayList();
-
-        if (flatFilter == null) {
-            result.add(Collections.<ColumnRange> emptyList());
-            return result;
-        }
-
-        for (TupleFilter andFilter : flatFilter.getChildren()) {
-            if (andFilter.getOperator() != FilterOperatorEnum.AND)
-                throw new IllegalStateException("Filter should be AND instead of " + andFilter);
-
-            Collection<ColumnRange> andRanges = translateToAndDimRanges(andFilter.getChildren());
-            result.add(andRanges);
-        }
-
-        return preEvaluateConstantConditions(result);
-    }
-
-    /**
-     * Intersects the compare filters of one AND branch into at most one range per column. Filters that are
-     * not column comparisons are ignored here and left to the upper layer.
-     */
-    private Collection<ColumnRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters) {
-        Map<TblColRef, ColumnRange> rangeMap = new HashMap<TblColRef, ColumnRange>();
-        for (TupleFilter filter : andFilters) {
-            if ((filter instanceof CompareTupleFilter) == false) {
-                continue;
-            }
-
-            CompareTupleFilter comp = (CompareTupleFilter) filter;
-            if (comp.getColumn() == null) {
-                continue;
-            }
-
-            @SuppressWarnings("unchecked")
-            ColumnRange newRange = new ColumnRange(comp.getColumn(), (Set<ByteArray>) comp.getValues(), comp.getOperator());
-            ColumnRange existing = rangeMap.get(newRange.column);
-            if (existing == null) {
-                rangeMap.put(newRange.column, newRange);
-            } else {
-                existing.andMerge(newRange);
-            }
-        }
-        return rangeMap.values();
-    }
-
-    /**
-     * Drops column ranges that match everything and AND branches that match nothing.
-     * An empty returned list means "globally false"; a list holding one empty collection means "globally true".
-     */
-    private List<Collection<ColumnRange>> preEvaluateConstantConditions(List<Collection<ColumnRange>> orAndRanges) {
-        boolean globalAlwaysTrue = false;
-        Iterator<Collection<ColumnRange>> iterator = orAndRanges.iterator();
-        while (iterator.hasNext()) {
-            Collection<ColumnRange> andRanges = iterator.next();
-            Iterator<ColumnRange> iterator2 = andRanges.iterator();
-            boolean hasAlwaysFalse = false;
-            while (iterator2.hasNext()) {
-                ColumnRange range = iterator2.next();
-                if (range.satisfyAll())
-                    iterator2.remove();
-                else if (range.satisfyNone())
-                    hasAlwaysFalse = true;
-            }
-            if (hasAlwaysFalse) {
-                iterator.remove();
-            } else if (andRanges.isEmpty()) {
-                globalAlwaysTrue = true;
-                break;
-            }
-        }
-        // return empty OR list means global false
-        // return an empty AND collection inside OR list means global true
-        if (globalAlwaysTrue) {
-            orAndRanges.clear();
-            orAndRanges.add(Collections.<ColumnRange> emptyList());
-        }
-        return orAndRanges;
-    }
-
-    /**
-     * Sorts {@code ranges} by start key (unknown first) and merges every run of overlapping ranges into one.
-     * Note: sorts the given list in place.
-     */
-    private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
-        if (ranges.size() <= 1) {
-            return ranges;
-        }
-
-        // sort ranges by start key
-        Collections.sort(ranges, new Comparator<GTScanRange>() {
-            @Override
-            public int compare(GTScanRange a, GTScanRange b) {
-                return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart);
-            }
-        });
-
-        // merge the overlap range
-        List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
-        int mergeBeginIndex = 0;
-        GTRecord mergeEnd = ranges.get(0).pkEnd;
-        // Range 0 already opens the first group. Comparing it with itself is pointless, and for a degenerate
-        // range (start sorting after end) it would try to merge the empty sub-list [0, 0).
-        for (int index = 1; index < ranges.size(); index++) {
-            GTScanRange range = ranges.get(index);
-
-            // if overlap, swallow it
-            if (recordUnknownIsSmaller.compare(range.pkStart, mergeEnd) <= 0 //
-                    || recordUnknownIsBigger.compare(mergeEnd, range.pkStart) >= 0) {
-                mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
-                continue;
-            }
-
-            // not overlap, split here
-            GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index));
-            mergedRanges.add(mergedRange);
-
-            // start a new group, whose end is initially this range's own end
-            mergeBeginIndex = index;
-            mergeEnd = range.pkEnd;
-        }
-
-        // don't miss the last range
-        GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size()));
-        mergedRanges.add(mergedRange);
-
-        return mergedRanges;
-    }
-
-    /**
-     * Merges non-empty {@code ranges} into one: the first range's start, the largest end (unknown biggest)
-     * and the union of fuzzy keys. Fuzzy keys are dropped when any range has none or there are too many.
-     */
-    private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
-        GTScanRange first = ranges.get(0);
-        if (ranges.size() == 1)
-            return first;
-
-        GTRecord start = first.pkStart;
-        GTRecord end = first.pkEnd;
-        List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>();
-
-        boolean hasNonFuzzyRange = false;
-        for (GTScanRange range : ranges) {
-            hasNonFuzzyRange = hasNonFuzzyRange || range.hbaseFuzzyKeys.isEmpty();
-            newFuzzyKeys.addAll(range.hbaseFuzzyKeys);
-            end = recordUnknownIsBigger.max(end, range.pkEnd);
-        }
-
-        // if any range is non-fuzzy, then all fuzzy keys must be cleared
-        // also too many fuzzy keys will slow down HBase scan
-        if (hasNonFuzzyRange || newFuzzyKeys.size() > MAX_HBASE_FUZZY_KEYS) {
-            newFuzzyKeys.clear();
-        }
-
-        return new GTScanRange(start, end, newFuzzyKeys);
-    }
-
-    /** Collapses all ranges into one when there are more than {@code maxRanges}. */
-    private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
-        if (ranges.size() <= maxRanges) {
-            return ranges;
-        }
-
-        // TODO: check the distance between range and merge the large distance range
-        List<GTScanRange> result = new ArrayList<GTScanRange>(1);
-        GTScanRange mergedRange = mergeKeyRange(ranges);
-        result.add(mergedRange);
-        return result;
-    }
-
-    /**
-     * The constraint on one column within an AND branch: either a set of allowed values ({@code equals},
-     * with begin/end set to its min/max) or a begin/end interval where an EMPTY bound means "unbounded".
-     */
-    private class ColumnRange {
-        private TblColRef column;
-        private ByteArray begin = ByteArray.EMPTY;
-        private ByteArray end = ByteArray.EMPTY;
-        private Set<ByteArray> equals;
-
-        public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
-            this.column = column;
-
-            switch (op) {
-            case EQ:
-            case IN:
-                equals = new HashSet<ByteArray>(values);
-                refreshBeginEndFromEquals();
-                break;
-            case LT:
-            case LTE:
-                end = byteUnknownIsBigger.max(values);
-                break;
-            case GT:
-            case GTE:
-                begin = byteUnknownIsSmaller.min(values);
-                break;
-            case NEQ:
-            case NOTIN:
-            case ISNULL:
-            case ISNOTNULL:
-                // let Optiq filter it!
-                break;
-            default:
-                throw new UnsupportedOperationException(op.name());
-            }
-        }
-
-        void copy(TblColRef column, ByteArray beginValue, ByteArray endValue, Set<ByteArray> equalValues) {
-            this.column = column;
-            this.begin = beginValue;
-            this.end = endValue;
-            this.equals = equalValues;
-        }
-
-        private void refreshBeginEndFromEquals() {
-            if (equals.isEmpty()) {
-                begin = ByteArray.EMPTY;
-                end = ByteArray.EMPTY;
-            } else {
-                begin = byteUnknownIsSmaller.min(equals);
-                end = byteUnknownIsBigger.max(equals);
-            }
-        }
-
-        /**
-         * True when the column is unconstrained (the NEQ/NOTIN/ISNULL/ISNOTNULL case). An EQ/IN range whose
-         * value set became empty also has EMPTY bounds, but it matches nothing, so it must not count here.
-         */
-        public boolean satisfyAll() {
-            return equals == null && begin.array() == null && end.array() == null;
-        }
-
-        /** True when no value can match: an empty value set, or a known begin above a known end. */
-        public boolean satisfyNone() {
-            if (equals != null) {
-                return equals.isEmpty();
-            } else if (begin.array() != null && end.array() != null) {
-                return info.codeSystem.getComparator().compare(begin, end) > 0;
-            } else {
-                return false;
-            }
-        }
-
-        /** Intersects {@code another} (on the same column) into this range. */
-        public void andMerge(ColumnRange another) {
-            assert this.column.equals(another.column);
-
-            if (another.satisfyAll()) {
-                return;
-            }
-
-            if (this.satisfyAll()) {
-                copy(another.column, another.begin, another.end, another.equals);
-                return;
-            }
-
-            if (this.equals != null && another.equals != null) {
-                this.equals.retainAll(another.equals);
-                refreshBeginEndFromEquals();
-                return;
-            }
-
-            if (this.equals != null) {
-                this.equals = filter(this.equals, another.begin, another.end);
-                refreshBeginEndFromEquals();
-                return;
-            }
-
-            if (another.equals != null) {
-                this.equals = filter(another.equals, this.begin, this.end);
-                refreshBeginEndFromEquals();
-                return;
-            }
-
-            this.begin = byteUnknownIsSmaller.max(this.begin, another.begin);
-            this.end = byteUnknownIsBigger.min(this.end, another.end);
-        }
-
-        /** Returns a new set holding the values of {@code equalValues} within [beginValue, endValue]. */
-        private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) {
-            Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size());
-            for (ByteArray v : equalValues) {
-                if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) {
-                    result.add(v);
-                }
-            }
-            // must return the filtered set; returning equalValues would silently discard the interval
-            return result;
-        }
-
-        public String toString() {
-            if (equals == null) {
-                return column.getName() + " between " + begin + " and " + end;
-            } else {
-                return column.getName() + " in " + equals;
-            }
-        }
-    }
-
-    /** A {@link Comparator} with min/max helpers; on ties, min and max both return their first argument. */
-    public static abstract class ComparatorEx<T> implements Comparator<T> {
-
-        /** @return the smallest element, or {@code null} for an empty collection */
-        public T min(Collection<T> v) {
-            if (v.size() <= 0) {
-                return null;
-            }
-
-            Iterator<T> iterator = v.iterator();
-            T min = iterator.next();
-            while (iterator.hasNext()) {
-                min = min(min, iterator.next());
-            }
-            return min;
-        }
-
-        /** @return the largest element, or {@code null} for an empty collection */
-        public T max(Collection<T> v) {
-            if (v.size() <= 0) {
-                return null;
-            }
-
-            Iterator<T> iterator = v.iterator();
-            T max = iterator.next();
-            while (iterator.hasNext()) {
-                max = max(max, iterator.next());
-            }
-            return max;
-        }
-
-        public T min(T a, T b) {
-            return compare(a, b) <= 0 ? a : b;
-        }
-
-        public T max(T a, T b) {
-            return compare(a, b) >= 0 ? a : b;
-        }
-
-        public boolean between(T v, T start, T end) {
-            return compare(start, v) <= 0 && compare(v, end) <= 0;
-        }
-    }
-
-    /** Orders codes with {@code comp}; an unknown code (null backing array) sorts before every known one. */
-    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IGTComparator comp) {
-        return new ComparatorEx<ByteArray>() {
-            @Override
-            public int compare(ByteArray a, ByteArray b) {
-                boolean aUnknown = a.array() == null;
-                boolean bUnknown = b.array() == null;
-                if (aUnknown && bUnknown)
-                    return 0; // two unknowns tie; a non-zero result would break the Comparator contract
-                else if (aUnknown)
-                    return -1;
-                else if (bUnknown)
-                    return 1;
-                else
-                    return comp.compare(a, b);
-            }
-        };
-    }
-
-    /** Orders codes with {@code comp}; an unknown code (null backing array) sorts after every known one. */
-    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IGTComparator comp) {
-        return new ComparatorEx<ByteArray>() {
-            @Override
-            public int compare(ByteArray a, ByteArray b) {
-                boolean aUnknown = a.array() == null;
-                boolean bUnknown = b.array() == null;
-                if (aUnknown && bUnknown)
-                    return 0; // two unknowns tie; a non-zero result would break the Comparator contract
-                else if (aUnknown)
-                    return 1;
-                else if (bUnknown)
-                    return -1;
-                else
-                    return comp.compare(a, b);
-            }
-        };
-    }
-
-    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IGTComparator comp) {
-        return new RecordComparator(byteComparatorTreatsUnknownSmaller(comp));
-    }
-
-    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IGTComparator comp) {
-        return new RecordComparator(byteComparatorTreatsUnknownBigger(comp));
-    }
-
-    /** Compares records column by column over their shared equal/hash mask, in mask order. */
-    private static class RecordComparator extends ComparatorEx<GTRecord> {
-        final ComparatorEx<ByteArray> comparator;
-
-        RecordComparator(ComparatorEx<ByteArray> byteComparator) {
-            this.comparator = byteComparator;
-        }
-
-        @Override
-        public int compare(GTRecord a, GTRecord b) {
-            assert a.info == b.info;
-            assert a.maskForEqualHashComp() == b.maskForEqualHashComp();
-            ImmutableBitSet mask = a.maskForEqualHashComp();
-
-            int comp = 0;
-            for (int i = 0; i < mask.trueBitCount(); i++) {
-                int c = mask.trueBitAt(i);
-                comp = comparator.compare(a.cols[c], b.cols[c]);
-                if (comp != 0)
-                    return comp;
-            }
-            return 0; // equals
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
deleted file mode 100644
index 1fd3c8e..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.util.Arrays;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Sets;
-
-public class GTScanRequest {
-
- // basic
- private GTInfo info;
- private GTScanRange range;
- private ImmutableBitSet columns;
-
- // optional filtering
- private TupleFilter filterPushDown;
-
- // optional aggregation
- private ImmutableBitSet aggrGroupBy;
- private ImmutableBitSet aggrMetrics;
- private String[] aggrMetricsFuncs;
-
- public GTScanRequest(GTInfo info) {
- this(info, null, null, null);
- }
-
- public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) {
- this.info = info;
- this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
- this.columns = columns;
- this.filterPushDown = filterPushDown;
- validate();
- }
-
- public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet aggrGroupBy, ImmutableBitSet aggrMetrics, //
- String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
- this(info, range, null, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown);
- }
-
- public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
- ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
- this.info = info;
- this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
- this.columns = dimensions;
- this.filterPushDown = filterPushDown;
-
- this.aggrGroupBy = aggrGroupBy;
- this.aggrMetrics = aggrMetrics;
- this.aggrMetricsFuncs = aggrMetricsFuncs;
-
- validate();
- }
-
- private void validate() {
- if (range == null)
- range = new GTScanRange(null, null);
-
- if (hasAggregation()) {
- if (aggrGroupBy.intersects(aggrMetrics))
- throw new IllegalStateException();
- if (aggrMetrics.cardinality() != aggrMetricsFuncs.length)
- throw new IllegalStateException();
-
- if (columns == null)
- columns = ImmutableBitSet.EMPTY;
- columns = columns.or(aggrGroupBy);
- columns = columns.or(aggrMetrics);
- }
-
- if (columns == null)
- columns = info.colAll;
-
- if (hasFilterPushDown()) {
- validateFilterPushDown();
- }
- }
-
- private void validateFilterPushDown() {
- if (hasFilterPushDown() == false)
- return;
-
- Set<TblColRef> filterColumns = Sets.newHashSet();
- TupleFilter.collectColumns(filterPushDown, filterColumns);
-
- for (TblColRef col : filterColumns) {
- // filter columns must belong to the table
- info.validateColRef(col);
- // filter columns must be returned to satisfy upper layer evaluation (calcite)
- columns = columns.set(col.getColumnDesc().getZeroBasedIndex());
- }
-
- // un-evaluatable filter must be removed
- if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) {
- Set<TblColRef> unevaluableColumns = Sets.newHashSet();
- filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns);
-
- // columns in un-evaluatable filter must be returned without loss so upper layer can do final evaluation
- if (hasAggregation()) {
- for (TblColRef col : unevaluableColumns) {
- aggrGroupBy = aggrGroupBy.set(col.getColumnDesc().getZeroBasedIndex());
- }
- }
- }
- }
-
- public boolean hasFilterPushDown() {
- return filterPushDown != null;
- }
-
- public boolean hasAggregation() {
- return aggrGroupBy != null && aggrMetrics != null && aggrMetricsFuncs != null;
- }
-
- public GTInfo getInfo() {
- return info;
- }
-
- public GTRecord getPkStart() {
- return range.pkStart;
- }
-
- public GTRecord getPkEnd() {
- return range.pkEnd;
- }
-
- public ImmutableBitSet getColumns() {
- return columns;
- }
-
- public TupleFilter getFilterPushDown() {
- return filterPushDown;
- }
-
- public ImmutableBitSet getAggrGroupBy() {
- return aggrGroupBy;
- }
-
- public ImmutableBitSet getAggrMetrics() {
- return aggrMetrics;
- }
-
- public String[] getAggrMetricsFuncs() {
- return aggrMetricsFuncs;
- }
-
- @Override
- public String toString() {
- return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GTUtil.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTUtil.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTUtil.java
deleted file mode 100644
index 2532659..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTUtil.java
+++ /dev/null
@@ -1,221 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.IFilterCodeSystem;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilterSerializer;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Sets;
-
-/**
- * Filter helpers for grid tables: rewrite {@link TupleFilter}s onto grid-table columns, encode their
- * constants with the table's code system, and replace un-evaluable parts with TRUE.
- */
-public class GTUtil {
-
-    static final TableDesc MOCKUP_TABLE = TableDesc.mockup("GT_MOCKUP_TABLE");
-
-    /** Mock-up column reference named {@code "" + col}, created with id {@code col + 1} on the mock-up table. */
-    static TblColRef tblColRef(int col, String datatype) {
-        ColumnDesc desc = ColumnDesc.mockup(MOCKUP_TABLE, col + 1, "" + col, datatype);
-        return new TblColRef(desc);
-    }
-
-    /** Replaces un-evaluable parts with TRUE, collecting their columns; constants are not encoded. */
-    public static TupleFilter convertFilterUnevaluatable(TupleFilter rootFilter, GTInfo info, //
-            Set<TblColRef> unevaluatableColumnCollector) {
-        return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector);
-    }
-
-    /** Encodes constants and replaces un-evaluable parts with TRUE, without collecting their columns. */
-    public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) {
-        return convertFilter(rootFilter, info, null, true, null);
-    }
-
-    /** Maps columns via {@code colMapping} (list index = grid-table column), encodes constants, drops un-evaluable parts. */
-    public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
-            List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
-        return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
-    }
-
-    /**
-     * Converts TblColRef to GridTable column, encodes constants and drops unEvaluatable parts, by serializing
-     * the filter through a rewriting decorator and deserializing the result.
-     *
-     * @param colMapping                   external columns indexed by grid-table column, or {@code null}
-     * @param encodeConstants              whether compare-filter constants are translated into codes
-     * @param unevaluatableColumnCollector receives columns of dropped parts; may be {@code null}
-     */
-    private static TupleFilter convertFilter(TupleFilter rootFilter, final GTInfo info, //
-            final List<TblColRef> colMapping, final boolean encodeConstants, //
-            final Set<TblColRef> unevaluatableColumnCollector) {
-
-        IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator());
-
-        byte[] bytes = TupleFilterSerializer.serialize(rootFilter, new TupleFilterSerializer.Decorator() {
-            @Override
-            public TupleFilter onSerialize(TupleFilter filter) {
-                if (filter == null)
-                    return null;
-
-                // In case of NOT(unEvaluatableFilter) we must replace the whole NOT with TRUE right away.
-                // Otherwise unEvaluatableFilter alone would later be replaced with TRUE, and
-                // NOT(unEvaluatableFilter) would then always return FALSE.
-                if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT && !TupleFilter.isEvaluableRecursively(filter)) {
-                    collectUnevaluatableColumns(filter);
-                    return ConstantTupleFilter.TRUE;
-                }
-
-                // shortcut for unEvaluatable filter
-                if (filter.isEvaluable() == false) {
-                    collectUnevaluatableColumns(filter);
-                    return ConstantTupleFilter.TRUE;
-                }
-
-                // map to column onto grid table
-                if (colMapping != null && filter instanceof ColumnTupleFilter) {
-                    ColumnTupleFilter colFilter = (ColumnTupleFilter) filter;
-                    int gtColIdx = colMapping.indexOf(colFilter.getColumn());
-                    return new ColumnTupleFilter(info.colRef(gtColIdx));
-                }
-
-                // encode constants
-                if (encodeConstants && filter instanceof CompareTupleFilter) {
-                    return encodeConstants((CompareTupleFilter) filter);
-                }
-
-                return filter;
-            }
-
-            // convertFilterConstants() supplies no collector; skip collection then instead of passing null on
-            private void collectUnevaluatableColumns(TupleFilter filter) {
-                if (unevaluatableColumnCollector != null) {
-                    TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
-                }
-            }
-
-            /**
-             * Rebuilds a compare filter with its constants translated into codes. A constant that cannot be
-             * encoded (exactly, or after floor/ceiling rounding) turns the comparison into TRUE or FALSE.
-             */
-            @SuppressWarnings({ "rawtypes", "unchecked" })
-            private TupleFilter encodeConstants(CompareTupleFilter oldCompareFilter) {
-                // extract ColumnFilter & ConstantFilter
-                TblColRef externalCol = oldCompareFilter.getColumn();
-
-                if (externalCol == null) {
-                    return oldCompareFilter;
-                }
-
-                Collection constValues = oldCompareFilter.getValues();
-                if (constValues == null || constValues.isEmpty()) {
-                    return oldCompareFilter;
-                }
-
-                CompareTupleFilter newCompareFilter = new CompareTupleFilter(oldCompareFilter.getOperator());
-                newCompareFilter.addChild(new ColumnTupleFilter(externalCol));
-
-                Object firstValue = constValues.iterator().next();
-                int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : colMapping.indexOf(externalCol);
-
-                TupleFilter result;
-                ByteArray code;
-
-                // translate constant into code; rounding: -1 floor, 0 exact, 1 ceiling
-                switch (newCompareFilter.getOperator()) {
-                case EQ:
-                case IN:
-                    Set newValues = Sets.newHashSet();
-                    for (Object value : constValues) {
-                        code = translate(col, value, 0);
-                        if (code != null)
-                            newValues.add(code);
-                    }
-                    if (newValues.isEmpty()) {
-                        result = ConstantTupleFilter.FALSE;
-                    } else {
-                        newCompareFilter.addChild(new ConstantTupleFilter(newValues));
-                        result = newCompareFilter;
-                    }
-                    break;
-                case NEQ:
-                    code = translate(col, firstValue, 0);
-                    if (code == null) {
-                        result = ConstantTupleFilter.TRUE;
-                    } else {
-                        newCompareFilter.addChild(new ConstantTupleFilter(code));
-                        result = newCompareFilter;
-                    }
-                    break;
-                case LT:
-                    code = translate(col, firstValue, 1);
-                    if (code == null) {
-                        result = ConstantTupleFilter.TRUE;
-                    } else {
-                        newCompareFilter.addChild(new ConstantTupleFilter(code));
-                        result = newCompareFilter;
-                    }
-                    break;
-                case LTE:
-                    code = translate(col, firstValue, -1);
-                    if (code == null) {
-                        result = ConstantTupleFilter.FALSE;
-                    } else {
-                        newCompareFilter.addChild(new ConstantTupleFilter(code));
-                        result = newCompareFilter;
-                    }
-                    break;
-                case GT:
-                    code = translate(col, firstValue, -1);
-                    if (code == null) {
-                        result = ConstantTupleFilter.TRUE;
-                    } else {
-                        newCompareFilter.addChild(new ConstantTupleFilter(code));
-                        result = newCompareFilter;
-                    }
-                    break;
-                case GTE:
-                    code = translate(col, firstValue, 1);
-                    if (code == null) {
-                        result = ConstantTupleFilter.FALSE;
-                    } else {
-                        newCompareFilter.addChild(new ConstantTupleFilter(code));
-                        result = newCompareFilter;
-                    }
-                    break;
-                default:
-                    throw new IllegalStateException("Cannot handle operator " + newCompareFilter.getOperator());
-                }
-                return result;
-            }
-
-            // scratch buffer reused by translate(), sized to the widest column
-            transient ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
-
-            /** Encodes {@code value} for {@code col}; returns {@code null} if the code system rejects it. */
-            private ByteArray translate(int col, Object value, int roundingFlag) {
-                try {
-                    buf.clear();
-                    info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
-                    return ByteArray.copyOf(buf.array(), 0, buf.position());
-                } catch (IllegalArgumentException ex) {
-                    return null;
-                }
-            }
-        }, filterCodeSystem);
-
-        return TupleFilterSerializer.deserialize(bytes, filterCodeSystem);
-    }
-
-    /** Adapts a grid-table comparator to the filter code system used to (de)serialize filter constants. */
-    public static IFilterCodeSystem<ByteArray> wrap(final IGTComparator comp) {
-        return new IFilterCodeSystem<ByteArray>() {
-
-            @Override
-            public int compare(ByteArray o1, ByteArray o2) {
-                return comp.compare(o1, o2);
-            }
-
-            @Override
-            public boolean isNull(ByteArray code) {
-                return comp.isNull(code);
-            }
-
-            @Override
-            public void serialize(ByteArray code, ByteBuffer buffer) {
-                if (code == null)
-                    BytesUtil.writeByteArray(null, 0, 0, buffer);
-                else
-                    BytesUtil.writeByteArray(code.array(), code.offset(), code.length(), buffer);
-            }
-
-            @Override
-            public ByteArray deserialize(ByteBuffer buffer) {
-                return new ByteArray(BytesUtil.readByteArray(buffer));
-            }
-        };
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
deleted file mode 100644
index 092227b..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * A grid table: a {@link GTInfo} schema bound to an {@link IGTStore}. It hands out builders for
- * (re)writing data and layers filter and aggregation scanners over a raw store scan.
- */
-public class GridTable implements Closeable {
-
-    final GTInfo info;
-    final IGTStore store;
-
-    public GridTable(GTInfo info, IGTStore store) {
-        this.info = info;
-        this.store = store;
-    }
-
-    /** Rebuild builder for an unsharded table (asserts sharding is off; delegates with shard -1). */
-    public GTBuilder rebuild() throws IOException {
-        assert !info.isShardingEnabled();
-        return rebuild(-1);
-    }
-
-    /** Rebuild builder for {@code shard}, which is asserted to be below the shard count. */
-    public GTBuilder rebuild(int shard) throws IOException {
-        assert shard < info.nShards;
-        return new GTBuilder(info, shard, store);
-    }
-
-    /** Append builder for an unsharded table (asserts sharding is off; delegates with shard -1). */
-    public GTBuilder append() throws IOException {
-        assert !info.isShardingEnabled();
-        return append(-1);
-    }
-
-    /** Append builder for {@code shard}. */
-    public GTBuilder append(int shard) throws IOException {
-        final boolean appending = true;
-        return new GTBuilder(info, shard, store, appending);
-    }
-
-    /**
-     * Opens a scanner for {@code req}: a raw store scan, wrapped by a filter scanner when the request has a
-     * push-down filter, and then by an aggregate scanner when it has aggregation.
-     */
-    public IGTScanner scan(GTScanRequest req) throws IOException {
-        IGTScanner scanner = new GTRawScanner(info, store, req);
-        if (req.hasFilterPushDown())
-            scanner = new GTFilterScanner(scanner, req);
-        if (req.hasAggregation())
-            scanner = new GTAggregateScanner(scanner, req);
-        return scanner;
-    }
-
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    public IGTStore getStore() {
-        return store;
-    }
-
-    /** Closes the underlying store if it is {@link Closeable}; otherwise does nothing. */
-    @Override
-    public void close() throws IOException {
-        if (!(store instanceof Closeable))
-            return;
-        ((Closeable) store).close();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
deleted file mode 100644
index 644c94d..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-
-/**
- * Translates between column values and their binary codes for a grid table, and supplies the
- * comparator and metric aggregators that operate on those codes.
- */
-public interface IGTCodeSystem {
-
-    /** Binds this code system to the table schema. */
-    void init(GTInfo info);
-
-    /** Comparator over codes. */
-    IGTComparator getComparator();
-
-    /** Length of the code for {@code col} that starts at the buffer's position; the position is left unchanged. */
-    int codeLength(int col, ByteBuffer buf);
-
-    /** Upper bound on the code length of {@code col}. */
-    int maxCodeLength(int col);
-
-    /**
-     * Writes the code of {@code value} for {@code col} into {@code buf}.
-     *
-     * @throws IllegalArgumentException when the value is not in the dictionary
-     */
-    void encodeColumnValue(int col, Object value, ByteBuffer buf) throws IllegalArgumentException;
-
-    /**
-     * Writes the code of {@code value} for {@code col} into {@code buf}, with rounding:
-     * -1 floors, 0 requires an exact match, 1 takes the ceiling.
-     *
-     * @throws IllegalArgumentException when rounding is 0 and the value is not in the dictionary, when
-     *         rounding is -1 and no equal or smaller value exists, or when rounding is 1 and no equal or
-     *         bigger value exists
-     */
-    void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) throws IllegalArgumentException;
-
-    /** Reads a code of {@code col} from {@code buf} and returns the value it stands for. */
-    Object decodeColumnValue(int col, ByteBuffer buf);
-
-    /** Creates aggregators for the metric {@code columns}, one function name per column. */
-    MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTComparator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTComparator.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTComparator.java
deleted file mode 100644
index 442adcc..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTComparator.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.util.Comparator;
-
-import org.apache.kylin.common.util.ByteArray;
-
-/**
- * Compares grid-table codes. Ordering comes from the inherited {@link Comparator#compare(Object, Object)}
- * over {@link ByteArray} codes; this interface adds NULL detection.
- */
-public interface IGTComparator extends Comparator<ByteArray> {
-
-    /** @return whether {@code code} represents the NULL value */
-    boolean isNull(ByteArray code);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
deleted file mode 100644
index 3d3c3c8..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.Closeable;
-
-public interface IGTScanner extends Iterable<GTRecord>, Closeable {
-
- GTInfo getInfo();
-
- int getScannedRowCount();
-
- int getScannedRowBlockCount();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
deleted file mode 100644
index cf4a3cc..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-
-public interface IGTStore {
-
- GTInfo getInfo();
-
- IGTStoreWriter rebuild(int shard) throws IOException;
-
- IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
-
- IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
-
- interface IGTStoreWriter extends Closeable {
- void write(GTRowBlock block) throws IOException;
- }
-
- interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
deleted file mode 100644
index b1a7180..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.gridtable;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.storage.gridtable.GTInfo.Builder;
-
-public class UnitTestSupport {
-
- public static GTInfo basicInfo() {
- Builder builder = infoBuilder();
- GTInfo info = builder.build();
- return info;
- }
-
- public static GTInfo advancedInfo() {
- Builder builder = infoBuilder();
- builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0), setOf(1, 2), setOf(3, 4) });
- builder.enableRowBlock(4);
- GTInfo info = builder.build();
- return info;
- }
-
- private static Builder infoBuilder() {
- Builder builder = GTInfo.builder();
- builder.setCodeSystem(new GTSampleCodeSystem());
- builder.setColumns( //
- DataType.getInstance("varchar(10)"), //
- DataType.getInstance("varchar(10)"), //
- DataType.getInstance("varchar(10)"), //
- DataType.getInstance("bigint"), //
- DataType.getInstance("decimal") //
- );
- builder.setPrimaryKey(setOf(0));
- builder.setColumnPreferIndex(setOf(0));
- return builder;
- }
-
- public static List<GTRecord> mockupData(GTInfo info, int nRows) {
- List<GTRecord> result = new ArrayList<GTRecord>(nRows);
- int round = nRows / 10;
- for (int i = 0; i < round; i++) {
- String d_01_14 = datePlus("2015-01-14", i * 4);
- String d_01_15 = datePlus("2015-01-15", i * 4);
- String d_01_16 = datePlus("2015-01-16", i * 4);
- String d_01_17 = datePlus("2015-01-17", i * 4);
- result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5")));
- result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5")));
- }
- return result;
- }
-
- private static String datePlus(String date, int plusDays) {
- long millis = DateFormat.stringToMillis(date);
- millis += (1000L * 3600L * 24L) * plusDays;
- return DateFormat.formatToDateStr(millis);
- }
-
- private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
- GTRecord rec = new GTRecord(info);
- return rec.setValues(date, name, category, amount, price);
- }
-
- private static ImmutableBitSet setOf(int... values) {
- BitSet set = new BitSet();
- for (int i : values)
- set.set(i);
- return new ImmutableBitSet(set);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
deleted file mode 100644
index db245fd..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.kylin.storage.gridtable.diskstore;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- */
-interface FileSystem {
-
- boolean checkExistence(String path);
-
- boolean delete(String path);
-
- void deleteOnExit(String path);
-
- boolean createDirectory(String path);
-
- boolean createFile(String path);
-
- OutputStream getWriter(String path);
-
- InputStream getReader(String path);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
deleted file mode 100644
index 4dcacf7..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package org.apache.kylin.storage.gridtable.diskstore;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class GTDiskStore implements IGTStore {
-
- private static final Logger logger = LoggerFactory.getLogger(GTDiskStore.class);
-
- private final String identifier;
- private final FileSystem fileSystem;
- private final DiskStoreWriter writer;
- private final GTInfo gtInfo;
-
- public GTDiskStore(GTInfo gtInfo) {
- this.gtInfo = gtInfo;
- this.fileSystem = new LocalFileSystem();
- this.identifier = generateIdentifier(fileSystem);
- logger.info("disk store created, identifier:" + identifier);
- this.writer = new DiskStoreWriter(fileSystem.getWriter(getRowBlockFile(identifier)));
- deleteTmpFilesOnExit();
- }
-
- @Override
- public GTInfo getInfo() {
- return gtInfo;
- }
-
- private String generateIdentifier(FileSystem fs) {
- int tryCount = 0;
- while (true) {
- //uuid may conflict
- String identifier = UUID.randomUUID().toString();
- final String path = getRootDirectory(identifier);
- if (fs.createDirectory(path)) {
- return identifier;
- } else {
- logger.warn("failed to create dir " + path);
- if (++tryCount > 5) {
- throw new RuntimeException("failed to generateIdentifier");
- }
- }
- }
- }
-
- private String getRootDirectory(String identifier) {
- return "/tmp/kylin_gtdiskstore_" + identifier;
- }
-
- private String getRowBlockFile(String identifier) {
- return getRootDirectory(identifier) + "/rowblock";
- }
-
- private class DiskStoreWriter implements IGTStoreWriter {
-
- private final DataOutputStream outputStream;
-
- DiskStoreWriter(OutputStream outputStream) {
- this.outputStream = new DataOutputStream(outputStream);
- }
-
- @Override
- public void write(GTRowBlock block) throws IOException {
- final int blockSize = block.exportLength();
- outputStream.writeInt(blockSize);
- block.export(outputStream);
- outputStream.flush();
- }
-
- @Override
- public void close() throws IOException {
- outputStream.close();
- }
- }
-
- public long memoryUsage() {
- return 0;
- }
-
- @Override
- public IGTStoreWriter rebuild(int shard) throws IOException {
- return writer;
- }
-
- @Override
- public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
- return writer;
- }
-
- private class DiskStoreScanner implements IGTStoreScanner {
-
- private final DataInputStream inputStream;
- private int blockSize = 0;
-
- DiskStoreScanner(InputStream inputStream) {
- this.inputStream = new DataInputStream(inputStream);
- }
-
- @Override
- public void close() throws IOException {
- inputStream.close();
- }
-
- @Override
- public boolean hasNext() {
- try {
- blockSize = inputStream.readInt();
- return blockSize > 0;
- } catch (EOFException e) {
- return false;
- } catch (IOException e) {
- logger.error("input stream fail", e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public GTRowBlock next() {
- GTRowBlock block = new GTRowBlock(gtInfo);
- ByteBuffer buffer = ByteBuffer.allocate(blockSize);
- int count = blockSize;
- while (count > 0) {
- try {
- count -= inputStream.read(buffer.array(), buffer.position(), buffer.remaining());
- } catch (IOException e) {
- logger.error("input stream fail", e);
- throw new RuntimeException(e);
- }
- }
- Preconditions.checkArgument(count == 0, "invalid read count:" + count + " block size:" + blockSize);
- block.load(buffer);
- return block;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- @Override
- public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
- return new DiskStoreScanner(fileSystem.getReader(getRowBlockFile(identifier)));
- }
-
- public void drop() throws IOException {
- try {
- writer.close();
- } catch (Exception e) {
- logger.error("error to close writer", e);
- }
- deleteTmpFilesOnExit();
- }
-
- private void deleteTmpFilesOnExit() {
- fileSystem.deleteOnExit(getRowBlockFile(identifier));
- fileSystem.deleteOnExit(getRootDirectory(identifier));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
deleted file mode 100644
index 429a30b..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.kylin.storage.gridtable.diskstore;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- */
-class HadoopFileSystem implements FileSystem {
-
- private static final Logger logger = LoggerFactory.getLogger(HadoopFileSystem.class);
-
- final org.apache.hadoop.fs.FileSystem fileSystem;
-
- HadoopFileSystem() {
- try {
- fileSystem = org.apache.hadoop.fs.FileSystem.get(HadoopUtil.getCurrentConfiguration());
- } catch (IOException e) {
- logger.error("error construct HadoopFileSystem", e);
- throw new RuntimeException(e);
- }
- }
- @Override
- public boolean checkExistence(String path) {
- try {
- return fileSystem.exists(new Path(path));
- } catch (IOException e) {
- logger.error("error checkExistence, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean delete(String path) {
- try {
- return fileSystem.delete(new Path(path), true);
- } catch (IOException e) {
- logger.error("error delete, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void deleteOnExit(String path) {
- try {
- fileSystem.deleteOnExit(new Path(path));
- } catch (IOException e) {
- logger.error("error deleteOnExit, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean createDirectory(String path) {
- try {
- return fileSystem.mkdirs(new Path(path));
- } catch (IOException e) {
- logger.error("error createDirectory, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean createFile(String path) {
- try {
- return fileSystem.createNewFile(new Path(path));
- } catch (IOException e) {
- logger.error("error createFile, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public OutputStream getWriter(String path) {
- try {
- return fileSystem.create(new Path(path));
- } catch (IOException e) {
- logger.error("error getWriter, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public InputStream getReader(String path) {
- try {
- return fileSystem.open(new Path(path));
- } catch (IOException e) {
- logger.error("error getReader, path:" + path, e);
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
deleted file mode 100644
index 0d07f3b..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.kylin.storage.gridtable.diskstore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-/**
- */
-class LocalFileSystem implements FileSystem {
-
- private static Logger logger = LoggerFactory.getLogger(LocalFileSystem.class);
-
- LocalFileSystem(){}
-
- @Override
- public boolean checkExistence(String path) {
- return new File(path).exists();
- }
-
- @Override
- public boolean delete(String path) {
- return new File(path).delete();
- }
-
- @Override
- public void deleteOnExit(String path) {
- new File(path).deleteOnExit();
- }
-
- @Override
- public boolean createDirectory(String path) {
- return new File(path).mkdirs();
- }
-
- @Override
- public boolean createFile(String path) {
- try {
- return new File(path).createNewFile();
- } catch (IOException e) {
- logger.warn("create file failed:" + path, e);
- return false;
- }
- }
-
- @Override
- public OutputStream getWriter(String path) {
- try {
- return new FileOutputStream(path);
- } catch (FileNotFoundException e) {
- //should not happen
- logger.error("path:" + path + " out found");
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public InputStream getReader(String path) {
- try {
- return new FileInputStream(path);
- } catch (FileNotFoundException e) {
- //should not happen
- logger.error("path:" + path + " out found");
- throw new RuntimeException(e);
- }
- }
-}