You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:33 UTC
[25/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
new file mode 100644
index 0000000..85ac47a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
+ */
+public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+ private static final LongMutable ONE = new LongMutable(1l);
+
+ private final CuboidScheduler cuboidScheduler;
+ private final long baseCuboidId;
+ private final int totalCuboidCount;
+ private final CubeJoinedFlatTableDesc intermediateTableDesc;
+ private final MeasureCodec measureCodec;
+ private final String[] metricsAggrFuncs;
+ private final int[] hbaseMeasureRefIndex;
+ private final MeasureDesc[] measureDescs;
+ private final int measureCount;
+
+ private MemoryBudgetController memBudget;
+ private Thread[] taskThreads;
+ private Throwable[] taskThreadExceptions;
+ private LinkedBlockingQueue<CuboidTask> taskPending;
+ private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
+
+ private CuboidResult baseResult;
+ private Object[] totalSumForSanityCheck;
+ private ICuboidCollector resultCollector;
+
+ /**
+  * Creates a builder for the given cube.
+  * <p>
+  * Besides creating the cuboid scheduler, flat table description and measure codec, the constructor
+  * walks the HBase column mapping to resolve measure order: {@code hbaseMeasureRefIndex[p]} holds the
+  * index (within {@code cubeDesc.getMeasures()}) of the p-th measure referenced by the mapping, and
+  * {@code metricsAggrFuncs[p]} holds the aggregation expression of the p-th measure that was matched.
+  *
+  * @param cubeDesc      description of the cube to build
+  * @param dictionaryMap dictionaries per column, handed to the super class
+  */
+ public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+     super(cubeDesc, dictionaryMap);
+     this.cuboidScheduler = new CuboidScheduler(cubeDesc);
+     this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+     this.totalCuboidCount = cuboidScheduler.getCuboidCount();
+     this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+     this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+
+     measureCount = cubeDesc.getMeasures().size();
+
+     // collect the measures in the order the HBase mapping lists them
+     List<MeasureDesc> measureDescsList = Lists.newArrayList();
+     hbaseMeasureRefIndex = new int[measureCount];
+     int measureRef = 0;
+     for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+         for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+             for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                 for (int j = 0; j < measureCount; j++) {
+                     if (cubeDesc.getMeasures().get(j).equals(measure)) {
+                         measureDescsList.add(measure);
+                         hbaseMeasureRefIndex[measureRef] = j;
+                         break;
+                     }
+                 }
+                 // the position advances even when the measure was not found among the cube measures
+                 measureRef++;
+             }
+         }
+     }
+
+     // aggregation function names, aligned with the HBase measure order
+     String[] aggrFuncs = new String[measureCount];
+     for (int i = 0; i < measureCount; i++) {
+         aggrFuncs[i] = measureDescsList.get(i).getFunction().getExpression();
+     }
+     this.metricsAggrFuncs = aggrFuncs;
+     this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+ }
+
+ private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
+ GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+
+ // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
+ // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+ // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+ ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+
+ GridTable gridTable = new GridTable(info, store);
+ return gridTable;
+ }
+
+ private Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+ BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+ BitSet dimension = new BitSet();
+ dimension.set(0, bitSet.cardinality());
+ BitSet metrics = new BitSet();
+ metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+ return new Pair<ImmutableBitSet, ImmutableBitSet>(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
+ }
+
+ /**
+  * Builds all cuboids from the input rows, then writes each one to {@code output} and closes its
+  * grid table. Cuboids are written in ascending cuboid ID order (the results are held in a sorted map).
+  *
+  * @param input  queue of flat table rows, consumed until an empty row is taken
+  * @param output receiver of the finished cuboids
+  * @throws IOException if building or writing fails
+  */
+ @Override
+ public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+     ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
+     for (CuboidResult cuboidResult : result.values()) {
+         try {
+             outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+         } finally {
+             // close the table even when writing it out fails, so its store is not leaked
+             cuboidResult.table.close();
+         }
+     }
+ }
+
+ /**
+  * Builds all cuboids and returns them keyed (and therefore sorted) by cuboid ID.
+  */
+ ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+     final ConcurrentNavigableMap<Long, CuboidResult> cuboidsById = new ConcurrentSkipListMap<Long, CuboidResult>();
+
+     // every finished cuboid is dropped into the map as soon as it is reported
+     ICuboidCollector intoMap = new ICuboidCollector() {
+         @Override
+         public void collect(CuboidResult finished) {
+             cuboidsById.put(finished.cuboidId, finished);
+         }
+     };
+
+     build(input, intoMap);
+     return cuboidsById;
+ }
+
+ /**
+ * Callback that receives each cuboid as soon as it has been built.
+ */
+ interface ICuboidCollector {
+ // invoked once per finished cuboid, from whichever thread built it
+ void collect(CuboidResult result);
+ }
+
+ /**
+ * Outcome of building one cuboid: the resulting grid table plus some statistics.
+ */
+ static class CuboidResult {
+ // ID of the cuboid this result belongs to
+ public long cuboidId;
+ // grid table holding the aggregated rows of the cuboid
+ public GridTable table;
+ // number of rows written to the table
+ public int nRows;
+ // build time in milliseconds
+ public long timeSpent;
+ // aggregation cache size, in MB, attributed to this cuboid
+ public int aggrCacheMB;
+
+ public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+ this.cuboidId = cuboidId;
+ this.table = table;
+ this.nRows = nRows;
+ this.timeSpent = timeSpent;
+ this.aggrCacheMB = aggrCacheMB;
+ }
+ }
+
+ /**
+ * Runs the whole in-memory build: computes the base cuboid on the calling thread, then lets the
+ * task threads compute all descendant cuboids in parallel. Every finished cuboid is reported to
+ * the collector.
+ *
+ * @throws IOException if the build is interrupted or any task thread recorded an exception
+ */
+ private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
+ long startTime = System.currentTimeMillis();
+ logger.info("In Mem Cube Build start, " + cubeDesc.getName());
+
+ // multiple threads to compute cuboid in parallel
+ taskPending = new LinkedBlockingQueue<>();
+ taskCuboidCompleted.set(0);
+ taskThreads = prepareTaskThreads();
+ taskThreadExceptions = new Throwable[taskThreadCount];
+
+ // build base cuboid
+ resultCollector = collector;
+ totalSumForSanityCheck = null;
+ baseResult = createBaseCuboid(input);
+ // empty input: nothing to aggregate, the task threads are never started
+ if (baseResult.nRows == 0)
+ return;
+
+ // plan memory budget
+ makeMemoryBudget();
+
+ // kick off N-D cuboid tasks and output
+ addChildTasks(baseResult);
+ start(taskThreads);
+
+ // wait complete
+ join(taskThreads);
+
+ long endTime = System.currentTimeMillis();
+ logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
+
+ // surface failures recorded by the task threads
+ throwExceptionIfAny();
+ }
+
+ public void abort() {
+ interrupt(taskThreads);
+ }
+
+ /** Starts every given thread, in argument order. */
+ private void start(Thread... threads) {
+     for (int idx = 0; idx < threads.length; idx++) {
+         threads[idx].start();
+     }
+ }
+
+ /** Interrupts every given thread, in argument order. */
+ private void interrupt(Thread... threads) {
+     for (int idx = 0; idx < threads.length; idx++) {
+         threads[idx].interrupt();
+     }
+ }
+
+ /**
+  * Waits for every given thread to terminate.
+  *
+  * @throws IOException if the calling thread is interrupted while waiting; the interrupt status is
+  *                     restored before the exception is thrown
+  */
+ private void join(Thread... threads) throws IOException {
+     try {
+         for (Thread t : threads)
+             t.join();
+     } catch (InterruptedException e) {
+         // keep the interrupt visible to callers, the checked exception type is lost by the wrapping
+         Thread.currentThread().interrupt();
+         throw new IOException("interrupted while waiting task and output complete", e);
+     }
+ }
+
+ /**
+  * Rethrows what the task threads recorded. A single failure is thrown as is (wrapped unless it
+  * already is an IOException); several failures are logged and summarized in one IOException whose
+  * cause is the first of them.
+  */
+ private void throwExceptionIfAny() throws IOException {
+     List<Throwable> failures = new ArrayList<Throwable>();
+     for (int slot = 0; slot < taskThreadCount; slot++) {
+         Throwable recorded = taskThreadExceptions[slot];
+         if (recorded != null)
+             failures.add(recorded);
+     }
+
+     if (failures.isEmpty())
+         return;
+
+     if (failures.size() > 1) {
+         for (Throwable failure : failures)
+             logger.error("Exception during in-mem cube build", failure);
+         throw new IOException(failures.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", failures.get(0));
+     }
+
+     Throwable only = failures.get(0);
+     if (only instanceof IOException)
+         throw (IOException) only;
+     throw new IOException(only);
+ }
+
+ private Thread[] prepareTaskThreads() {
+ Thread[] result = new Thread[taskThreadCount];
+ for (int i = 0; i < taskThreadCount; i++) {
+ result[i] = new CuboidTaskThread(i);
+ }
+ return result;
+ }
+
+ /** Tells whether the number of completed cuboids has reached the total number of cuboids. */
+ public boolean isAllCuboidDone() {
+     final int completed = taskCuboidCompleted.get();
+     return completed == totalCuboidCount;
+ }
+
+ /**
+ * Worker thread that repeatedly takes a pending cuboid task, builds the cuboid and queues the
+ * tasks for its children, until all cuboids are done or some task thread has failed.
+ */
+ private class CuboidTaskThread extends Thread {
+ // slot of this thread in taskThreadExceptions
+ private int id;
+
+ CuboidTaskThread(int id) {
+ super("CuboidTask-" + id);
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!isAllCuboidDone()) {
+ CuboidTask task = null;
+ // poll with a timeout so that a failure recorded by another thread is noticed
+ while (task == null && taskHasNoException()) {
+ task = taskPending.poll(15, TimeUnit.SECONDS);
+ }
+ // if task error occurs
+ if (task == null)
+ break;
+
+ CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
+ addChildTasks(newCuboid);
+
+ // the thread that finishes the last cuboid wakes up the others, which may be blocked in poll()
+ if (isAllCuboidDone()) {
+ for (Thread t : taskThreads) {
+ if (t != Thread.currentThread())
+ t.interrupt();
+ }
+ }
+ }
+ } catch (Throwable ex) {
+ // an exception after all cuboids are done (e.g. the wake-up interrupt above) is ignored
+ if (!isAllCuboidDone()) {
+ logger.error("task thread exception", ex);
+ taskThreadExceptions[id] = ex;
+ }
+ }
+ }
+ }
+
+ /** Returns true as long as no task thread has recorded an exception. */
+ private boolean taskHasNoException() {
+     for (Throwable recorded : taskThreadExceptions) {
+         if (recorded != null) {
+             return false;
+         }
+     }
+     return true;
+ }
+
+ private void addChildTasks(CuboidResult parent) {
+ List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
+ for (Long child : children) {
+ taskPending.add(new CuboidTask(parent, child));
+ }
+ }
+
+ private int getSystemAvailMB() {
+ Runtime.getRuntime().gc();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error("", e);
+ }
+ return MemoryBudgetController.getSystemAvailMB();
+ }
+
+ private void makeMemoryBudget() {
+ int systemAvailMB = getSystemAvailMB();
+ logger.info("System avail " + systemAvailMB + " MB");
+ int reserve = Math.max(reserveMemoryMB, baseResult.aggrCacheMB / 3);
+ logger.info("Reserve " + reserve + " MB for system basics");
+
+ int budget = systemAvailMB - reserve;
+ if (budget < baseResult.aggrCacheMB) {
+ // make sure we have base aggr cache as minimal
+ budget = baseResult.aggrCacheMB;
+ logger.warn("!!! System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
+ }
+
+ logger.info("Memory Budget is " + budget + " MB");
+ memBudget = new MemoryBudgetController(budget);
+ }
+
+ /**
+ * Builds the base cuboid by aggregating the input rows, and estimates the size of the aggregation
+ * cache that was needed for it.
+ *
+ * @param input queue of flat table rows
+ * @return the base cuboid result, already reported to the result collector
+ */
+ private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
+ GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
+ GTBuilder baseBuilder = baseCuboid.rebuild();
+ IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
+
+ // available memory before the aggregation starts; mbAfter stays 0 when there is no output row
+ int mbBefore = getSystemAvailMB();
+ int mbAfter = 0;
+
+ Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+ GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+ GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+
+ long startTime = System.currentTimeMillis();
+ logger.info("Calculating cuboid " + baseCuboidId);
+
+ int count = 0;
+ for (GTRecord r : aggregationScanner) {
+ // sample available memory once, when the first aggregated record is handed out
+ if (mbAfter == 0) {
+ mbAfter = getSystemAvailMB();
+ }
+ baseBuilder.write(r);
+ count++;
+ }
+ aggregationScanner.close();
+ baseBuilder.close();
+
+ long timeSpent = System.currentTimeMillis() - startTime;
+ logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+
+ // take the larger of the measured heap difference (plus 10%) and the scanner's own estimate
+ int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
+ int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
+ int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache);
+ mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be 10 MB at least
+ logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
+
+ return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
+ }
+
+ /**
+ * Wraps a finished cuboid into a CuboidResult, counts it as completed and reports it to the
+ * result collector.
+ *
+ * @param aggrCacheMB aggregation cache size in MB; when not positive it is derived from the base
+ * cuboid's cache size, scaled by the row count ratio
+ */
+ private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+ if (aggrCacheMB <= 0) {
+ // reads baseResult, so this path requires the base cuboid to have been built already
+ aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
+ }
+
+ CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
+ taskCuboidCompleted.incrementAndGet();
+
+ resultCollector.collect(result);
+ return result;
+ }
+
+ /**
+ * Builds one cuboid from its parent, holding a memory reservation of the parent's aggregation
+ * cache size for the duration of the aggregation.
+ */
+ private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
+ final String consumerName = "AggrCache@Cuboid " + cuboidId;
+ MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
+ @Override
+ public int freeUp(int mb) {
+ return 0; // cannot free up on demand
+ }
+
+ @Override
+ public String toString() {
+ return consumerName;
+ }
+ };
+
+ // reserve memory for aggregation cache, can't be larger than the parent
+ // NOTE(review): reserveInsist presumably blocks until the budget is available - confirm in MemoryBudgetController
+ memBudget.reserveInsist(consumer, parent.aggrCacheMB);
+ try {
+ return aggregateCuboid(parent, cuboidId);
+ } finally {
+ // set the reservation of this consumer back to 0 MB, whether or not the aggregation succeeded
+ memBudget.reserve(consumer, 0);
+ }
+ }
+
+ private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
+ Pair<ImmutableBitSet, ImmutableBitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId);
+ ImmutableBitSet parentDimensions = columnBitSets.getFirst();
+ ImmutableBitSet measureColumns = columnBitSets.getSecond();
+ ImmutableBitSet childDimensions = parentDimensions;
+
+ long mask = Long.highestOneBit(parent.cuboidId);
+ long childCuboidId = cuboidId;
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId);
+ int index = 0;
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+ if ((mask & parent.cuboidId) > 0) {
+ if ((mask & childCuboidId) == 0) {
+ // this dim will be aggregated
+ childDimensions = childDimensions.set(index, false);
+ }
+ index++;
+ }
+ mask = mask >> 1;
+ }
+
+ return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns);
+ }
+
+ /**
+  * Scans the parent grid table with aggregation over the given columns and writes the aggregated
+  * rows into a new grid table for {@code cuboidId}.
+  *
+  * @param gridTable          grid table of the parent cuboid
+  * @param cuboidId           ID of the cuboid to produce
+  * @param aggregationColumns dimension columns of the parent that are kept
+  * @param measureColumns     measure columns of the parent
+  * @return the result for the new cuboid, already reported to the result collector
+  * @throws IOException if scanning or writing fails
+  */
+ private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+     long startTime = System.currentTimeMillis();
+     logger.info("Calculating cuboid " + cuboidId);
+
+     GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+     GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
+     GridTable newGridTable = newGridTableByCuboidID(cuboidId);
+     GTBuilder builder = newGridTable.rebuild();
+
+     ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
+     // the bit set does not change while copying, so count its bits once
+     int neededColumnCount = allNeededColumns.trueBitCount();
+
+     GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+     int count = 0;
+     try {
+         for (GTRecord record : scanner) {
+             count++;
+             // pack the needed parent columns densely into the child's record
+             for (int i = 0; i < neededColumnCount; i++) {
+                 int c = allNeededColumns.trueBitAt(i);
+                 newRecord.set(i, record.get(c));
+             }
+             builder.write(newRecord);
+         }
+
+         // the total sums of every cuboid must agree with each other
+         sanityCheck(scanner.getTotalSumForSanityCheck());
+     } finally {
+         // close the builder even if closing the scanner fails
+         try {
+             scanner.close();
+         } finally {
+             builder.close();
+         }
+     }
+
+     long timeSpent = System.currentTimeMillis() - startTime;
+     logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+
+     return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
+ }
+
+ /**
+  * Verifies that the total sums of a cuboid equal those of the first cuboid that was checked.
+  * Double sums are rounded to long first, in place, because floating point summation is not exact.
+  *
+  * @param totalSum per-measure totals of the cuboid just built; modified in place by the rounding
+  * @throws IllegalStateException if the totals differ from the remembered ones
+  */
+ private void sanityCheck(Object[] totalSum) {
+     // double sum introduces error and causes result not exactly equal
+     for (int i = 0; i < totalSum.length; i++) {
+         if (totalSum[i] instanceof DoubleMutable) {
+             totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get());
+         }
+     }
+
+     // the first cuboid checked becomes the reference
+     if (totalSumForSanityCheck == null) {
+         totalSumForSanityCheck = totalSum;
+         return;
+     }
+     if (!Arrays.equals(totalSumForSanityCheck, totalSum)) {
+         throw new IllegalStateException("Sanity check failed, total sum " + Arrays.toString(totalSum) + " differs from expected " + Arrays.toString(totalSumForSanityCheck));
+     }
+ }
+
+ // ===========================================================================
+
+ /**
+  * A pending unit of work: build the cuboid {@code childCuboidId} from the result of its parent.
+  * Tasks are ordered by child cuboid ID.
+  */
+ private static class CuboidTask implements Comparable<CuboidTask> {
+     final CuboidResult parent;
+     final long childCuboidId;
+
+     CuboidTask(CuboidResult parent, long childCuboidId) {
+         this.parent = parent;
+         this.childCuboidId = childCuboidId;
+     }
+
+     @Override
+     public int compareTo(CuboidTask o) {
+         // Long.compare cannot overflow, unlike comparing via subtraction
+         return Long.compare(this.childCuboidId, o.childCuboidId);
+     }
+ }
+
+ // ============================================================================
+
+ private class InputConverter implements IGTScanner {
+ GTInfo info;
+ GTRecord record;
+ BlockingQueue<List<String>> input;
+
+ /**
+ * @param info table info used to create the single record instance this scanner hands out
+ * @param input queue the flat table rows are taken from
+ */
+ public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
+ this.info = info;
+ this.input = input;
+ // one record is reused for every row, see iterator()
+ this.record = new GTRecord(info);
+ }
+
+ /**
+  * Returns an iterator that takes rows from the input queue, blocking until one is available, and
+  * converts each into the shared {@code record}. An empty row marks the end of the input.
+  * <p>
+  * The same record instance is returned by every call to {@code next()}, so callers must consume
+  * it before advancing. {@code hasNext()} may be called any number of times without losing rows.
+  */
+ @Override
+ public Iterator<GTRecord> iterator() {
+     return new Iterator<GTRecord>() {
+
+         List<String> currentObject = null;
+         // true while currentObject holds a row taken from the queue that next() has not consumed yet
+         boolean fetched = false;
+
+         @Override
+         public boolean hasNext() {
+             if (!fetched) {
+                 try {
+                     currentObject = input.take();
+                 } catch (InterruptedException e) {
+                     // keep the interrupt visible to callers, the exception type is lost by the wrapping
+                     Thread.currentThread().interrupt();
+                     throw new RuntimeException(e);
+                 }
+                 fetched = true;
+             }
+             // once the empty end marker is fetched it is never consumed, so this keeps returning false
+             return currentObject != null && currentObject.size() > 0;
+         }
+
+         @Override
+         public GTRecord next() {
+             if (!hasNext())
+                 throw new IllegalStateException();
+
+             fetched = false;
+             buildGTRecord(currentObject, record);
+             return record;
+         }
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException();
+         }
+     };
+ }
+
+ // nothing to release: the input queue is owned by the caller
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ // scan statistics are not tracked by this converter, it always reports 0
+ @Override
+ public int getScannedRowCount() {
+ return 0;
+ }
+
+ // scan statistics are not tracked by this converter, it always reports 0
+ @Override
+ public int getScannedRowBlockCount() {
+ return 0;
+ }
+
+ private void buildGTRecord(List<String> row, GTRecord record) {
+ Object[] dimensions = buildKey(row);
+ Object[] metricsValues = buildValue(row);
+ Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+ System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+ System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+ record.setValues(recordValues);
+ }
+
+ private Object[] buildKey(List<String> row) {
+ int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+ Object[] key = new Object[keySize];
+
+ for (int i = 0; i < keySize; i++) {
+ key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ }
+
+ return key;
+ }
+
+ /**
+ * Extracts the measure values of a flat table row. The result is ordered by position in
+ * hbaseMeasureRefIndex; each value is produced by the serializer of the corresponding measure.
+ */
+ private Object[] buildValue(List<String> row) {
+
+ Object[] values = new Object[measureCount];
+ MeasureDesc measureDesc = null;
+
+ for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+ // i is the measure's index within cubeDesc.getMeasures()
+ int i = hbaseMeasureRefIndex[position];
+ measureDesc = measureDescs[i];
+
+ Object value = null;
+ int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+ FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
+ if (function.isCount() || function.isHolisticCountDistinct()) {
+ // note for holistic count distinct, this value will be ignored
+ value = ONE;
+ } else if (flatTableIdx == null) {
+ // no flat table column: the value comes from the function's parameter
+ value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+ } else if (flatTableIdx.length == 1) {
+ value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
+ } else {
+
+ // several columns: their bytes are concatenated in column order
+ // NOTE(review): toBytes() returns null for a null column value, which would fail on split.length below - confirm nulls cannot occur here
+ byte[] result = null;
+ for (int x = 0; x < flatTableIdx.length; x++) {
+ byte[] split = toBytes(row.get(flatTableIdx[x]));
+ if (result == null) {
+ result = Arrays.copyOf(split, split.length);
+ } else {
+ byte[] newResult = new byte[result.length + split.length];
+ System.arraycopy(result, 0, newResult, 0, result.length);
+ System.arraycopy(split, 0, newResult, result.length, split.length);
+ result = newResult;
+ }
+ }
+ value = measureCodec.getSerializer(i).valueOf(result);
+ }
+ values[position] = value;
+ }
+ return values;
+ }
+
+ /** Null-safe conversion of a string to bytes; a null string yields null. */
+ private byte[] toBytes(String v) {
+     if (v == null) {
+         return null;
+     }
+     return Bytes.toBytes(v);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
new file mode 100644
index 0000000..d9b0ba6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemDiskStore implements IGTStore, Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+ private static final boolean debug = true;
+
+ private static final int STREAM_BUFFER_SIZE = 8192;
+ private static final int MEM_CHUNK_SIZE_MB = 5;
+
+ private final GTInfo info;
+ private final Object lock; // all public methods that read/write object states are synchronized on this lock
+ private final MemPart memPart;
+ private final DiskPart diskPart;
+ private final boolean delOnClose;
+
+ private Writer ongoingWriter;
+
+ /**
+ * Creates a store backed by a newly created temp file; the store is flagged delete-on-close.
+ */
+ public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+ this(info, budgetCtrl, File.createTempFile("MemDiskStore", ""), true);
+ }
+
+ /**
+ * Creates a store backed by the given file; the store is not flagged delete-on-close.
+ */
+ public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+ this(info, budgetCtrl, diskFile, false);
+ }
+
+ private MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+ this.info = info;
+ // the store itself serves as the lock object
+ this.lock = this;
+ this.memPart = new MemPart(budgetCtrl);
+ this.diskPart = new DiskPart(diskFile);
+ this.delOnClose = delOnClose;
+
+ // in case user forget to call close()
+ if (delOnClose)
+ diskFile.deleteOnExit();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ // the shard argument is not used; writing starts from offset 0
+ @Override
+ public IGTStoreWriter rebuild(int shard) throws IOException {
+ return newWriter(0);
+ }
+
+ // NOTE: Writer currently ignores the start offset and clears the store (see its TODO), so this behaves like rebuild()
+ @Override
+ public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+ return newWriter(length());
+ }
+
+ /**
+  * Creates the one and only ongoing writer.
+  *
+  * @throws IllegalStateException if a previous writer is still ongoing
+  */
+ private Writer newWriter(long startOffset) throws IOException {
+     synchronized (lock) {
+         if (ongoingWriter == null) {
+             Writer created = new Writer(startOffset);
+             ongoingWriter = created;
+             return created;
+         }
+         throw new IllegalStateException();
+     }
+ }
+
+ /** Opens a reader over the whole store; none of the arguments is used to narrow the scan. */
+ @Override
+ public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+     synchronized (lock) {
+         final Reader wholeStoreReader = new Reader();
+         return wholeStoreReader;
+     }
+ }
+
+ /**
+  * Closes the memory part, then the disk part. The disk part is closed even when closing the
+  * memory part fails.
+  */
+ @Override
+ public void close() throws IOException {
+     // synchronized inside the parts close()
+     try {
+         memPart.close();
+     } finally {
+         diskPart.close();
+     }
+ }
+
+ /** Returns the larger of the memory part's and the disk part's tail offset. */
+ public long length() {
+     synchronized (lock) {
+         final long memTail = memPart.tailOffset();
+         final long diskTail = diskPart.tailOffset;
+         return Math.max(memTail, diskTail);
+     }
+ }
+
+ @Override
+ public String toString() {
+ return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+ }
+
+ /**
+  * Scanner over the whole store. Bytes are served from the memory chunks where they cover the
+  * current read offset and from the disk part otherwise, and are decoded into row blocks.
+  * <p>
+  * The same {@code GTRowBlock} instance is reused for every block handed out by {@code next()}.
+  */
+ private class Reader implements IGTStoreScanner {
+
+     final DataInputStream din;
+     long readOffset = 0;
+     // statistics, reported in the debug log on close()
+     long memRead = 0;
+     long diskRead = 0;
+     int nReadCalls = 0;
+
+     GTRowBlock block = GTRowBlock.allocate(info);
+     GTRowBlock next = null;
+
+     Reader() throws IOException {
+         diskPart.openRead();
+         if (debug)
+             logger.debug(MemDiskStore.this + " read start @ " + readOffset);
+
+         InputStream in = new InputStream() {
+             byte[] tmp = new byte[1];
+             MemChunk memChunk;
+
+             @Override
+             public int read() throws IOException {
+                 int n = read(tmp, 0, 1);
+                 if (n <= 0)
+                     return -1;
+                 else
+                     // InputStream.read() must return 0..255; without the mask a 0xFF byte looks like EOF
+                     return tmp[0] & 0xFF;
+             }
+
+             @Override
+             public int read(byte[] b, int off, int len) throws IOException {
+                 synchronized (lock) {
+                     nReadCalls++;
+                     if (available() <= 0)
+                         return -1;
+
+                     // position on the memory chunk covering the read offset, if there is one
+                     if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
+                         memChunk = memPart.seekMemChunk(readOffset);
+                     }
+
+                     int lenToGo = Math.min(available(), len);
+
+                     int nRead = 0;
+                     while (lenToGo > 0) {
+                         int n;
+                         if (memChunk != null) {
+                             // the chunk starts beyond the read offset: fall back to disk
+                             if (memChunk.headOffset() > readOffset) {
+                                 memChunk = null;
+                                 continue;
+                             }
+                             // the chunk is exhausted: move on to the next one
+                             if (readOffset >= memChunk.tailOffset()) {
+                                 memChunk = memChunk.next;
+                                 continue;
+                             }
+                             int chunkOffset = (int) (readOffset - memChunk.headOffset());
+                             n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
+                             System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                             memRead += n;
+                         } else {
+                             n = diskPart.read(readOffset, b, off, lenToGo);
+                             diskRead += n;
+                         }
+                         lenToGo -= n;
+                         nRead += n;
+                         off += n;
+                         readOffset += n;
+                     }
+                     return nRead;
+                 }
+             }
+
+             @Override
+             public int available() throws IOException {
+                 synchronized (lock) {
+                     // clamp instead of casting: more than 2 GB remaining must not wrap to a negative int
+                     return (int) Math.min(Integer.MAX_VALUE, length() - readOffset);
+                 }
+             }
+         };
+
+         din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+     }
+
+     @Override
+     public boolean hasNext() {
+         if (next != null)
+             return true;
+
+         try {
+             if (din.available() > 0) {
+                 block.importFrom(din);
+                 next = block;
+             }
+         } catch (IOException e) {
+             throw new RuntimeException(e);
+         }
+
+         return next != null;
+     }
+
+     @Override
+     public GTRowBlock next() {
+         if (next == null) {
+             hasNext();
+             if (next == null)
+                 throw new NoSuchElementException();
+         }
+         GTRowBlock r = next;
+         next = null;
+         return r;
+     }
+
+     @Override
+     public void remove() {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public void close() throws IOException {
+         synchronized (lock) {
+             din.close();
+             diskPart.closeRead();
+             if (debug)
+                 logger.debug(MemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+         }
+     }
+
+ }
+
+ /**
+ * Writer of the store. Bytes go to the memory part for as long as it accepts them, and to the
+ * disk part afterwards.
+ */
+ private class Writer implements IGTStoreWriter {
+
+ final DataOutputStream dout;
+ long writeOffset;
+ // statistics, reported in the debug log when the write ends
+ long memWrite = 0;
+ long diskWrite = 0;
+ int nWriteCalls;
+ boolean closed = false;
+
+ // NOTE: startOffset is ignored, both parts are cleared and writing always starts at offset 0
+ Writer(long startOffset) throws IOException {
+ writeOffset = 0; // TODO does not support append yet
+ memPart.clear();
+ diskPart.clear();
+ diskPart.openWrite(false);
+ if (debug)
+ logger.debug(MemDiskStore.this + " write start @ " + writeOffset);
+
+ memPart.activateMemWrite();
+
+ OutputStream out = new OutputStream() {
+ byte[] tmp = new byte[1];
+ boolean memPartActivated = true;
+
+ @Override
+ public void write(int b) throws IOException {
+ tmp[0] = (byte) b;
+ write(tmp, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ // lock inside memPart.write() and diskPart.write()
+ nWriteCalls++;
+ while (length > 0) {
+ int n;
+ if (memPartActivated) {
+ n = memPart.write(bytes, offset, length, writeOffset);
+ memWrite += n;
+ // the memory part accepted nothing: switch to disk for the rest of this writer's life
+ if (n == 0) {
+ memPartActivated = false;
+ }
+ } else {
+ n = diskPart.write(writeOffset, bytes, offset, length);
+ diskWrite += n;
+ }
+ offset += n;
+ length -= n;
+ writeOffset += n;
+ }
+ }
+ };
+ dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+ }
+
+ @Override
+ public void write(GTRowBlock block) throws IOException {
+ block.export(dout);
+ }
+
+ // may be invoked twice: by the user, and again by the async flusher once it is done (see below)
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ // flush and deactivate memory writes only on the first call
+ if (!closed) {
+ dout.close();
+ memPart.deactivateMemWrite();
+ }
+
+ if (memPart.asyncFlusher == null) {
+ assert writeOffset == diskPart.tailOffset;
+ diskPart.closeWrite();
+ ongoingWriter = null;
+ if (debug)
+ logger.debug(MemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+ } else {
+ // the asyncFlusher will call this close() again later
+ }
+ closed = true;
+ }
+ }
+ }
+
+ /**
+  * One node in the singly linked list of in-memory buffers. A chunk covers the
+  * offset range [diskOffset, diskOffset + length) of the stored data.
+  */
+ private static class MemChunk {
+     /** offset at which the first byte of this chunk belongs */
+     long diskOffset;
+     /** number of bytes of 'data' filled so far */
+     int length;
+     /** backing buffer */
+     byte[] data;
+     /** following chunk in the list, if any */
+     MemChunk next;
+
+     /** Offset of the first byte held by this chunk. */
+     long headOffset() {
+         return diskOffset;
+     }
+
+     /** Offset just past the last byte held by this chunk. */
+     long tailOffset() {
+         return headOffset() + length;
+     }
+
+     /** Number of bytes that still fit into the backing buffer. */
+     int freeSpace() {
+         return data.length - length;
+     }
+
+     /** True when the backing buffer has no room left. */
+     boolean isFull() {
+         return freeSpace() == 0;
+     }
+ }
+
+ private class MemPart implements Closeable, MemoryConsumer {
+
+ final MemoryBudgetController budgetCtrl;
+
+ // async flush thread checks this flag out of sync block
+ volatile boolean writeActivated;
+ MemChunk firstChunk;
+ MemChunk lastChunk;
+ int chunkCount;
+
+ Thread asyncFlusher;
+ MemChunk asyncFlushChunk;
+ long asyncFlushDiskOffset;
+ Throwable asyncFlushException;
+
+ MemPart(MemoryBudgetController budgetCtrl) {
+ this.budgetCtrl = budgetCtrl;
+ }
+
+ long headOffset() {
+ return firstChunk == null ? 0 : firstChunk.headOffset();
+ }
+
+ long tailOffset() {
+ return lastChunk == null ? 0 : lastChunk.tailOffset();
+ }
+
+ /**
+  * Walks the chunk list from the head and returns the first chunk that either
+  * contains the given offset or starts beyond it.
+  *
+  * @param diskOffset the offset to look for
+  * @return the matching chunk, or null when every chunk ends at or before the offset
+  */
+ public MemChunk seekMemChunk(long diskOffset) {
+     for (MemChunk chunk = firstChunk; chunk != null; chunk = chunk.next) {
+         final boolean startsBeyond = diskOffset < chunk.headOffset();
+         if (startsBeyond || diskOffset < chunk.tailOffset()) {
+             return chunk;
+         }
+     }
+     return null;
+ }
+
+ /**
+ * Appends bytes to the tail of the in-memory chunk list and makes sure the async flusher
+ * is running.
+ *
+ * @param bytes source buffer
+ * @param offset position of the first byte to copy from the source buffer
+ * @param length number of bytes the caller would like to write
+ * @param diskOffset offset the bytes belong to; must equal the current tail offset
+ * @return number of bytes accepted, possibly fewer than length; 0 when memory write is
+ * not active, when diskOffset is not the tail, or when no memory budget could be reserved
+ */
+ public int write(byte[] bytes, int offset, int length, long diskOffset) {
+ int needMoreMem = 0;
+
+ synchronized (lock) {
+ if (writeActivated == false)
+ return 0;
+
+ // write is only expected at the tail
+ if (diskOffset != tailOffset())
+ return 0;
+
+ // a new chunk is needed; the amount asked for covers the existing chunks plus one
+ if (chunkCount == 0 || lastChunk.isFull())
+ needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
+ }
+
+ // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
+ if (needMoreMem > 0) {
+ try {
+ budgetCtrl.reserve(this, needMoreMem);
+ } catch (NotEnoughBudgetException ex) {
+ // no budget left: stop buffering in memory and report that nothing was taken
+ deactivateMemWrite();
+ return 0;
+ }
+ }
+
+ synchronized (lock) {
+ // the need for a new chunk is re-checked because the lock was released in between
+ // NOTE(review): if freeUp() ran in the unlocked window and released the last chunk while
+ // needMoreMem == 0, lastChunk would be null below -- confirm this cannot happen
+ if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
+ MemChunk chunk = new MemChunk();
+ chunk.diskOffset = diskOffset;
+ chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = chunk;
+ } else {
+ lastChunk.next = chunk;
+ lastChunk = chunk;
+ }
+ chunkCount++;
+ }
+
+ // copy as much as fits into the tail chunk; the caller retries with the remainder
+ int n = Math.min(lastChunk.freeSpace(), length);
+ System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+ lastChunk.length += n;
+
+ if (n > 0)
+ asyncFlush(lastChunk, diskOffset, n);
+
+ return n;
+ }
+ }
+
+ /**
+ * Records where flushing has to start (first call only) and starts the background
+ * flusher thread if none is registered. Called from write() while the lock is held.
+ *
+ * @param lastChunk chunk that just received data
+ * @param diskOffset offset of the data that was just written
+ * @param n number of bytes just written (not used by this method)
+ */
+ private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+ // remember the flush start position only when no flush position is set yet
+ if (asyncFlushChunk == null) {
+ asyncFlushChunk = lastChunk;
+ asyncFlushDiskOffset = diskOffset;
+ }
+
+ if (asyncFlusher == null) {
+ asyncFlusher = new Thread() {
+ public void run() {
+ asyncFlushException = null;
+ if (debug)
+ logger.debug(MemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+ try {
+ // keep draining memory to disk every 10 ms while memory write is active
+ while (writeActivated) {
+ flushToDisk();
+ Thread.sleep(10);
+ }
+ // one more pass to pick up data written just before de-activation
+ flushToDisk();
+
+ if (debug)
+ logger.debug(MemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+
+ synchronized (lock) {
+ asyncFlusher = null;
+ asyncFlushChunk = null;
+ if (ongoingWriter.closed) {
+ ongoingWriter.close(); // call writer.close() again to clean up
+ }
+ }
+ } catch (Throwable ex) {
+ // the error is kept for MemPart.close() to report
+ // NOTE(review): on this path asyncFlusher and asyncFlushChunk stay set, so the
+ // writer's close() is never completed by this thread -- confirm this is intended
+ asyncFlushException = ex;
+ }
+ }
+ };
+ asyncFlusher.start();
+ }
+ }
+
+ /**
+ * Copies all bytes between the current flush position and the end of the chunk list
+ * to the disk part. The position is advanced under the lock; the disk write itself
+ * happens outside of this method's synchronized section.
+ *
+ * @throws IOException if writing to the disk part fails
+ */
+ private void flushToDisk() throws IOException {
+ byte[] data;
+ int offset = 0;
+ int length = 0;
+ int flushedLen = 0;
+
+ while (true) {
+ data = null;
+ synchronized (lock) {
+ asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+ // if (debug)
+ // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+ // current chunk fully flushed: move on to the next one (null at the end of the list)
+ if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+ asyncFlushChunk = asyncFlushChunk.next;
+ }
+ // pick the not yet flushed remainder of the current chunk
+ if (asyncFlushChunk != null) {
+ data = asyncFlushChunk.data;
+ offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+ length = asyncFlushChunk.length - offset;
+ }
+ }
+
+ // nothing left to flush
+ if (data == null)
+ break;
+
+ flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
+ }
+ }
+
+ /**
+  * Releases memory chunks from the head of the list until at least the requested
+  * amount is released, no chunk is left, or the chunk the async flusher is
+  * positioned on is reached.
+  *
+  * @param mb amount of memory requested, in MB
+  * @return amount actually released in MB, a multiple of the chunk size
+  */
+ @Override
+ public int freeUp(int mb) {
+     synchronized (lock) {
+         int released = 0;
+         // never drop the chunk the async flusher currently points at
+         while (chunkCount > 0 && released < mb && firstChunk != asyncFlushChunk) {
+             released += MEM_CHUNK_SIZE_MB;
+             chunkCount--;
+             if (chunkCount > 0) {
+                 // unlink the head and promote its successor
+                 final MemChunk newHead = firstChunk.next;
+                 firstChunk.next = null;
+                 firstChunk = newHead;
+             } else {
+                 firstChunk = lastChunk = null;
+             }
+         }
+         return released;
+     }
+ }
+
+ public void activateMemWrite() {
+ if (budgetCtrl.getTotalBudgetMB() > 0) {
+ writeActivated = true;
+ if (debug)
+ logger.debug(MemDiskStore.this + " mem write activated");
+ }
+ }
+
+ public void deactivateMemWrite() {
+ writeActivated = false;
+ if (debug)
+ logger.debug(MemDiskStore.this + " mem write de-activated");
+ }
+
+ public void clear() {
+ chunkCount = 0;
+ firstChunk = lastChunk = null;
+ budgetCtrl.reserve(this, 0);
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ if (asyncFlushException != null)
+ throwAsyncException(asyncFlushException);
+ }
+ try {
+ asyncFlusher.join();
+ } catch (NullPointerException npe) {
+ // that's fine, async flusher may not present
+ } catch (InterruptedException e) {
+ logger.warn("async join interrupted", e);
+ }
+ synchronized (lock) {
+ if (asyncFlushException != null)
+ throwAsyncException(asyncFlushException);
+
+ clear();
+ }
+ }
+
+ /**
+  * Rethrows an error as an IOException: an IOException passes through unchanged,
+  * anything else becomes the cause of a new IOException.
+  */
+ private void throwAsyncException(Throwable ex) throws IOException {
+     throw (ex instanceof IOException) ? (IOException) ex : new IOException(ex);
+ }
+
+ @Override
+ public String toString() {
+ return MemDiskStore.this.toString();
+ }
+
+ }
+
+ private class DiskPart implements Closeable {
+ final File diskFile;
+ FileChannel writeChannel;
+ FileChannel readChannel;
+ int readerCount = 0; // allow parallel readers
+ long tailOffset;
+
+ DiskPart(File diskFile) throws IOException {
+ this.diskFile = diskFile;
+ this.tailOffset = diskFile.length();
+ if (debug)
+ logger.debug(MemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+ }
+
+ public void openRead() throws IOException {
+ if (readChannel == null) {
+ readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+ }
+ readerCount++;
+ }
+
+ public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ }
+
+ public void closeRead() throws IOException {
+ closeRead(false);
+ }
+
+ /**
+  * Gives back one reader's hold on the shared read channel, or every hold when forced,
+  * and closes the channel once no reader is left.
+  *
+  * The reader count never drops below zero. A negative count would make the
+  * "no reader left" test fail for later readers and leave their channel open.
+  *
+  * @param force true to drop all outstanding readers and close the channel right away
+  * @throws IOException if closing the read channel fails
+  */
+ private void closeRead(boolean force) throws IOException {
+     if (force) {
+         readerCount = 0;
+     } else if (readerCount > 0) {
+         readerCount--;
+     }
+
+     if (readerCount == 0 && readChannel != null) {
+         readChannel.close();
+         readChannel = null;
+     }
+ }
+
+ public void openWrite(boolean append) throws IOException {
+ if (append) {
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+ tailOffset = diskFile.length();
+ } else {
+ diskFile.delete();
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ tailOffset = 0;
+ }
+ }
+
+ public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ synchronized (lock) {
+ int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ tailOffset = Math.max(diskOffset + n, tailOffset);
+ return n;
+ }
+ }
+
+ public void closeWrite() throws IOException {
+ if (writeChannel != null) {
+ writeChannel.close();
+ writeChannel = null;
+ }
+ }
+
+ public void clear() throws IOException {
+ diskFile.delete();
+ tailOffset = 0;
+ }
+
+ /**
+  * Closes the write channel and the read channel (regardless of open readers) and, when
+  * delete-on-close is set, removes the disk file. Each step is attempted even if an
+  * earlier one fails, so a failing write channel does not leak the read channel or the file.
+  *
+  * @throws IOException if closing one of the channels fails
+  */
+ @Override
+ public void close() throws IOException {
+     synchronized (lock) {
+         try {
+             closeWrite();
+         } finally {
+             try {
+                 closeRead(true);
+             } finally {
+                 if (delOnClose) {
+                     diskFile.delete();
+                 }
+             }
+         }
+     }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
new file mode 100644
index 0000000..38ccccd
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -0,0 +1,268 @@
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.HLLCAggregator;
+import org.apache.kylin.metadata.measure.LDCAggregator;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GTAggregateScanner implements IGTScanner {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
+
+ final GTInfo info;
+ final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
+ final ImmutableBitSet groupBy;
+ final ImmutableBitSet metrics;
+ final String[] metricsAggrFuncs;
+ final IGTScanner inputScanner;
+ final AggregationCache aggrCache;
+
+ public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+ if (req.hasAggregation() == false)
+ throw new IllegalStateException();
+
+ this.info = inputScanner.getInfo();
+ this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
+ this.groupBy = req.getAggrGroupBy();
+ this.metrics = req.getAggrMetrics();
+ this.metricsAggrFuncs = req.getAggrMetricsFuncs();
+ this.inputScanner = inputScanner;
+ this.aggrCache = new AggregationCache();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return inputScanner.getScannedRowCount();
+ }
+
+ @Override
+ public int getScannedRowBlockCount() {
+ return inputScanner.getScannedRowBlockCount();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputScanner.close();
+ }
+
+ /**
+ * Reads every record of the input scanner into the aggregation cache, then returns an
+ * iterator over the aggregated records held by the cache.
+ *
+ * NOTE(review): each call feeds the whole input into the same cache again, so a second
+ * call would aggregate on top of the earlier results if the input scanner can be
+ * iterated more than once -- confirm callers invoke this only once per scanner.
+ */
+ @Override
+ public Iterator<GTRecord> iterator() {
+ for (GTRecord r : inputScanner) {
+ aggrCache.aggregate(r);
+ }
+ return aggrCache.iterator();
+ }
+
+ /** return the estimate memory size of aggregation cache */
+ public long getEstimateSizeOfAggrCache() {
+ return aggrCache.esitmateMemSize();
+ }
+
+ public Object[] getTotalSumForSanityCheck() {
+ return aggrCache.calculateTotalSumSanityCheck();
+ }
+
+ class AggregationCache {
+ final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+ final int keyLength;
+ final boolean[] compareMask;
+
+ public AggregationCache() {
+ compareMask = createCompareMask();
+ keyLength = compareMask.length;
+ aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ int result = 0;
+ // profiler shows this check is slow
+ // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+ for (int i = 0; i < keyLength; ++i) {
+ if (compareMask[i]) {
+ int a = (o1[i] & 0xff);
+ int b = (o2[i] & 0xff);
+ result = a - b;
+ if (result == 0) {
+ continue;
+ } else {
+ return result;
+ }
+ }
+ }
+ return result;
+ }
+ });
+ }
+
+ private boolean[] createCompareMask() {
+ int keyLength = 0;
+ for (int i = 0; i < dimensions.trueBitCount(); i++) {
+ int c = dimensions.trueBitAt(i);
+ int l = info.codeSystem.maxCodeLength(c);
+ keyLength += l;
+ }
+
+ boolean[] mask = new boolean[keyLength];
+ int p = 0;
+ for (int i = 0; i < dimensions.trueBitCount(); i++) {
+ int c = dimensions.trueBitAt(i);
+ int l = info.codeSystem.maxCodeLength(c);
+ boolean m = groupBy.get(c) ? true : false;
+ for (int j = 0; j < l; j++) {
+ mask[p++] = m;
+ }
+ }
+ return mask;
+ }
+
+ private byte[] createKey(GTRecord record) {
+ byte[] result = new byte[keyLength];
+ int offset = 0;
+ for (int i = 0; i < dimensions.trueBitCount(); i++) {
+ int c = dimensions.trueBitAt(i);
+ final ByteArray byteArray = record.cols[c];
+ final int columnLength = info.codeSystem.maxCodeLength(c);
+ System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, byteArray.length());
+ offset += columnLength;
+ }
+ assert offset == result.length;
+ return result;
+ }
+
+ /**
+  * Folds one record into the cache: looks up (or creates) the aggregators stored under
+  * the record's key and feeds each of them the decoded value of its metric column.
+  */
+ void aggregate(GTRecord r) {
+     final byte[] key = createKey(r);
+
+     MeasureAggregator[] bucket = aggBufMap.get(key);
+     if (bucket == null) {
+         bucket = newAggregators();
+         aggBufMap.put(key, bucket);
+     }
+
+     // the i-th aggregator belongs to the i-th metric column
+     for (int i = 0; i < bucket.length; i++) {
+         final int col = metrics.trueBitAt(i);
+         final Object value = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
+         bucket[i].aggregate(value);
+     }
+ }
+
+ private MeasureAggregator[] newAggregators() {
+ return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
+ }
+
+ public Object[] calculateTotalSumSanityCheck() {
+ MeasureAggregator[] totalSum = newAggregators();
+
+ // skip expensive aggregation
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator)
+ totalSum[i] = null;
+ }
+
+ for (MeasureAggregator[] entry : aggBufMap.values()) {
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] != null)
+ totalSum[i].aggregate(entry[i].getState());
+ }
+ }
+ Object[] result = new Object[totalSum.length];
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] != null)
+ result[i] = totalSum[i].getState();
+ }
+ return result;
+ }
+
+ public long esitmateMemSize() {
+ if (aggBufMap.isEmpty())
+ return 0;
+
+ byte[] sampleKey = aggBufMap.firstKey();
+ MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
+ return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());
+ }
+
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+
+ final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+ final GTRecord secondRecord = new GTRecord(info);
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public GTRecord next() {
+ Entry<byte[], MeasureAggregator[]> entry = it.next();
+ create(entry.getKey(), entry.getValue());
+ return secondRecord;
+ }
+
+ private void create(byte[] key, MeasureAggregator[] value) {
+ int offset = 0;
+ for (int i = 0; i < dimensions.trueBitCount(); i++) {
+ int c = dimensions.trueBitAt(i);
+ final int columnLength = info.codeSystem.maxCodeLength(c);
+ secondRecord.set(c, new ByteArray(key, offset, columnLength));
+ offset += columnLength;
+ }
+ metricsBuf.clear();
+ for (int i = 0; i < value.length; i++) {
+ int col = metrics.trueBitAt(i);
+ int pos = metricsBuf.position();
+ info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+ secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+
+ public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
+ // Aggregation cache is basically a tree map. The tree map entry overhead is
+ // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
+ // - 41~52 according to AggregationCacheMemSizeTest
+ return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
+ }
+
+ public static long estimateSizeOf(MeasureAggregator[] aggrs) {
+ // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
+ // Memory alignment to 8 bytes
+ long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
+ for (MeasureAggregator aggr : aggrs) {
+ if (aggr != null)
+ est += aggr.getMemBytesEstimate();
+ }
+ return est;
+ }
+
+ public static long estimateSizeOf(byte[] bytes) {
+ // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
+ // Memory alignment to 8 bytes
+ return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
new file mode 100644
index 0000000..7308b5c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
@@ -0,0 +1,74 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+import org.apache.kylin.gridtable.IGTStore.IGTStoreWriter;
+
+public class GTBuilder implements Closeable, Flushable {
+
+ @SuppressWarnings("unused")
+ final private GTInfo info;
+ final private IGTStoreWriter storeWriter;
+
+ final private GTRowBlock block;
+ final private GTRowBlock.Writer blockWriter;
+
+ private int writtenRowCount;
+ private int writtenRowBlockCount;
+
+ GTBuilder(GTInfo info, int shard, IGTStore store) throws IOException {
+ this(info, shard, store, false);
+ }
+
+ GTBuilder(GTInfo info, int shard, IGTStore store, boolean append) throws IOException {
+ this.info = info;
+
+ block = GTRowBlock.allocate(info);
+ blockWriter = block.getWriter();
+ if (append) {
+ storeWriter = store.append(shard, blockWriter);
+ if (block.isFull()) {
+ blockWriter.clearForNext();
+ }
+ } else {
+ storeWriter = store.rebuild(shard);
+ }
+ }
+
+ public void write(GTRecord r) throws IOException {
+ blockWriter.append(r);
+ writtenRowCount++;
+
+ if (block.isFull()) {
+ flush();
+ }
+ }
+
+ /**
+ * Hands the current row block to the store writer and counts it as written.
+ * The block is cleared for the next rows only when it is full; a partially
+ * filled block keeps its rows.
+ *
+ * NOTE(review): flushing a partial block more than once writes its rows again --
+ * presumably the store writer is expected to cope with that; confirm.
+ *
+ * @throws IOException if the store writer fails to write the block
+ */
+ @Override
+ public void flush() throws IOException {
+ blockWriter.readyForFlush();
+ storeWriter.write(block);
+ writtenRowBlockCount++;
+ if (block.isFull()) {
+ blockWriter.clearForNext();
+ }
+ }
+
+ /**
+  * Flushes any rows still held in the current block, then closes the store writer.
+  * The store writer is closed even when that final flush fails, so it is not leaked.
+  *
+  * @throws IOException if the final flush or closing the store writer fails
+  */
+ @Override
+ public void close() throws IOException {
+     try {
+         if (block.isEmpty() == false) {
+             flush();
+         }
+     } finally {
+         storeWriter.close();
+     }
+ }
+
+ public int getWrittenRowCount() {
+ return writtenRowCount;
+ }
+
+ public int getWrittenRowBlockCount() {
+ return writtenRowBlockCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
new file mode 100644
index 0000000..21fb167
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -0,0 +1,100 @@
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+public class GTFilterScanner implements IGTScanner {
+
+ final private IGTScanner inputScanner;
+ final private TupleFilter filter;
+ final private IEvaluatableTuple oneTuple; // avoid instance creation
+
+ private GTRecord next = null;
+
+ public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
+ this.inputScanner = inputScanner;
+ this.filter = req.getFilterPushDown();
+ this.oneTuple = new IEvaluatableTuple() {
+ @Override
+ public Object getValue(TblColRef col) {
+ return next.get(col.getColumnDesc().getZeroBasedIndex());
+ }
+ };
+
+ if (TupleFilter.isEvaluableRecursively(filter) == false)
+ throw new IllegalArgumentException();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return inputScanner.getInfo();
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return inputScanner.getScannedRowCount();
+ }
+
+ @Override
+ public int getScannedRowBlockCount() {
+ return inputScanner.getScannedRowBlockCount();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputScanner.close();
+ }
+
+ /**
+  * Returns an iterator over the input records that pass the filter; with a null filter
+  * every record passes.
+  *
+  * The look-ahead record is kept in the enclosing scanner's 'next' field because the
+  * tuple adapter handed to the filter reads it from there. Iterators obtained from the
+  * same scanner therefore share that state and must not be used at the same time.
+  */
+ @Override
+ public Iterator<GTRecord> iterator() {
+     return new Iterator<GTRecord>() {
+
+         private final Iterator<GTRecord> inputIterator = inputScanner.iterator();
+
+         // built once per iterator instead of on every hasNext() call
+         private final IFilterCodeSystem<ByteArray> filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
+
+         @Override
+         public boolean hasNext() {
+             // a record fetched earlier is still waiting to be handed out
+             if (next != null)
+                 return true;
+
+             while (inputIterator.hasNext()) {
+                 // must be assigned before evaluating: the filter reads the record through 'next'
+                 next = inputIterator.next();
+                 if (filter == null || filter.evaluate(oneTuple, filterCodeSystem)) {
+                     return true;
+                 }
+             }
+             next = null;
+             return false;
+         }
+
+         @Override
+         public GTRecord next() {
+             // fetch next record
+             if (next == null) {
+                 hasNext();
+                 if (next == null)
+                     throw new NoSuchElementException();
+             }
+
+             GTRecord result = next;
+             next = null;
+             return result;
+         }
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException();
+         }
+
+     };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
new file mode 100644
index 0000000..87d7811
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -0,0 +1,246 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class GTInfo {
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ String tableName;
+ IGTCodeSystem codeSystem;
+
+ // column schema
+ int nColumns;
+ DataType[] colTypes;
+ ImmutableBitSet colAll;
+ ImmutableBitSet colPreferIndex;
+ transient TblColRef[] colRefs;
+
+ // grid info
+ ImmutableBitSet primaryKey; // order by, uniqueness is not required
+ ImmutableBitSet[] colBlocks; // primary key must be the first column block
+ ImmutableBitSet colBlocksAll;
+ int rowBlockSize; // 0: disable row block
+
+ // sharding
+ int nShards; // 0: no sharding
+
+ // must create from builder
+ private GTInfo() {
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public IGTCodeSystem getCodeSystem() {
+ return codeSystem;
+ }
+
+ public int getColumnCount() {
+ return nColumns;
+ }
+
+ public DataType getColumnType(int i) {
+ return colTypes[i];
+ }
+
+ public ImmutableBitSet getPrimaryKey() {
+ return primaryKey;
+ }
+
+ public boolean isShardingEnabled() {
+ return nShards > 0;
+ }
+
+ public boolean isRowBlockEnabled() {
+ return rowBlockSize > 0;
+ }
+
+ public int getRowBlockSize() {
+ return rowBlockSize;
+ }
+
+ public int getMaxRecordLength() {
+ return getMaxColumnLength(colAll);
+ }
+
+ public int getMaxColumnLength(ImmutableBitSet selectedCols) {
+ int result = 0;
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ result += codeSystem.maxCodeLength(c);
+ }
+ return result;
+ }
+
+ public int getMaxColumnLength() {
+ int max = 0;
+ for (int i = 0; i < nColumns; i++)
+ max = Math.max(max, codeSystem.maxCodeLength(i));
+ return max;
+ }
+
+ public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) {
+ if (columns == null)
+ columns = colAll;
+
+ BitSet result = new BitSet();
+ for (int i = 0; i < colBlocks.length; i++) {
+ ImmutableBitSet cb = colBlocks[i];
+ if (cb.intersects(columns)) {
+ result.set(i);
+ }
+ }
+ return new ImmutableBitSet(result);
+ }
+
+ public TblColRef colRef(int i) {
+ if (colRefs == null) {
+ colRefs = new TblColRef[nColumns];
+ }
+ if (colRefs[i] == null) {
+ colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
+ }
+ return colRefs[i];
+ }
+
+ public void validateColRef(TblColRef ref) {
+ TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
+ if (expected.equals(ref) == false)
+ throw new IllegalArgumentException();
+ }
+
+ /**
+  * Checks that the required settings are present, initializes the code system with this
+  * info and then validates the column blocks.
+  *
+  * @throws IllegalStateException if the code system is missing, the primary key is
+  *         missing or empty, or the column blocks are inconsistent
+  */
+ void validate() {
+     if (codeSystem == null)
+         throw new IllegalStateException("code system is required");
+
+     // a missing primary key is reported the same way as an empty one
+     if (primaryKey == null || primaryKey.cardinality() == 0)
+         throw new IllegalStateException("primary key is required and must contain at least one column");
+
+     codeSystem.init(this);
+
+     validateColumnBlocks();
+ }
+
+ /**
+  * Fills in defaults for the column layout and verifies it: column blocks must not
+  * overlap, must cover all columns together, and the first block must be the primary
+  * key. Empty column blocks are dropped afterwards.
+  *
+  * @throws IllegalStateException if one of the rules above is violated
+  */
+ private void validateColumnBlocks() {
+     colAll = new ImmutableBitSet(0, nColumns);
+
+     if (colBlocks == null) {
+         // default layout: the primary key block followed by one block of all other columns
+         colBlocks = new ImmutableBitSet[2];
+         colBlocks[0] = primaryKey;
+         colBlocks[1] = colAll.andNot(primaryKey);
+     }
+
+     if (colPreferIndex == null)
+         colPreferIndex = ImmutableBitSet.EMPTY;
+
+     // column blocks must not overlap
+     for (int i = 0; i < colBlocks.length; i++) {
+         for (int j = i + 1; j < colBlocks.length; j++) {
+             if (colBlocks[i].intersects(colBlocks[j]))
+                 throw new IllegalStateException("column blocks " + i + " and " + j + " overlap");
+         }
+     }
+
+     // column block must cover all columns
+     ImmutableBitSet merge = ImmutableBitSet.EMPTY;
+     for (int i = 0; i < colBlocks.length; i++) {
+         merge = merge.or(colBlocks[i]);
+     }
+     if (merge.equals(colAll) == false)
+         throw new IllegalStateException("column blocks must cover exactly the " + nColumns + " columns of the table");
+
+     // primary key must be the first column block
+     if (primaryKey.equals(colBlocks[0]) == false)
+         throw new IllegalStateException("primary key must be the first column block");
+
+     // drop empty column block
+     LinkedList<ImmutableBitSet> list = new LinkedList<ImmutableBitSet>(Arrays.asList(colBlocks));
+     Iterator<ImmutableBitSet> it = list.iterator();
+     while (it.hasNext()) {
+         ImmutableBitSet cb = it.next();
+         if (cb.isEmpty())
+             it.remove();
+     }
+     colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
+
+     // derived last, so that it matches the column blocks that are actually kept
+     colBlocksAll = new ImmutableBitSet(0, colBlocks.length);
+ }
+
+ public static class Builder {
+ final GTInfo info;
+
+ private Builder() {
+ this.info = new GTInfo();
+ }
+
+ /** optional */
+ public Builder setTableName(String name) {
+ info.tableName = name;
+ return this;
+ }
+
+ /** required */
+ public Builder setCodeSystem(IGTCodeSystem cs) {
+ info.codeSystem = cs;
+ return this;
+ }
+
+ /** required */
+ public Builder setColumns(DataType... colTypes) {
+ info.nColumns = colTypes.length;
+ info.colTypes = colTypes;
+ return this;
+ }
+
+ /** required */
+ public Builder setPrimaryKey(ImmutableBitSet primaryKey) {
+ info.primaryKey = primaryKey;
+ return this;
+ }
+
+ /** optional */
+ public Builder enableColumnBlock(ImmutableBitSet[] columnBlocks) {
+     // keep a private copy of the array rather than the caller's instance
+     info.colBlocks = Arrays.copyOf(columnBlocks, columnBlocks.length, ImmutableBitSet[].class);
+     return this;
+ }
+
+ /** optional */
+ public Builder enableRowBlock(int rowBlockSize) {
+ info.rowBlockSize = rowBlockSize;
+ return this;
+ }
+
+ /** optional */
+ public Builder enableSharding(int nShards) {
+ info.nShards = nShards;
+ return this;
+ }
+
+ /** optional */
+ public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) {
+ info.colPreferIndex = colPreferIndex;
+ return this;
+ }
+
+ public GTInfo build() {
+ info.validate();
+ return info;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java
new file mode 100644
index 0000000..4f99108
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java
@@ -0,0 +1,205 @@
+package org.apache.kylin.gridtable;
+
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+
+/**
+ * A thread-safe inverted index of row blocks in memory.
+ *
+ * Note function not() must return all blocks, because index only know what block contains a value,
+ * but not sure what block does not contain a value.
+ *
+ * @author yangli9
+ */
+public class GTInvertedIndex {
+
+ private final GTInfo info;
+ private final ImmutableBitSet colPreferIndex;
+ private final ImmutableBitSet colBlocks;
+ private final GTInvertedIndexOfColumn[] index; // for each column
+
+ private volatile int nIndexedBlocks;
+
+ public GTInvertedIndex(GTInfo info) {
+ this.info = info;
+ this.colPreferIndex = info.colPreferIndex;
+ this.colBlocks = info.selectColumnBlocks(colPreferIndex);
+
+ index = new GTInvertedIndexOfColumn[info.getColumnCount()];
+ for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+ int c = colPreferIndex.trueBitAt(i);
+ index[c] = new GTInvertedIndexOfColumn(info.codeSystem.getComparator());
+ }
+ }
+
+ /**
+  * Indexes one row block: collects the distinct values of every indexed column found in
+  * the block and registers them with the per-column index under the block's sequence id.
+  *
+  * @param block the row block to index
+  */
+ public void add(GTRowBlock block) {
+
+     @SuppressWarnings("unchecked")
+     Set<ByteArray>[] distinctValues = new Set[info.getColumnCount()];
+     for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+         int c = colPreferIndex.trueBitAt(i);
+         distinctValues[c] = new HashSet<ByteArray>();
+     }
+
+     GTRowBlock.Reader reader = block.getReader(colBlocks);
+     GTRecord record = new GTRecord(info);
+     while (reader.hasNext()) {
+         reader.fetchNext(record);
+         // NOTE(review): the same record is refilled on every fetchNext(); this assumes the
+         // values returned by record.get(c) stay unchanged once stored in the set -- confirm
+         for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+             int c = colPreferIndex.trueBitAt(i);
+             distinctValues[c].add(record.get(c));
+         }
+     }
+
+     for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+         int c = colPreferIndex.trueBitAt(i);
+         index[c].add(distinctValues[c], block.getSequenceId());
+     }
+
+     // volatile alone does not make this read-compare-write atomic; serialize it so that
+     // concurrent add() calls cannot move the counter backwards
+     synchronized (this) {
+         nIndexedBlocks = Math.max(nIndexedBlocks, block.seqId + 1);
+     }
+ }
+
+ public ConciseSet filter(TupleFilter filter) {
+ return filter(filter, nIndexedBlocks);
+ }
+
+ public ConciseSet filter(TupleFilter filter, int totalBlocks) {
+ // number of indexed blocks may increase as we do evaluation
+ int indexedBlocks = nIndexedBlocks;
+
+ Evaluator evaluator = new Evaluator(indexedBlocks);
+ ConciseSet r = evaluator.evaluate(filter);
+
+ // add blocks that have not been indexed
+ for (int i = indexedBlocks; i < totalBlocks; i++) {
+ r.add(i);
+ }
+
+ return r;
+ }
+
+ private class Evaluator {
+ private int indexedBlocks;
+
+ Evaluator(int indexedBlocks) {
+ this.indexedBlocks = indexedBlocks;
+ }
+
+ public ConciseSet evaluate(TupleFilter filter) {
+ if (filter == null) {
+ return all();
+ }
+
+ if (filter instanceof LogicalTupleFilter)
+ return evalLogical((LogicalTupleFilter) filter);
+
+ if (filter instanceof CompareTupleFilter)
+ return evalCompare((CompareTupleFilter) filter);
+
+ // unable to evaluate
+ return all();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ConciseSet evalCompare(CompareTupleFilter filter) {
+ int col = col(filter);
+ if (index[col] == null)
+ return all();
+
+ switch (filter.getOperator()) {
+ case ISNULL:
+ return index[col].getNull();
+ case ISNOTNULL:
+ return all();
+ case EQ:
+ return index[col].getEquals((ByteArray) filter.getFirstValue());
+ case NEQ:
+ return all();
+ case IN:
+ return index[col].getIn((Iterable<ByteArray>) filter.getValues());
+ case NOTIN:
+ return all();
+ case LT:
+ return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), false);
+ case LTE:
+ return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), true);
+ case GT:
+ return index[col].getRange((ByteArray) filter.getFirstValue(), false, null, false);
+ case GTE:
+ return index[col].getRange((ByteArray) filter.getFirstValue(), true, null, false);
+ default:
+ throw new IllegalStateException("Unsupported operator " + filter.getOperator());
+ }
+ }
+
+ private ConciseSet evalLogical(LogicalTupleFilter filter) {
+ List<? extends TupleFilter> children = filter.getChildren();
+
+ switch (filter.getOperator()) {
+ case AND:
+ return evalLogicalAnd(children);
+ case OR:
+ return evalLogicalOr(children);
+ case NOT:
+ return evalLogicalNot(children);
+ default:
+ throw new IllegalStateException("Unsupported operator " + filter.getOperator());
+ }
+ }
+
+ private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) {
+ ConciseSet set = all();
+
+ for (TupleFilter c : children) {
+ ConciseSet t = evaluate(c);
+ if (t == null)
+ continue; // because it's AND
+
+ set.retainAll(t);
+ }
+ return set;
+ }
+
+ private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) {
+ ConciseSet set = new ConciseSet();
+
+ for (TupleFilter c : children) {
+ ConciseSet t = evaluate(c);
+ if (t == null)
+ return null; // because it's OR
+
+ set.addAll(t);
+ }
+ return set;
+ }
+
+ private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) {
+ return all();
+ }
+
+ private ConciseSet all() {
+ return not(new ConciseSet());
+ }
+
+ // Complements the given set within the indexed block ids. indexedBlocks is added first
+ // so that it becomes an upper bound; complement() presumably flips only the ids below the
+ // largest element (TODO confirm against ConciseSet), which would leave the ids in
+ // [0, indexedBlocks) that were not in the set. The argument itself is modified and returned.
+ private ConciseSet not(ConciseSet set) {
+ set.add(indexedBlocks);
+ set.complement();
+ return set;
+ }
+
+ private int col(CompareTupleFilter filter) {
+ return filter.getColumn().getColumnDesc().getZeroBasedIndex();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java
new file mode 100644
index 0000000..4b094fd
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java
@@ -0,0 +1,115 @@
+package org.apache.kylin.gridtable;
+
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.util.NavigableMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import com.google.common.collect.Maps;
+
+ /**
+  * Inverted index for one column: maps each distinct non-null code to the set
+  * of block ids in which it was seen, and keeps a separate set of block ids
+  * that contained a null code.
+  *
+  * Every public method takes the read or write side of an internal
+  * read/write lock. Query methods hand back either a clone or a freshly
+  * built set, never one of the sets stored inside the index.
+  */
+ public class GTInvertedIndexOfColumn {
+
+     private final IGTComparator comparator;
+     private final ReentrantReadWriteLock rwLock;
+
+     /** One more than the largest block id passed to {@link #add}. */
+     private int nBlocks;
+     /** Non-null code -> ids of blocks containing that code, ordered by the comparator. */
+     private NavigableMap<ByteArray, ConciseSet> rangeIndex;
+     /** Ids of blocks that contained at least one null code. */
+     private ConciseSet nullIndex;
+
+     /**
+      * Creates an empty index.
+      *
+      * @param comparator orders the codes and decides which codes count as null
+      */
+     public GTInvertedIndexOfColumn(IGTComparator comparator) {
+         this.rwLock = new ReentrantReadWriteLock();
+         this.comparator = comparator;
+         this.nullIndex = new ConciseSet();
+         this.rangeIndex = Maps.newTreeMap(comparator);
+     }
+
+     /**
+      * Records that the given codes occur in block {@code blockId}.
+      *
+      * @param codes   the codes found in the block; null codes (as judged by
+      *                the comparator) go to the null index
+      * @param blockId the id of the block being indexed
+      */
+     public void add(Iterable<ByteArray> codes, int blockId) {
+         final ReentrantReadWriteLock.WriteLock guard = rwLock.writeLock();
+         guard.lock();
+         try {
+             for (ByteArray code : codes) {
+                 if (comparator.isNull(code)) {
+                     nullIndex.add(blockId);
+                 } else {
+                     ConciseSet blocks = rangeIndex.get(code);
+                     if (blocks == null) {
+                         blocks = new ConciseSet();
+                         // store a copy of the key rather than the caller's instance
+                         rangeIndex.put(code.copy(), blocks);
+                     }
+                     blocks.add(blockId);
+                 }
+             }
+
+             // keep nBlocks strictly above every block id seen so far
+             if (nBlocks <= blockId) {
+                 nBlocks = blockId + 1;
+             }
+         } finally {
+             guard.unlock();
+         }
+     }
+
+     /**
+      * @return a clone of the set of block ids that contained a null code
+      */
+     public ConciseSet getNull() {
+         final ReentrantReadWriteLock.ReadLock guard = rwLock.readLock();
+         guard.lock();
+         try {
+             return nullIndex.clone();
+         } finally {
+             guard.unlock();
+         }
+     }
+
+     /**
+      * @param code the code to look up
+      * @return a clone of the block-id set for {@code code}, or a new empty
+      *         set if the code was never indexed
+      */
+     public ConciseSet getEquals(ByteArray code) {
+         final ReentrantReadWriteLock.ReadLock guard = rwLock.readLock();
+         guard.lock();
+         try {
+             final ConciseSet hit = rangeIndex.get(code);
+             return hit == null ? new ConciseSet() : hit.clone();
+         } finally {
+             guard.unlock();
+         }
+     }
+
+     /**
+      * @param codes the codes to look up
+      * @return a new set holding the union of the block-id sets of all given
+      *         codes; codes that were never indexed contribute nothing
+      */
+     public ConciseSet getIn(Iterable<ByteArray> codes) {
+         final ReentrantReadWriteLock.ReadLock guard = rwLock.readLock();
+         guard.lock();
+         try {
+             final ConciseSet union = new ConciseSet();
+             for (ByteArray code : codes) {
+                 final ConciseSet blocks = rangeIndex.get(code);
+                 if (blocks == null) {
+                     continue;
+                 }
+                 union.addAll(blocks);
+             }
+             return union;
+         } finally {
+             guard.unlock();
+         }
+     }
+
+     /**
+      * Collects the block ids of every indexed code inside the given range.
+      * A null bound means "unbounded" on that side.
+      *
+      * @param from          lower bound, or null for none
+      * @param fromInclusive whether {@code from} itself is part of the range
+      * @param to            upper bound, or null for none
+      * @param toInclusive   whether {@code to} itself is part of the range
+      * @return a new set with the union of the matching block-id sets; when
+      *         both bounds are null, the complement of a set holding only
+      *         {@code nBlocks} (presumably every block id below nBlocks --
+      *         confirm against ConciseSet.complement())
+      */
+     public ConciseSet getRange(ByteArray from, boolean fromInclusive, ByteArray to, boolean toInclusive) {
+         final ReentrantReadWriteLock.ReadLock guard = rwLock.readLock();
+         guard.lock();
+         try {
+             final boolean noLowerBound = from == null;
+             final boolean noUpperBound = to == null;
+             final ConciseSet result = new ConciseSet();
+
+             if (noLowerBound && noUpperBound) {
+                 result.add(nBlocks);
+                 result.complement();
+                 return result;
+             }
+
+             final NavigableMap<ByteArray, ConciseSet> slice;
+             if (noLowerBound) {
+                 slice = rangeIndex.headMap(to, toInclusive);
+             } else if (noUpperBound) {
+                 slice = rangeIndex.tailMap(from, fromInclusive);
+             } else {
+                 slice = rangeIndex.subMap(from, fromInclusive, to, toInclusive);
+             }
+
+             for (ConciseSet blocks : slice.values()) {
+                 result.addAll(blocks);
+             }
+             return result;
+         } finally {
+             guard.unlock();
+         }
+     }
+ }