You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/26 17:51:58 UTC
incubator-kylin git commit: KYLIN-770 optimize InMemCubeBuilder memory usage (reverted from commit 47c6ccd5c573ae791eccdf084217a1742eefa35b)
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 74572f3ee -> 9efb47369
KYLIN-770 optimize InMemCubeBuilder memory usage (reverted from commit 47c6ccd5c573ae791eccdf084217a1742eefa35b)
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9efb4736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9efb4736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9efb4736
Branch: refs/heads/0.8.0
Commit: 9efb473691d6150fa19dccd839813336bd71f566
Parents: 74572f3
Author: honma <ho...@ebay.com>
Authored: Tue May 26 23:56:41 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue May 26 23:56:41 2015 +0800
----------------------------------------------------------------------
.../job/hadoop/cubev2/InMemCubeBuilder.java | 142 +++++++-----------
pom.xml | 7 -
.../storage/gridtable/GTAggregateScanner.java | 150 +++----------------
.../kylin/storage/gridtable/GTRecord.java | 39 ++---
.../kylin/storage/gridtable/MemoryChecker.java | 29 ----
.../gridtable/memstore/GTSimpleMemStore.java | 2 -
6 files changed, 84 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9efb4736/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
index 56989a6..cff3474 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
@@ -56,20 +56,22 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CubeGridTable;
import org.apache.kylin.storage.gridtable.*;
+import org.apache.kylin.storage.util.SizeOfUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
/**
*/
public class InMemCubeBuilder implements Runnable {
+ //estimation of (size of aggregation cache) / (size of mem store)
+ private static final double AGGREGATION_CACHE_FACTOR = 3;
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
- private static final int DEFAULT_TIMEOUT = 25;
private BlockingQueue<List<String>> queue;
private CubeDesc desc = null;
@@ -84,6 +86,7 @@ public class InMemCubeBuilder implements Runnable {
private int[] hbaseMeasureRefIndex;
private MeasureDesc[] measureDescs;
private int measureCount;
+ private boolean hasDependentMeasure = false;
protected IGTRecordWriter gtRecordWriter;
@@ -145,6 +148,8 @@ public class InMemCubeBuilder implements Runnable {
}
}
+ this.hasDependentMeasure = dependentMeasures.size() > 0;
+
this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
}
@@ -156,7 +161,8 @@ public class InMemCubeBuilder implements Runnable {
return gridTable;
}
- private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+ private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, boolean inMem) throws IOException {
+ logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
BitSet parentDimensions = columnBitSets.getFirst();
BitSet measureColumns = columnBitSets.getSecond();
@@ -177,14 +183,14 @@ public class InMemCubeBuilder implements Runnable {
mask = mask >> 1;
}
- return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+ return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns, inMem);
}
- private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+ private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns, boolean inMem) throws IOException {
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
IGTScanner scanner = gridTable.scan(req);
- GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
+ GridTable newGridTable = newGridTableByCuboidID(cuboidId, inMem);
GTBuilder builder = newGridTable.rebuild();
BitSet allNeededColumns = new BitSet();
@@ -209,7 +215,7 @@ public class InMemCubeBuilder implements Runnable {
newRecord.set(index, record.get(i));
}
- if(dependentMeasures.size() > 0) {
+ if(hasDependentMeasure) {
// update measures which have 'dependent_measure_ref'
newRecord.getValues(dependentMetrics, hllObjects);
@@ -303,7 +309,7 @@ public class InMemCubeBuilder implements Runnable {
public void run() {
try {
logger.info("Create base cuboid " + baseCuboidId);
- final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
+ final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, true);
GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
@@ -386,7 +392,6 @@ public class InMemCubeBuilder implements Runnable {
createNDCuboidGT(tree, baseCuboidId, childId);
}
}
- outputGT(baseCuboidId, baseCuboidGT);
dropStore(baseCuboidGT);
} catch (IOException e) {
@@ -406,96 +411,55 @@ public class InMemCubeBuilder implements Runnable {
record.setValues(recordValues);
}
+ private long checkMemory(long threshold) {
+ final long freeMemory = Runtime.getRuntime().freeMemory();
+ logger.info("available memory:" + (freeMemory >> 10) + " KB, memory needed:" + (threshold >> 10) + " KB");
+ return freeMemory - threshold;
+ }
+
private boolean gc(TreeNode<GridTable> parentNode) {
+ final long parentCuboidMem = SizeOfUtil.deepSizeOf(parentNode.data.getStore());
+ long threshold = (long) (parentCuboidMem * (AGGREGATION_CACHE_FACTOR + 1));
final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
- logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
+ long memoryLeft = checkMemory(threshold);
for (TreeNode<GridTable> gridTable : gridTables) {
- final GTComboStore store = (GTComboStore) gridTable.data.getStore();
- if (store.memoryUsage() > 0) {
- logger.info("cuboid id:" + gridTable.id + " flush to disk");
- long t = System.currentTimeMillis();
- store.switchToDiskStore();
- logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
- waitForGc();
+ if (memoryLeft >= 0) {
return true;
- }
- }
- logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
- return false;
-
- }
-
- private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
- final ExecutorService executorService = Executors.newSingleThreadExecutor();
- final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
- @Override
- public GridTable call() throws Exception {
- return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
- }
- });
- try {
- final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
- return gridTable;
- } catch (InterruptedException e) {
- throw new RuntimeException("this should not happen", e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof OutOfMemoryError) {
- logger.warn("Future.get() OutOfMemory, stop the thread");
} else {
- throw new RuntimeException("this should not happen", e);
+ logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables));
+ final GTComboStore store = (GTComboStore) gridTable.data.getStore();
+ if (store.memoryUsage() > 0) {
+ final long storeSize = SizeOfUtil.deepSizeOf(store);
+ memoryLeft += storeSize;
+ logger.info("cuboid id:" + gridTable.id + " selected, memory used:" + (storeSize >> 10) + " KB");
+ long t = System.currentTimeMillis();
+ ((GTComboStore) store).switchToDiskStore();
+ logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+ }
}
- } catch (TimeoutException e) {
- logger.warn("Future.get() timeout, stop the thread");
}
- logger.info("shutdown executor service");
- final List<Runnable> runnables = executorService.shutdownNow();
- try {
- executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
- waitForGc();
- } catch (InterruptedException e) {
- throw new RuntimeException("this should not happen", e);
+ if (memoryLeft >= 0) {
+ return true;
+ } else {
+ logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk, memory is still insufficient, usually due to jvm gc not finished, forced to use memory store");
+ return true;
}
- return null;
-
- }
- private void waitForGc() {
- System.gc();
- logger.info("wait 5 seconds for gc");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException("should not happen", e);
- }
}
private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
long startTime = System.currentTimeMillis();
-// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
-// if (parentStore.memoryUsage() <= 0) {
-// long t = System.currentTimeMillis();
-// parentStore.switchToMemStore();
-// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
-// }
-
- GridTable currentCuboid;
- while (true) {
- logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
- currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
- if (currentCuboid != null) {
- break;
- } else {
- logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
- if (gc(parentNode)) {
- continue;
- } else {
- logger.warn("all parent node has been flushed into disk, memory is still insufficient");
- throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
- }
- }
+ GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+ if (parentStore.memoryUsage() <= 0) {
+ long t = System.currentTimeMillis();
+ parentStore.switchToMemStore();
+ logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
}
+
+ boolean inMem = gc(parentNode);
+ GridTable currentCuboid = aggregateCuboid(parentNode.data, parentCuboidId, cuboidId, inMem);
SimpleGridTableTree node = new SimpleGridTableTree();
node.parent = parentNode;
node.data = currentCuboid;
@@ -512,15 +476,13 @@ public class InMemCubeBuilder implements Runnable {
}
}
-
+ startTime = System.currentTimeMillis();
//output the grid table
outputGT(cuboidId, currentCuboid);
dropStore(currentCuboid);
parentNode.children.remove(node);
- if (parentNode.children.size() > 0) {
- logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
- ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
- }
+ logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+
}
private void dropStore(GridTable gt) throws IOException {
@@ -529,13 +491,11 @@ public class InMemCubeBuilder implements Runnable {
private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
- long startTime = System.currentTimeMillis();
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
IGTScanner scanner = gridTable.scan(req);
for (GTRecord record : scanner) {
this.gtRecordWriter.write(cuboidId, record);
}
- logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
private static class TreeNode<T> {
@@ -546,7 +506,7 @@ public class InMemCubeBuilder implements Runnable {
List<TreeNode<T>> getAncestorList() {
ArrayList<TreeNode<T>> result = Lists.newArrayList();
- TreeNode<T> parent = this;
+ TreeNode<T> parent = this.parent;
while (parent != null) {
result.add(parent);
parent = parent.parent;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9efb4736/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3f24301..e1e13fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,8 +68,6 @@
<commons-configuration.version>1.9</commons-configuration.version>
<commons-daemon.version>1.0.15</commons-daemon.version>
<commons-httpclient.version>3.1</commons-httpclient.version>
- <commons-collections4.version>4.0</commons-collections4.version>
-
<!-- Utility -->
<log4j.version>1.2.17</log4j.version>
@@ -342,11 +340,6 @@
<version>${commons-httpclient.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- <version>${commons-collections4.version}</version>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9efb4736/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index f3de4f6..14a3efa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -1,23 +1,18 @@
package org.apache.kylin.storage.gridtable;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+
+import com.google.common.collect.Maps;
+
public class GTAggregateScanner implements IGTScanner {
- private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
final GTInfo info;
final BitSet dimensions; // dimensions to return, can be more than group by
final BitSet groupBy;
@@ -25,7 +20,6 @@ public class GTAggregateScanner implements IGTScanner {
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
-
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
if (req.hasAggregation() == false)
throw new IllegalStateException();
@@ -61,119 +55,13 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
- AggregationCacheWithBytesKey aggregationCacheWithBytesKey = new AggregationCacheWithBytesKey();
+ AggregationCache aggrCache = new AggregationCache();
for (GTRecord r : inputScanner) {
- aggregationCacheWithBytesKey.aggregate(r);
- MemoryChecker.checkMemory();
+ aggrCache.aggregate(r);
}
- return aggregationCacheWithBytesKey.iterator();
+ return aggrCache.iterator();
}
- class AggregationCacheWithBytesKey {
- final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
-
- public AggregationCacheWithBytesKey() {
- aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
- @Override
- public int compare(byte[] o1, byte[] o2) {
- int result = 0;
- Preconditions.checkArgument(o1.length == o2.length);
- final int length = o1.length;
- for (int i = 0; i < length; ++i) {
- result = o1[i] - o2[i];
- if (result == 0) {
- continue;
- } else {
- return result;
- }
- }
- return result;
- }
- });
- }
-
- private byte[] createKey(GTRecord record) {
- byte[] result = new byte[info.getMaxColumnLength(groupBy)];
- int offset = 0;
- for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
- final ByteArray byteArray = record.cols[i];
- final int columnLength = info.codeSystem.maxCodeLength(i);
- System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, columnLength);
- offset += columnLength;
- }
- assert offset == result.length;
- return result;
- }
-
- void aggregate(GTRecord r) {
- final byte[] key = createKey(r);
- MeasureAggregator[] aggrs = aggBufMap.get(key);
- if (aggrs == null) {
- aggrs = new MeasureAggregator[metricsAggrFuncs.length];
- for (int i = 0, col = -1; i < aggrs.length; i++) {
- col = metrics.nextSetBit(col + 1);
- aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
- }
- aggBufMap.put(key, aggrs);
- }
- for (int i = 0, col = -1; i < aggrs.length; i++) {
- col = metrics.nextSetBit(col + 1);
- Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
- aggrs[i].aggregate(metrics);
- }
- }
-
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-
- final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- final GTRecord secondRecord;
-
- {
- BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
- dimensionsAndMetrics.or(metrics);
- secondRecord = new GTRecord(info, dimensionsAndMetrics);
- }
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public GTRecord next() {
- Entry<byte[], MeasureAggregator[]> entry = it.next();
- create(entry.getKey(), entry.getValue());
- return secondRecord;
- }
-
- private void create(byte[] key, MeasureAggregator[] value) {
- int offset = 0;
- for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
- final int columnLength = info.codeSystem.maxCodeLength(i);
- secondRecord.set(i, new ByteArray(key, offset, columnLength));
- offset += columnLength;
- }
- metricsBuf.clear();
- for (int i = 0, col = -1; i < value.length; i++) {
- col = metrics.nextSetBit(col + 1);
- int pos = metricsBuf.position();
- info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
- secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }
-
- /*
@SuppressWarnings({ "rawtypes", "unchecked" })
class AggregationCache {
final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
@@ -204,16 +92,9 @@ public class GTAggregateScanner implements IGTScanner {
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
- final Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-
- final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- final GTRecord oneRecord; // avoid instance creation
-
- {
- BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
- dimensionsAndMetrics.or(metrics);
- oneRecord = new GTRecord(info, dimensionsAndMetrics);
- }
+ Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+ ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+ GTRecord oneRecord = new GTRecord(info); // avoid instance creation
@Override
public boolean hasNext() {
@@ -223,7 +104,7 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public GTRecord next() {
Entry<GTRecord, MeasureAggregator[]> entry = it.next();
-
+
GTRecord dims = entry.getKey();
for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
oneRecord.cols[i].set(dims.cols[i]);
@@ -237,6 +118,7 @@ public class GTAggregateScanner implements IGTScanner {
info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
}
+
return oneRecord;
}
@@ -247,6 +129,12 @@ public class GTAggregateScanner implements IGTScanner {
};
}
+ public long getSize() {
+ return aggBufMap.size();
+ }
+
+ // ============================================================================
+
transient int rowMemBytes;
static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
@@ -270,6 +158,4 @@ public class GTAggregateScanner implements IGTScanner {
}
}
- */
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9efb4736/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
index 0e5c1b7..3d0f9bb 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
@@ -1,12 +1,12 @@
package org.apache.kylin.storage.gridtable;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.metadata.filter.IFilterCodeSystem;
-
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+
public class GTRecord implements Comparable<GTRecord> {
final GTInfo info;
@@ -14,19 +14,12 @@ public class GTRecord implements Comparable<GTRecord> {
private BitSet maskForEqualHashComp;
- public GTRecord(GTInfo info, BitSet maskForEqualHashComp) {
+ public GTRecord(GTInfo info) {
this.info = info;
this.cols = new ByteArray[info.getColumnCount()];
- for (int i = 0; i < cols.length; i++) {
- if (maskForEqualHashComp.get(i)) {
- this.cols[i] = new ByteArray();
- }
- }
- this.maskForEqualHashComp = maskForEqualHashComp;
- }
-
- public GTRecord(GTInfo info) {
- this(info, info.colAll);
+ for (int i = 0; i < cols.length; i++)
+ this.cols[i] = new ByteArray();
+ this.maskForEqualHashComp = info.colAll;
}
public GTInfo getInfo() {
@@ -69,11 +62,10 @@ public class GTRecord implements Comparable<GTRecord> {
public Object[] getValues(BitSet selectedColumns, Object[] result) {
assert selectedColumns.cardinality() <= result.length;
for (int i = 0, c = selectedColumns.nextSetBit(0); c >= 0; i++, c = selectedColumns.nextSetBit(c + 1)) {
- if (cols[c] == null || cols[c].array() == null) {
+ if (cols[c].array() == null)
result[i] = null;
- } else {
+ else
result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
}
return result;
}
@@ -82,11 +74,10 @@ public class GTRecord implements Comparable<GTRecord> {
assert selectedColumns.length <= result.length;
for (int i = 0; i < selectedColumns.length; i++) {
int c = selectedColumns[i];
- if (cols[c].array() == null) {
+ if (cols[c].array() == null)
result[i] = null;
- } else {
+ else
result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
}
return result;
}
@@ -103,7 +94,8 @@ public class GTRecord implements Comparable<GTRecord> {
byte[] space = new byte[len];
- GTRecord copy = new GTRecord(info, this.maskForEqualHashComp);
+ GTRecord copy = new GTRecord(info);
+ copy.maskForEqualHashComp = this.maskForEqualHashComp;
int pos = 0;
for (int i = selectedCols.nextSetBit(0); i >= 0; i = selectedCols.nextSetBit(i + 1)) {
System.arraycopy(cols[i].array(), cols[i].offset(), space, pos, cols[i].length());
@@ -137,9 +129,8 @@ public class GTRecord implements Comparable<GTRecord> {
if (this.maskForEqualHashComp != o.maskForEqualHashComp)
return false;
for (int i = maskForEqualHashComp.nextSetBit(0); i >= 0; i = maskForEqualHashComp.nextSetBit(i + 1)) {
- if (this.cols[i].equals(o.cols[i]) == false) {
+ if (this.cols[i].equals(o.cols[i]) == false)
return false;
- }
}
return true;
}
@@ -170,7 +161,7 @@ public class GTRecord implements Comparable<GTRecord> {
@Override
public String toString() {
- return toString(maskForEqualHashComp);
+ return toString(info.colAll);
}
public String toString(BitSet selectedColumns) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9efb4736/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
deleted file mode 100644
index 2dac8fe..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public final class MemoryChecker {
-
- private static Logger logger = LoggerFactory.getLogger(MemoryChecker.class);
-
- private static final int MEMORY_THRESHOLD = 80 << 20;
-
- private MemoryChecker() {
- }
-
- public static final void checkMemory() {
- if (!Thread.currentThread().isInterrupted()) {
- final long freeMem = Runtime.getRuntime().freeMemory();
- if (freeMem <= MEMORY_THRESHOLD) {
- throw new OutOfMemoryError("free memory:" + freeMem + " is lower than " + MEMORY_THRESHOLD);
- }
- } else {
- logger.info("thread interrupted");
- throw new OutOfMemoryError("thread interrupted");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9efb4736/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index c97fe39..a4d0b8d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -11,7 +11,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.gridtable.*;
public class GTSimpleMemStore implements IGTStore {
@@ -74,7 +73,6 @@ public class GTSimpleMemStore implements IGTStore {
} else {
assert id == rowBlockList.size();
rowBlockList.add(copy);
- MemoryChecker.checkMemory();
}
}
}