You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/01 05:28:44 UTC

incubator-kylin git commit: KYLIN-802, push missing files

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 8a80c17b9 -> 1b0c7f171


KYLIN-802, push missing files


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1b0c7f17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1b0c7f17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1b0c7f17

Branch: refs/heads/0.8.0
Commit: 1b0c7f1713e342dc9da9ebb2ff257b439ec9e302
Parents: 8a80c17
Author: Yang Li <li...@apache.org>
Authored: Mon Jun 1 11:28:34 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Jun 1 11:28:34 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/inmemcubing/ICuboidWriter.java    |  11 +
 .../kylin/job/inmemcubing/InMemCubeBuilder.java | 728 +++++++++++++++++++
 2 files changed, 739 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b0c7f17/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
new file mode 100644
index 0000000..c05f217
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.job.inmemcubing;
+
+import org.apache.kylin.storage.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ * Sink for cuboid rows produced by the in-memory cube build; implementations
+ * persist each record of a cuboid to the target storage.
+ */
+public interface ICuboidWriter {
+    /**
+     * Writes one record belonging to the given cuboid.
+     *
+     * @param cuboidId id of the cuboid this record belongs to
+     * @param record   the row to persist
+     * @throws IOException on write failure
+     */
+    void write(long cuboidId, GTRecord record) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b0c7f17/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
new file mode 100644
index 0000000..5baec0f
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -0,0 +1,728 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CubeGridTable;
+import org.apache.kylin.storage.gridtable.GTAggregateScanner;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Builds all cuboids of a cube in memory: consumes flat-table rows from a
+ * blocking queue, aggregates the base cuboid, then spawns worker threads that
+ * derive child cuboids from their parents while a dedicated output thread
+ * streams finished cuboids (in ascending cuboid-id order) to an
+ * {@link ICuboidWriter}.
+ */
+public class InMemCubeBuilder implements Runnable {
+
+    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+    // shared constant for COUNT-style measures (see InputConverter.buildValue)
+    private static final LongWritable ONE = new LongWritable(1l);
+
+    // input rows; an empty list acts as the end-of-input marker (see InputConverter)
+    private final BlockingQueue<List<String>> inputQueue;
+    private final ICuboidWriter outputWriter;
+
+    private final CubeDesc cubeDesc;
+    private final long baseCuboidId;
+    private final CuboidScheduler cuboidScheduler;
+    private final Map<TblColRef, Dictionary<?>> dictionaryMap;
+    private final CubeJoinedFlatTableDesc intermediateTableDesc;
+    private final MeasureCodec measureCodec;
+    // aggregation function names, in HBase storage order (see constructor)
+    private final String[] metricsAggrFuncs;
+    private final Map<Integer, Integer> dependentMeasures; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
+    // maps HBase storage position -> index into cubeDesc.getMeasures()
+    private final int[] hbaseMeasureRefIndex;
+    private final MeasureDesc[] measureDescs;
+    private final int measureCount;
+
+    // state below is initialized by build(), not the constructor
+    private MemoryBudgetController memBudget;
+    private int taskThreadCount = 4;
+    private Thread[] taskThreads;
+    private Throwable[] taskThreadExceptions;
+    // worker task queue, guarded by its own monitor (see addChildTasks / CuboidTaskThread)
+    private TreeSet<CuboidTask> taskPending;
+    private AtomicInteger taskCuboidCompleted;
+    private CuboidResult baseResult;
+
+    // placeholder results for every expected cuboid, keyed by cuboid id
+    private TreeMap<Long, CuboidResult> outputPending;
+    private Thread outputThread;
+    private Throwable outputThreadException;
+    private int outputCuboidExpected;
+
+    /**
+     * @param queue          blocking queue of flat-table rows; an empty list signals end of input
+     * @param cubeDesc       cube definition to build
+     * @param dictionaryMap  dimension dictionaries; must be non-null and non-empty
+     * @param gtRecordWriter sink that persists every produced cuboid record
+     */
+    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
+        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+            throw new IllegalArgumentException("dictionary cannot be empty");
+        }
+        this.inputQueue = queue;
+        this.cubeDesc = cubeDesc;
+        this.cuboidScheduler = new CuboidScheduler(cubeDesc);
+        this.dictionaryMap = dictionaryMap;
+        this.outputWriter = gtRecordWriter;
+        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+
+        Map<String, Integer> measureIndexMap = Maps.newHashMap();
+        List<String> metricsAggrFuncsList = Lists.newArrayList();
+        measureCount = cubeDesc.getMeasures().size();
+
+        // Walk measures in HBase column-family/column layout order, recording for
+        // each storage position the index of the same measure in cubeDesc.getMeasures().
+        List<MeasureDesc> measureDescsList = Lists.newArrayList();
+        hbaseMeasureRefIndex = new int[measureCount];
+        int measureRef = 0;
+        for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                    for (int j = 0; j < measureCount; j++) {
+                        if (cubeDesc.getMeasures().get(j).equals(measure)) {
+                            measureDescsList.add(measure);
+                            hbaseMeasureRefIndex[measureRef] = j;
+                            break;
+                        }
+                    }
+                    // NOTE(review): if a mapped measure has no equal in cubeDesc.getMeasures(),
+                    // measureDescsList ends up shorter than measureCount and the loop below
+                    // would throw IndexOutOfBounds -- confirm the mapping always covers all measures.
+                    measureRef++;
+                }
+            }
+        }
+
+        // aggregation function names and name->storage-position lookup, both in HBase order
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescsList.get(i);
+            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+            measureIndexMap.put(measureDesc.getName(), i);
+        }
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+
+        // resolve 'dependent_measure_ref' names to storage positions
+        this.dependentMeasures = Maps.newHashMap();
+        for (int i = 0; i < measureCount; i++) {
+            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
+            if (depMsrRef != null) {
+                int index = measureIndexMap.get(depMsrRef);
+                dependentMeasures.put(i, index);
+            }
+        }
+
+        // NOTE(review): measureDescs is in cubeDesc order while the lists above are in
+        // HBase storage order; lookups into measureDescs go through hbaseMeasureRefIndex
+        // (see InputConverter.buildValue) -- confirm intended.
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+    }
+
+    /** Sets how many worker threads compute cuboids in parallel (default 4). */
+    public void setConcurrentThreads(int n) {
+        taskThreadCount = n;
+    }
+
+    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
+        GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+        // could use a real budget controller, but experiment shows write directly to disk is a few ms faster
+        GTMemDiskStore store = new GTMemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        GridTable gridTable = new GridTable(info, store);
+        return gridTable;
+    }
+
+    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+        BitSet dimension = new BitSet();
+        dimension.set(0, bitSet.cardinality());
+        BitSet metrics = new BitSet();
+        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+        return new Pair<BitSet, BitSet>(dimension, metrics);
+    }
+
+    @Override
+    public void run() {
+        try {
+            build();
+        } catch (IOException e) {
+            logger.error("Fail to build cube", e);
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Runs the whole build: base cuboid first (on the calling thread), then all
+     * descendant cuboids on worker threads while the output thread streams
+     * finished cuboids to the writer. Blocks until everything completes, then
+     * rethrows any error recorded by the worker/output threads.
+     *
+     * @throws IOException on build failure or interruption
+     */
+    public void build() throws IOException {
+        long startTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build start, " + cubeDesc.getName());
+
+        // multiple threads to compute cuboid in parallel
+        taskPending = new TreeSet<CuboidTask>();
+        taskCuboidCompleted = new AtomicInteger(0);
+        taskThreads = prepareTaskThreads();
+        taskThreadExceptions = new Throwable[taskThreadCount];
+
+        // output goes in a separate thread to leverage any async-ness
+        outputPending = prepareOutputPending();
+        outputCuboidExpected = outputPending.size();
+        outputThread = prepareOutputThread();
+        outputThreadException = null;
+
+        // build base cuboid on this thread; workers derive everything else from it
+        baseResult = createBaseCuboid();
+        taskCuboidCompleted.incrementAndGet();
+        if (baseResult.nRows == 0)
+            // empty input: nothing to aggregate or output, threads are never started
+            return;
+
+        // plan memory budget (needs baseResult's measured aggregation-cache size)
+        makeMemoryBudget();
+
+        // kick off N-D cuboid tasks and output
+        addChildTasks(baseResult);
+        start(taskThreads);
+        start(outputThread);
+
+        // wait complete
+        join(taskThreads);
+        join(outputThread);
+
+        long endTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
+
+        throwExceptionIfAny();
+    }
+
+    /** Starts each of the given threads. */
+    private void start(Thread... threads) {
+        for (Thread thread : threads) {
+            thread.start();
+        }
+    }
+
+    /**
+     * Joins all given threads.
+     *
+     * @throws IOException if this thread is interrupted while waiting; the
+     *         interrupt status is restored before the exception is thrown
+     */
+    private void join(Thread... threads) throws IOException {
+        try {
+            for (Thread t : threads)
+                t.join();
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new IOException("interrupted while waiting task and output complete", e);
+        }
+    }
+
+    /**
+     * Collects errors recorded by the worker and output threads. No error:
+     * returns normally. Exactly one: rethrown (wrapped in IOException if
+     * needed). Several: all logged, first becomes the cause of a summary
+     * IOException.
+     */
+    private void throwExceptionIfAny() throws IOException {
+        List<Throwable> errors = new ArrayList<Throwable>();
+        for (Throwable recorded : taskThreadExceptions) {
+            if (recorded != null) {
+                errors.add(recorded);
+            }
+        }
+        if (outputThreadException != null) {
+            errors.add(outputThreadException);
+        }
+        if (errors.isEmpty()) {
+            return;
+        }
+        if (errors.size() == 1) {
+            Throwable only = errors.get(0);
+            if (only instanceof IOException) {
+                throw (IOException) only;
+            }
+            throw new IOException(only);
+        }
+        for (Throwable each : errors) {
+            logger.error("Exception during in-mem cube build", each);
+        }
+        throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
+    }
+
+    /** Allocates (but does not start) the cuboid worker threads. */
+    private Thread[] prepareTaskThreads() {
+        Thread[] workers = new Thread[taskThreadCount];
+        for (int id = 0; id < taskThreadCount; id++)
+            workers[id] = new CuboidTaskThread(id);
+        return workers;
+    }
+
+    /**
+     * Worker thread: repeatedly takes the smallest pending task, builds that
+     * child cuboid from its parent, and queues the child's own children.
+     * Shutdown: the thread that completes the last cuboid interrupts all its
+     * siblings, which are typically blocked in taskPending.wait().
+     */
+    private class CuboidTaskThread extends Thread {
+        private int id; // slot in taskThreadExceptions
+
+        CuboidTaskThread(int id) {
+            super("CuboidTask-" + id);
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (taskCuboidCompleted.get() < outputCuboidExpected) {
+                    // block until a task is available
+                    CuboidTask task = null;
+                    synchronized (taskPending) {
+                        while (task == null) {
+                            task = taskPending.pollFirst();
+                            if (task == null)
+                                taskPending.wait();
+                        }
+                    }
+
+                    CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
+                    addChildTasks(newCuboid);
+                    taskCuboidCompleted.incrementAndGet();
+
+                    // last cuboid done: wake up siblings stuck waiting for tasks
+                    if (taskCuboidCompleted.get() == outputCuboidExpected) {
+                        for (Thread t : taskThreads) {
+                            if (t != Thread.currentThread())
+                                t.interrupt();
+                        }
+                    }
+                }
+            } catch (Throwable ex) {
+                // an exception after all cuboids completed is the expected shutdown
+                // interrupt, so only record errors that happen before completion
+                if (taskCuboidCompleted.get() < outputCuboidExpected)
+                    taskThreadExceptions[id] = ex;
+            }
+        }
+    }
+
+    /** Queues a build task for every spanning child of the given cuboid and wakes waiting workers. */
+    private void addChildTasks(CuboidResult parent) {
+        List<Long> spanning = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
+        if (spanning.isEmpty())
+            return;
+        synchronized (taskPending) {
+            for (Long childId : spanning) {
+                taskPending.add(new CuboidTask(parent, childId));
+            }
+            taskPending.notifyAll();
+        }
+    }
+
+    /** Builds placeholder results for every cuboid to be produced, keyed (and later output) in ascending cuboid-id order. */
+    private TreeMap<Long, CuboidResult> prepareOutputPending() {
+        TreeMap<Long, CuboidResult> pending = new TreeMap<Long, CuboidResult>();
+        prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), pending);
+        return pending;
+    }
+
+    /** Registers a placeholder for the cuboid, then recurses into each spanning child. */
+    private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, CuboidResult> result) {
+        result.put(cuboidId, new CuboidResult(cuboidId, null, 0, 0, 0));
+        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long child : children) {
+            prepareOutputPendingRecursive(child, result);
+        }
+    }
+
+    /**
+     * Creates (but does not start) the output thread. It emits cuboids strictly
+     * in ascending cuboid-id order: it takes the smallest pending entry, waits
+     * on the result's monitor until updateCuboidResult() sets its table, writes
+     * it out, and removes it.
+     */
+    private Thread prepareOutputThread() {
+        return new Thread("CuboidOutput") {
+            public void run() {
+                try {
+                    while (!outputPending.isEmpty()) {
+                        CuboidResult result = outputPending.firstEntry().getValue();
+                        // wait until the worker publishes the cuboid's table
+                        synchronized (result) {
+                            while (result.table == null) {
+                                try {
+                                    result.wait();
+                                } catch (InterruptedException e) {
+                                    logger.error("interrupted", e);
+                                }
+                            }
+                        }
+                        outputCuboid(result.cuboidId, result.table);
+                        // NOTE(review): outputPending is a plain TreeMap read by workers via
+                        // get() while this thread removes entries -- relies on get() racing
+                        // benignly with remove(); confirm this is safe for TreeMap.
+                        outputPending.remove(result.cuboidId);
+                    }
+                } catch (Throwable ex) {
+                    outputThreadException = ex;
+                }
+            }
+        };
+    }
+
+    private int getSystemAvailMB() {
+        Runtime.getRuntime().gc();
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            logger.error("", e);
+        }
+        return MemoryBudgetController.getSystemAvailMB();
+    }
+
+    /**
+     * Sizes the shared memory budget from currently available memory, keeping a
+     * small reserve for system basics and guaranteeing at least enough for the
+     * base cuboid's aggregation cache.
+     */
+    private void makeMemoryBudget() {
+        int systemAvailMB = getSystemAvailMB();
+        logger.info("System avail " + systemAvailMB + " MB");
+        // reserve the smaller of 100 MB and a third of the base aggr cache
+        int reserve = Math.min(100, baseResult.aggrCacheMB / 3);
+        logger.info("Reserve " + reserve + " MB for system basics");
+
+        int budget = systemAvailMB - reserve;
+        if (budget < baseResult.aggrCacheMB) {
+            // make sure we have base aggr cache as minimal
+            budget = baseResult.aggrCacheMB;
+            logger.warn("!!! System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
+        }
+
+        logger.info("Memory Budget is " + budget + " MB");
+        // NOTE(review): when budget <= 0 (only possible if baseResult.aggrCacheMB <= 0)
+        // memBudget stays null and buildCuboid() would NPE -- confirm the base aggr
+        // cache measurement is always positive.
+        if (budget > 0) {
+            memBudget = new MemoryBudgetController(budget);
+        }
+    }
+
+    /**
+     * Builds the base (all-dimension) cuboid by aggregating the input stream,
+     * and measures the memory footprint of the aggregation cache, which later
+     * drives the memory budget and per-cuboid estimates.
+     */
+    private CuboidResult createBaseCuboid() throws IOException {
+        GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
+        GTBuilder baseBuilder = baseCuboid.rebuild();
+        IGTScanner baseInput = new InputConverter(baseCuboid.getInfo());
+
+        int mbBefore = getSystemAvailMB();
+        int mbAfter = 0;
+
+        Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+
+        long startTime = System.currentTimeMillis();
+        logger.info("Calculating cuboid " + baseCuboidId);
+
+        int count = 0;
+        for (GTRecord r : aggregationScanner) {
+            if (mbAfter == 0) {
+                // sampled at the first record, i.e. presumably after the aggregation
+                // cache is fully populated -- TODO confirm GTAggregateScanner behavior
+                mbAfter = getSystemAvailMB();
+            }
+            baseBuilder.write(r);
+            count++;
+        }
+        aggregationScanner.close();
+        baseBuilder.close();
+
+        long timeSpent = System.currentTimeMillis() - startTime;
+        logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+
+        // take the larger of the heap delta (with 10% headroom) and the scanner's own estimate
+        int mbBaseAggrCacheOnHeap = mbBefore - mbAfter;
+        int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
+        int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache);
+        logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
+
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
+    }
+
+    /**
+     * Fills in the placeholder result of a finished cuboid and wakes the output
+     * thread waiting on it.
+     * <p>
+     * All field writes happen inside the result's monitor: the output thread
+     * tests {@code result.table} while holding the same lock, so publishing
+     * under the lock guarantees it never observes a half-initialized result.
+     *
+     * @param mbBaseAggrCache measured aggregation-cache size in MB, or 0 to
+     *        estimate it from the base cuboid's size scaled by row count.
+     *        NOTE(review): for the base cuboid itself, baseResult is still null
+     *        here, so that call relies on mbBaseAggrCache being positive -- confirm.
+     */
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int mbBaseAggrCache) {
+        CuboidResult result = outputPending.get(cuboidId);
+        synchronized (result) {
+            result.nRows = nRows;
+            result.timeSpent = timeSpent;
+            result.aggrCacheMB = mbBaseAggrCache;
+            if (result.aggrCacheMB <= 0) {
+                // no measurement; scale the base cuboid's cache size by row count
+                result.aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
+            }
+            // set 'table' last -- it is the ready flag the output thread waits on
+            result.table = table;
+            // notifyAll is safer than notify should more than one thread ever wait
+            result.notifyAll();
+        }
+        return result;
+    }
+
+    /**
+     * Builds one child cuboid from its parent, first reserving memory-budget
+     * room for the aggregation cache (a child's cache can't be larger than the
+     * parent's). The reservation cannot be reclaimed while held (freeUp returns
+     * 0) and is released in the finally block.
+     */
+    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final String consumerName = "AggrCache@Cuboid " + cuboidId;
+        MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
+            @Override
+            public int freeUp(int mb) {
+                return 0; // cannot free up
+            }
+
+            @Override
+            public String toString() {
+                return consumerName;
+            }
+        };
+
+        // reserve memory for aggregation cache, can't be larger than the parent
+        // NOTE(review): memBudget is null if makeMemoryBudget() computed budget <= 0 -- confirm unreachable
+        memBudget.reserveInsist(consumer, parent.aggrCacheMB);
+        try {
+            return aggregateCuboid(parent, cuboidId);
+        } finally {
+            // release the reservation
+            memBudget.reserve(consumer, 0);
+        }
+    }
+
+    /**
+     * Derives a child cuboid's column layout from its parent's, then scans and
+     * aggregates the parent's grid table into the child.
+     * <p>
+     * The loop walks the parent's cuboid-id bits from the most significant set
+     * bit down; {@code index} counts only the parent's set bits (i.e. the
+     * parent's dimension columns), and any dimension the parent has but the
+     * child lacks is cleared from the child's dimension bit set so it gets
+     * aggregated away.
+     */
+    private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId);
+        BitSet parentDimensions = columnBitSets.getFirst();
+        BitSet measureColumns = columnBitSets.getSecond();
+        BitSet childDimensions = (BitSet) parentDimensions.clone();
+
+        long mask = Long.highestOneBit(parent.cuboidId);
+        long childCuboidId = cuboidId;
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId);
+        int index = 0;
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parent.cuboidId) > 0) {
+                if ((mask & childCuboidId) == 0) {
+                    // this dim will be aggregated
+                    childDimensions.set(index, false);
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+
+        return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns);
+    }
+
+    /**
+     * Scans the parent grid table with an aggregating request and writes the
+     * aggregated rows into a fresh grid table for the child cuboid.
+     * <p>
+     * Columns are compacted: the i-th set bit of allNeededColumns in the parent
+     * record becomes column i of the child record. Measures listed in
+     * dependentMeasures are recomputed: the depended-on HLL counter's estimate
+     * is serialized as a vlong and stored into the dependent measure's column.
+     */
+    private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+        long startTime = System.currentTimeMillis();
+        logger.info("Calculating cuboid " + cuboidId);
+
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        IGTScanner scanner = gridTable.scan(req);
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
+        GTBuilder builder = newGridTable.rebuild();
+
+        BitSet allNeededColumns = new BitSet();
+        allNeededColumns.or(aggregationColumns);
+        allNeededColumns.or(measureColumns);
+
+        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+        int count = 0;
+        // reusable buffers for re-encoding dependent measures; 8 bytes fits a vlong
+        ByteArray byteArray = new ByteArray(8);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+        try {
+            // child-record positions of the measures that dependents refer to;
+            // measure columns occupy the last measureCount positions
+            BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
+            for (Integer i : dependentMeasures.keySet()) {
+                dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
+            }
+
+            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
+
+            for (GTRecord record : scanner) {
+                count++;
+                // compact the parent's needed columns into consecutive child columns
+                for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
+                    newRecord.set(index, record.get(i));
+                }
+
+                if (dependentMeasures.size() > 0) {
+                    // update measures which have 'dependent_measure_ref'
+                    newRecord.getValues(dependentMetrics, hllObjects);
+
+                    // for each dependent measure, find its depended-on counter among
+                    // the fetched values and overwrite the dependent's column
+                    for (Integer i : dependentMeasures.keySet()) {
+                        for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
+                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
+                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
+
+                                byteBuffer.clear();
+                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
+                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
+                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
+                            }
+                        }
+
+                    }
+                }
+
+                builder.write(newRecord);
+            }
+        } finally {
+            scanner.close();
+            builder.close();
+        }
+
+        long timeSpent = System.currentTimeMillis() - startTime;
+        logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+
+        // aggrCacheMB of 0 means "estimate from base cuboid" (see updateCuboidResult)
+        return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
+    }
+
+    /**
+     * Streams every record of the cuboid's grid table to the output writer,
+     * then releases the table's backing store.
+     * <p>
+     * The scanner is closed in a finally block so a failed write does not leak
+     * the scan resources.
+     */
+    private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        IGTScanner scanner = gridTable.scan(req);
+        try {
+            for (GTRecord record : scanner) {
+                this.outputWriter.write(cuboidId, record);
+            }
+        } finally {
+            scanner.close();
+        }
+        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+        closeStore(gridTable);
+    }
+
+    /** Releases the mem/disk store backing the given grid table. */
+    private void closeStore(GridTable gt) throws IOException {
+        GTMemDiskStore store = (GTMemDiskStore) gt.getStore();
+        store.close();
+    }
+
+    // ===========================================================================
+
+    private static class CuboidTask implements Comparable<CuboidTask> {
+        CuboidResult parent;
+        long childCuboidId;
+
+        CuboidTask(CuboidResult parent, long childCuboidId) {
+            this.parent = parent;
+            this.childCuboidId = childCuboidId;
+        }
+
+        @Override
+        public int compareTo(CuboidTask o) {
+            long comp = this.childCuboidId - o.childCuboidId;
+            return comp < 0 ? -1 : (comp > 0 ? 1 : 0);
+        }
+    }
+
+    /**
+     * Holder for one pending/finished cuboid: its grid table plus a few build
+     * statistics. A non-null 'table' doubles as the ready flag the output
+     * thread waits for (see prepareOutputThread / updateCuboidResult).
+     */
+    private static class CuboidResult {
+        long cuboidId;
+        GridTable table;
+        int nRows;
+        @SuppressWarnings("unused")
+        long timeSpent;
+        int aggrCacheMB; // aggregation cache footprint in MB, measured or estimated
+
+        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+            this.cuboidId = cuboidId;
+            this.table = table;
+            this.nRows = nRows;
+            this.timeSpent = timeSpent;
+            this.aggrCacheMB = aggrCacheMB;
+        }
+    }
+
+    // ============================================================================
+
+    /**
+     * Adapts the flat-table input queue to the IGTScanner interface so the base
+     * cuboid can be aggregated directly off the queue. An empty list taken from
+     * the queue marks end of input.
+     */
+    private class InputConverter implements IGTScanner {
+        GTInfo info;
+        GTRecord record; // single reusable record; see next()
+
+        public InputConverter(GTInfo info) {
+            this.info = info;
+            this.record = new GTRecord(info);
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                List<String> currentObject = null;
+
+                // NOTE(review): hasNext() has a side effect -- it takes (consumes) a row
+                // from the queue; calling hasNext() twice without next() would drop a row.
+                // Safe only under the strict hasNext/next alternation of a for-each loop.
+                @Override
+                public boolean hasNext() {
+                    try {
+                        currentObject = inputQueue.take();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    return currentObject != null && currentObject.size() > 0;
+                }
+
+                // Returns the SAME mutable record on every call, refilled in place;
+                // callers must not hold references across iterations.
+                @Override
+                public GTRecord next() {
+                    if (currentObject.size() == 0)
+                        throw new IllegalStateException();
+
+                    buildGTRecord(currentObject, record);
+                    return record;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public GTInfo getInfo() {
+            return info;
+        }
+
+        @Override
+        public int getScannedRowCount() {
+            return 0;
+        }
+
+        @Override
+        public int getScannedRowBlockCount() {
+            return 0;
+        }
+
+        // Fills 'record' with the row's dimension values followed by its measure values.
+        private void buildGTRecord(List<String> row, GTRecord record) {
+            Object[] dimensions = buildKey(row);
+            Object[] metricsValues = buildValue(row);
+            Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+            System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+            System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+            record.setValues(recordValues);
+        }
+
+        // Picks the row-key (dimension) columns out of the flat-table row.
+        private Object[] buildKey(List<String> row) {
+            int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+            Object[] key = new Object[keySize];
+
+            for (int i = 0; i < keySize; i++) {
+                key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+            }
+
+            return key;
+        }
+
+        // Decodes one measure value per HBase storage position ('position' is storage
+        // order, 'i' the measure's index in cubeDesc.getMeasures()).
+        private Object[] buildValue(List<String> row) {
+
+            Object[] values = new Object[measureCount];
+            MeasureDesc measureDesc = null;
+
+            for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+                int i = hbaseMeasureRefIndex[position];
+                measureDesc = measureDescs[i];
+
+                Object value = null;
+                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+                FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
+                if (function.isCount() || function.isHolisticCountDistinct()) {
+                    // note for holistic count distinct, this value will be ignored
+                    value = ONE;
+                } else if (flatTableIdx == null) {
+                    // no source column: the measure's parameter is a constant
+                    value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+                } else if (flatTableIdx.length == 1) {
+                    // single source column
+                    value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+                } else {
+
+                    // multiple source columns: concatenate their bytes before decoding
+                    byte[] result = null;
+                    for (int x = 0; x < flatTableIdx.length; x++) {
+                        byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+                        if (result == null) {
+                            result = Arrays.copyOf(split, split.length);
+                        } else {
+                            byte[] newResult = new byte[result.length + split.length];
+                            System.arraycopy(result, 0, newResult, 0, result.length);
+                            System.arraycopy(split, 0, newResult, result.length, split.length);
+                            result = newResult;
+                        }
+                    }
+                    value = measureCodec.getSerializer(i).valueOf(result);
+                }
+                values[position] = value;
+            }
+            return values;
+        }
+
+    }
+}