You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/25 12:39:49 UTC
incubator-kylin git commit: KYLIN-770 optimize InMemCubeBuilder memory usage
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 9c0b3a954 -> 47c6ccd5c
KYLIN-770 optimize InMemCubeBuilder memory usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/47c6ccd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/47c6ccd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/47c6ccd5
Branch: refs/heads/0.8.0
Commit: 47c6ccd5c573ae791eccdf084217a1742eefa35b
Parents: 9c0b3a9
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue May 19 14:48:41 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon May 25 18:39:23 2015 +0800
----------------------------------------------------------------------
.../job/hadoop/cubev2/InMemCubeBuilder.java | 142 +++++++++++-------
.../cubev2/InMemCubeBuilderBenchmarkTest.java | 117 +++++++++++++++
pom.xml | 8 +
.../storage/gridtable/GTAggregateScanner.java | 150 ++++++++++++++++---
.../kylin/storage/gridtable/GTRecord.java | 39 +++--
.../kylin/storage/gridtable/MemoryChecker.java | 29 ++++
.../gridtable/memstore/GTSimpleMemStore.java | 2 +
7 files changed, 403 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
index cff3474..56989a6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
@@ -56,22 +56,20 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CubeGridTable;
import org.apache.kylin.storage.gridtable.*;
-import org.apache.kylin.storage.util.SizeOfUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
/**
*/
public class InMemCubeBuilder implements Runnable {
- //estimation of (size of aggregation cache) / (size of mem store)
- private static final double AGGREGATION_CACHE_FACTOR = 3;
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+ private static final int DEFAULT_TIMEOUT = 25;
private BlockingQueue<List<String>> queue;
private CubeDesc desc = null;
@@ -86,7 +84,6 @@ public class InMemCubeBuilder implements Runnable {
private int[] hbaseMeasureRefIndex;
private MeasureDesc[] measureDescs;
private int measureCount;
- private boolean hasDependentMeasure = false;
protected IGTRecordWriter gtRecordWriter;
@@ -148,8 +145,6 @@ public class InMemCubeBuilder implements Runnable {
}
}
- this.hasDependentMeasure = dependentMeasures.size() > 0;
-
this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
}
@@ -161,8 +156,7 @@ public class InMemCubeBuilder implements Runnable {
return gridTable;
}
- private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, boolean inMem) throws IOException {
- logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+ private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
BitSet parentDimensions = columnBitSets.getFirst();
BitSet measureColumns = columnBitSets.getSecond();
@@ -183,14 +177,14 @@ public class InMemCubeBuilder implements Runnable {
mask = mask >> 1;
}
- return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns, inMem);
+ return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
}
- private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns, boolean inMem) throws IOException {
+ private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
IGTScanner scanner = gridTable.scan(req);
- GridTable newGridTable = newGridTableByCuboidID(cuboidId, inMem);
+ GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
GTBuilder builder = newGridTable.rebuild();
BitSet allNeededColumns = new BitSet();
@@ -215,7 +209,7 @@ public class InMemCubeBuilder implements Runnable {
newRecord.set(index, record.get(i));
}
- if(hasDependentMeasure) {
+ if(dependentMeasures.size() > 0) {
// update measures which have 'dependent_measure_ref'
newRecord.getValues(dependentMetrics, hllObjects);
@@ -309,7 +303,7 @@ public class InMemCubeBuilder implements Runnable {
public void run() {
try {
logger.info("Create base cuboid " + baseCuboidId);
- final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, true);
+ final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
@@ -392,6 +386,7 @@ public class InMemCubeBuilder implements Runnable {
createNDCuboidGT(tree, baseCuboidId, childId);
}
}
+ outputGT(baseCuboidId, baseCuboidGT);
dropStore(baseCuboidGT);
} catch (IOException e) {
@@ -411,55 +406,96 @@ public class InMemCubeBuilder implements Runnable {
record.setValues(recordValues);
}
- private long checkMemory(long threshold) {
- final long freeMemory = Runtime.getRuntime().freeMemory();
- logger.info("available memory:" + (freeMemory >> 10) + " KB, memory needed:" + (threshold >> 10) + " KB");
- return freeMemory - threshold;
- }
-
private boolean gc(TreeNode<GridTable> parentNode) {
- final long parentCuboidMem = SizeOfUtil.deepSizeOf(parentNode.data.getStore());
- long threshold = (long) (parentCuboidMem * (AGGREGATION_CACHE_FACTOR + 1));
final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
- long memoryLeft = checkMemory(threshold);
+ logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
for (TreeNode<GridTable> gridTable : gridTables) {
- if (memoryLeft >= 0) {
+ final GTComboStore store = (GTComboStore) gridTable.data.getStore();
+ if (store.memoryUsage() > 0) {
+ logger.info("cuboid id:" + gridTable.id + " flush to disk");
+ long t = System.currentTimeMillis();
+ store.switchToDiskStore();
+ logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+ waitForGc();
return true;
+ }
+ }
+ logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
+ return false;
+
+ }
+
+ private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
+ @Override
+ public GridTable call() throws Exception {
+ return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+ }
+ });
+ try {
+ final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+ return gridTable;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("this should not happen", e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof OutOfMemoryError) {
+ logger.warn("Future.get() OutOfMemory, stop the thread");
} else {
- logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables));
- final GTComboStore store = (GTComboStore) gridTable.data.getStore();
- if (store.memoryUsage() > 0) {
- final long storeSize = SizeOfUtil.deepSizeOf(store);
- memoryLeft += storeSize;
- logger.info("cuboid id:" + gridTable.id + " selected, memory used:" + (storeSize >> 10) + " KB");
- long t = System.currentTimeMillis();
- ((GTComboStore) store).switchToDiskStore();
- logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
- }
+ throw new RuntimeException("this should not happen", e);
}
+ } catch (TimeoutException e) {
+ logger.warn("Future.get() timeout, stop the thread");
}
- if (memoryLeft >= 0) {
- return true;
- } else {
- logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk, memory is still insufficient, usually due to jvm gc not finished, forced to use memory store");
- return true;
+ logger.info("shutdown executor service");
+ final List<Runnable> runnables = executorService.shutdownNow();
+ try {
+ executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+ waitForGc();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("this should not happen", e);
}
+ return null;
+
+ }
+ private void waitForGc() {
+ System.gc();
+ logger.info("wait 5 seconds for gc");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("should not happen", e);
+ }
}
private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
long startTime = System.currentTimeMillis();
- GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
- if (parentStore.memoryUsage() <= 0) {
- long t = System.currentTimeMillis();
- parentStore.switchToMemStore();
- logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+// if (parentStore.memoryUsage() <= 0) {
+// long t = System.currentTimeMillis();
+// parentStore.switchToMemStore();
+// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+// }
+
+ GridTable currentCuboid;
+ while (true) {
+ logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+ currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
+ if (currentCuboid != null) {
+ break;
+ } else {
+ logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
+ if (gc(parentNode)) {
+ continue;
+ } else {
+ logger.warn("all parent node has been flushed into disk, memory is still insufficient");
+ throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
+ }
+ }
}
-
- boolean inMem = gc(parentNode);
- GridTable currentCuboid = aggregateCuboid(parentNode.data, parentCuboidId, cuboidId, inMem);
SimpleGridTableTree node = new SimpleGridTableTree();
node.parent = parentNode;
node.data = currentCuboid;
@@ -476,13 +512,15 @@ public class InMemCubeBuilder implements Runnable {
}
}
- startTime = System.currentTimeMillis();
+
//output the grid table
outputGT(cuboidId, currentCuboid);
dropStore(currentCuboid);
parentNode.children.remove(node);
- logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-
+ if (parentNode.children.size() > 0) {
+ logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
+ ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+ }
}
private void dropStore(GridTable gt) throws IOException {
@@ -491,11 +529,13 @@ public class InMemCubeBuilder implements Runnable {
private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
+ long startTime = System.currentTimeMillis();
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
IGTScanner scanner = gridTable.scan(req);
for (GTRecord record : scanner) {
this.gtRecordWriter.write(cuboidId, record);
}
+ logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
private static class TreeNode<T> {
@@ -506,7 +546,7 @@ public class InMemCubeBuilder implements Runnable {
List<TreeNode<T>> getAncestorList() {
ArrayList<TreeNode<T>> result = Lists.newArrayList();
- TreeNode<T> parent = this.parent;
+ TreeNode<T> parent = this;
while (parent != null) {
result.add(parent);
parent = parent.parent;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
new file mode 100644
index 0000000..e90a41b
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
@@ -0,0 +1,117 @@
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
+
+ private static final int BENCHMARK_RECORD_LIMIT = 2000000;
+ private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
+ final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+ final CubeDesc desc = cubeSegment.getCubeDesc();
+ for (DimensionDesc dim : desc.getDimensions()) {
+ // dictionary
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (desc.getRowkey().isUseDictionary(col)) {
+ Dictionary dict = cubeSegment.getDictionary(col);
+ if (dict == null) {
+ throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+ }
+ logger.info("Dictionary for " + col + " was put into dictionary map.");
+ dictionaryMap.put(col, cubeSegment.getDictionary(col));
+ }
+ }
+ }
+ return dictionaryMap;
+ }
+
+ private static class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+ boolean verbose = false;
+
+ @Override
+ public void write(Long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+ }
+
+ private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+ String line;
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ queue.put(Arrays.asList(line.split("\t")));
+ counter++;
+ if (counter == BENCHMARK_RECORD_LIMIT) {
+ break;
+ }
+ }
+ queue.put(Collections.emptyList());
+ }
+
+ private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+ queue.put(Collections.emptyList());
+ }
+
+
+ @Test
+ public void test() throws Exception {
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+ final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
+ final CubeSegment cubeSegment = cube.getFirstSegment();
+
+ LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
+
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future<?> future = executorService.submit(cubeBuilder);
+ loadDataFromLocalFile(queue);
+ future.get();
+ logger.info("stream build finished");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb091a9..2e9b4f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,8 @@
<commons-configuration.version>1.9</commons-configuration.version>
<commons-daemon.version>1.0.15</commons-daemon.version>
<commons-httpclient.version>3.1</commons-httpclient.version>
+ <commons-collections4.version>4.0</commons-collections4.version>
+
<!-- Utility -->
<log4j.version>1.2.17</log4j.version>
@@ -338,6 +340,11 @@
<version>${commons-httpclient.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>${commons-collections4.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
@@ -609,6 +616,7 @@
<exclude>**/Kafka*Test.java</exclude>
<exclude>**/RequesterTest.java</exclude>
+ <exclude>**/InMemCubeBuilderBenchmarkTest.java</exclude>
</excludes>
<systemProperties>
<property>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index 14a3efa..f3de4f6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -1,18 +1,23 @@
package org.apache.kylin.storage.gridtable;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-
-import com.google.common.collect.Maps;
-
public class GTAggregateScanner implements IGTScanner {
+ private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
final GTInfo info;
final BitSet dimensions; // dimensions to return, can be more than group by
final BitSet groupBy;
@@ -20,6 +25,7 @@ public class GTAggregateScanner implements IGTScanner {
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
+
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
if (req.hasAggregation() == false)
throw new IllegalStateException();
@@ -55,13 +61,119 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
- AggregationCache aggrCache = new AggregationCache();
+ AggregationCacheWithBytesKey aggregationCacheWithBytesKey = new AggregationCacheWithBytesKey();
for (GTRecord r : inputScanner) {
- aggrCache.aggregate(r);
+ aggregationCacheWithBytesKey.aggregate(r);
+ MemoryChecker.checkMemory();
}
- return aggrCache.iterator();
+ return aggregationCacheWithBytesKey.iterator();
}
+ class AggregationCacheWithBytesKey {
+ final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+
+ public AggregationCacheWithBytesKey() {
+ aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ int result = 0;
+ Preconditions.checkArgument(o1.length == o2.length);
+ final int length = o1.length;
+ for (int i = 0; i < length; ++i) {
+ result = o1[i] - o2[i];
+ if (result == 0) {
+ continue;
+ } else {
+ return result;
+ }
+ }
+ return result;
+ }
+ });
+ }
+
+ private byte[] createKey(GTRecord record) {
+ byte[] result = new byte[info.getMaxColumnLength(groupBy)];
+ int offset = 0;
+ for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+ final ByteArray byteArray = record.cols[i];
+ final int columnLength = info.codeSystem.maxCodeLength(i);
+ System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, columnLength);
+ offset += columnLength;
+ }
+ assert offset == result.length;
+ return result;
+ }
+
+ void aggregate(GTRecord r) {
+ final byte[] key = createKey(r);
+ MeasureAggregator[] aggrs = aggBufMap.get(key);
+ if (aggrs == null) {
+ aggrs = new MeasureAggregator[metricsAggrFuncs.length];
+ for (int i = 0, col = -1; i < aggrs.length; i++) {
+ col = metrics.nextSetBit(col + 1);
+ aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
+ }
+ aggBufMap.put(key, aggrs);
+ }
+ for (int i = 0, col = -1; i < aggrs.length; i++) {
+ col = metrics.nextSetBit(col + 1);
+ Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
+ aggrs[i].aggregate(metrics);
+ }
+ }
+
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+
+ final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+ final GTRecord secondRecord;
+
+ {
+ BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
+ dimensionsAndMetrics.or(metrics);
+ secondRecord = new GTRecord(info, dimensionsAndMetrics);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public GTRecord next() {
+ Entry<byte[], MeasureAggregator[]> entry = it.next();
+ create(entry.getKey(), entry.getValue());
+ return secondRecord;
+ }
+
+ private void create(byte[] key, MeasureAggregator[] value) {
+ int offset = 0;
+ for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+ final int columnLength = info.codeSystem.maxCodeLength(i);
+ secondRecord.set(i, new ByteArray(key, offset, columnLength));
+ offset += columnLength;
+ }
+ metricsBuf.clear();
+ for (int i = 0, col = -1; i < value.length; i++) {
+ col = metrics.nextSetBit(col + 1);
+ int pos = metricsBuf.position();
+ info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+ secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+
+ /*
@SuppressWarnings({ "rawtypes", "unchecked" })
class AggregationCache {
final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
@@ -92,9 +204,16 @@ public class GTAggregateScanner implements IGTScanner {
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
- Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
- ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- GTRecord oneRecord = new GTRecord(info); // avoid instance creation
+ final Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+
+ final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+ final GTRecord oneRecord; // avoid instance creation
+
+ {
+ BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
+ dimensionsAndMetrics.or(metrics);
+ oneRecord = new GTRecord(info, dimensionsAndMetrics);
+ }
@Override
public boolean hasNext() {
@@ -104,7 +223,7 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public GTRecord next() {
Entry<GTRecord, MeasureAggregator[]> entry = it.next();
-
+
GTRecord dims = entry.getKey();
for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
oneRecord.cols[i].set(dims.cols[i]);
@@ -118,7 +237,6 @@ public class GTAggregateScanner implements IGTScanner {
info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
}
-
return oneRecord;
}
@@ -129,12 +247,6 @@ public class GTAggregateScanner implements IGTScanner {
};
}
- public long getSize() {
- return aggBufMap.size();
- }
-
- // ============================================================================
-
transient int rowMemBytes;
static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
@@ -158,4 +270,6 @@ public class GTAggregateScanner implements IGTScanner {
}
}
+ */
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
index 3d0f9bb..0e5c1b7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
@@ -1,12 +1,12 @@
package org.apache.kylin.storage.gridtable;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.metadata.filter.IFilterCodeSystem;
-
public class GTRecord implements Comparable<GTRecord> {
final GTInfo info;
@@ -14,12 +14,19 @@ public class GTRecord implements Comparable<GTRecord> {
private BitSet maskForEqualHashComp;
- public GTRecord(GTInfo info) {
+ public GTRecord(GTInfo info, BitSet maskForEqualHashComp) {
this.info = info;
this.cols = new ByteArray[info.getColumnCount()];
- for (int i = 0; i < cols.length; i++)
- this.cols[i] = new ByteArray();
- this.maskForEqualHashComp = info.colAll;
+ for (int i = 0; i < cols.length; i++) {
+ if (maskForEqualHashComp.get(i)) {
+ this.cols[i] = new ByteArray();
+ }
+ }
+ this.maskForEqualHashComp = maskForEqualHashComp;
+ }
+
+ public GTRecord(GTInfo info) {
+ this(info, info.colAll);
}
public GTInfo getInfo() {
@@ -62,10 +69,11 @@ public class GTRecord implements Comparable<GTRecord> {
public Object[] getValues(BitSet selectedColumns, Object[] result) {
assert selectedColumns.cardinality() <= result.length;
for (int i = 0, c = selectedColumns.nextSetBit(0); c >= 0; i++, c = selectedColumns.nextSetBit(c + 1)) {
- if (cols[c].array() == null)
+ if (cols[c] == null || cols[c].array() == null) {
result[i] = null;
- else
+ } else {
result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+ }
}
return result;
}
@@ -74,10 +82,11 @@ public class GTRecord implements Comparable<GTRecord> {
assert selectedColumns.length <= result.length;
for (int i = 0; i < selectedColumns.length; i++) {
int c = selectedColumns[i];
- if (cols[c].array() == null)
+ if (cols[c].array() == null) {
result[i] = null;
- else
+ } else {
result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+ }
}
return result;
}
@@ -94,8 +103,7 @@ public class GTRecord implements Comparable<GTRecord> {
byte[] space = new byte[len];
- GTRecord copy = new GTRecord(info);
- copy.maskForEqualHashComp = this.maskForEqualHashComp;
+ GTRecord copy = new GTRecord(info, this.maskForEqualHashComp);
int pos = 0;
for (int i = selectedCols.nextSetBit(0); i >= 0; i = selectedCols.nextSetBit(i + 1)) {
System.arraycopy(cols[i].array(), cols[i].offset(), space, pos, cols[i].length());
@@ -129,8 +137,9 @@ public class GTRecord implements Comparable<GTRecord> {
if (this.maskForEqualHashComp != o.maskForEqualHashComp)
return false;
for (int i = maskForEqualHashComp.nextSetBit(0); i >= 0; i = maskForEqualHashComp.nextSetBit(i + 1)) {
- if (this.cols[i].equals(o.cols[i]) == false)
+ if (this.cols[i].equals(o.cols[i]) == false) {
return false;
+ }
}
return true;
}
@@ -161,7 +170,7 @@ public class GTRecord implements Comparable<GTRecord> {
@Override
public String toString() {
- return toString(info.colAll);
+ return toString(maskForEqualHashComp);
}
public String toString(BitSet selectedColumns) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
new file mode 100644
index 0000000..2dac8fe
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.storage.gridtable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public final class MemoryChecker {
+
+ private static Logger logger = LoggerFactory.getLogger(MemoryChecker.class);
+
+ private static final int MEMORY_THRESHOLD = 80 << 20;
+
+ private MemoryChecker() {
+ }
+
+ public static final void checkMemory() {
+ if (!Thread.currentThread().isInterrupted()) {
+ final long freeMem = Runtime.getRuntime().freeMemory();
+ if (freeMem <= MEMORY_THRESHOLD) {
+ throw new OutOfMemoryError("free memory:" + freeMem + " is lower than " + MEMORY_THRESHOLD);
+ }
+ } else {
+ logger.info("thread interrupted");
+ throw new OutOfMemoryError("thread interrupted");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index a4d0b8d..c97fe39 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -11,6 +11,7 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
+import org.apache.kylin.storage.gridtable.*;
public class GTSimpleMemStore implements IGTStore {
@@ -73,6 +74,7 @@ public class GTSimpleMemStore implements IGTStore {
} else {
assert id == rowBlockList.size();
rowBlockList.add(copy);
+ MemoryChecker.checkMemory();
}
}
}