You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:43 UTC
[31/50] [abbrv] incubator-kylin git commit: KYLIN-728
KYLIN-728
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0d87e8f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0d87e8f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0d87e8f6
Branch: refs/heads/streaming-localdict
Commit: 0d87e8f689e42d5a21330815eca6b2f83afb3f02
Parents: 876ac60
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue May 12 13:59:54 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue May 12 13:59:54 2015 +0800
----------------------------------------------------------------------
.../kylin/common/util/BytesSerializer.java | 6 +-
.../org/apache/kylin/common/util/SSHClient.java | 5 +-
.../kylin/cube/cuboid/CuboidScheduler.java | 14 +-
.../apache/kylin/dict/DictionarySerializer.java | 54 +++++
.../invertedindex/model/IIKeyValueCodec.java | 31 +--
.../job/hadoop/cubev2/InMemCubeBuilder.java | 197 +++++++++++++------
.../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 2 +-
.../hadoop/cubev2/MapContextGTRecordWriter.java | 2 +-
.../metadata/serializer/DataTypeSerializer.java | 32 +--
.../kylin/storage/cube/CubeCodeSystem.java | 1 +
.../storage/cube/CubeHBaseReadonlyStore.java | 14 +-
.../kylin/storage/gridtable/GTBuilder.java | 1 -
.../kylin/storage/gridtable/GTComboStore.java | 112 +++++++++++
.../apache/kylin/storage/gridtable/GTInfo.java | 10 +-
.../storage/gridtable/GTInvertedIndex.java | 4 +-
.../kylin/storage/gridtable/GTRawScanner.java | 24 +--
.../kylin/storage/gridtable/GTRecord.java | 4 +-
.../kylin/storage/gridtable/GTRowBlock.java | 19 +-
.../storage/gridtable/GTSampleCodeSystem.java | 5 +-
.../kylin/storage/gridtable/IGTScanner.java | 6 +-
.../kylin/storage/gridtable/IGTStore.java | 18 +-
.../apache/kylin/storage/gridtable/ScanKey.java | 34 ++++
.../storage/gridtable/diskstore/FileSystem.java | 22 +++
.../gridtable/diskstore/GTDiskStore.java | 160 +++++++++++++++
.../gridtable/diskstore/HadoopFileSystem.java | 88 +++++++++
.../gridtable/diskstore/LocalFileSystem.java | 60 ++++++
.../gridtable/memstore/GTSimpleMemStore.java | 49 +++--
.../apache/kylin/storage/util/SizeOfUtil.java | 21 ++
.../invertedindex/IIStreamBuilder.java | 6 +-
29 files changed, 803 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java b/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
index 1d1f5ae..26342f5 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
@@ -26,10 +26,10 @@ import java.nio.ByteBuffer;
*/
public interface BytesSerializer<T> {
- public static final int SERIALIZE_BUFFER_SIZE = 65536;
+ int SERIALIZE_BUFFER_SIZE = 65536;
- abstract public void serialize(T value, ByteBuffer out);
+ void serialize(T value, ByteBuffer out);
- abstract public T deserialize(ByteBuffer in);
+ T deserialize(ByteBuffer in);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/SSHClient.java b/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
index ccaabf0..32eb72a 100644
--- a/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
+++ b/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
@@ -49,6 +49,7 @@ public class SSHClient {
public SSHClient(String hostname, int port, String username, String password) {
this.hostname = hostname;
this.username = username;
+ this.port = port;
if (password != null && new File(password).exists()) {
this.identityPath = new File(password).getAbsolutePath();
this.password = null;
@@ -324,7 +325,7 @@ public class SSHClient {
throw ee;
}
if (timeout < 0)
- throw new Exception("Remote commmand not finished within " + timeoutSeconds + " seconds.");
+ throw new Exception("Remote command not finished within " + timeoutSeconds + " seconds.");
}
channel.disconnect();
session.disconnect();
@@ -340,7 +341,7 @@ public class SSHClient {
jsch.addIdentity(identityPath);
}
- Session session = jsch.getSession(username, hostname, 22);
+ Session session = jsch.getSession(username, hostname, port);
if (password != null) {
session.setPassword(password);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 6f64116..07be092 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -22,11 +22,7 @@ package org.apache.kylin.cube.cuboid;
* @author George Song (ysong1)
*
*/
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.cube.model.CubeDesc;
@@ -38,21 +34,21 @@ public class CuboidScheduler {
private final CubeDesc cubeDef;
private final int size;
private final long max;
- private final Map<Long, Collection<Long>> cache;
+ private final Map<Long, List<Long>> cache;
public CuboidScheduler(CubeDesc cube) {
this.cubeDef = cube;
this.size = cube.getRowkey().getRowKeyColumns().length;
this.max = (long) Math.pow(2, size) - 1;
- this.cache = new ConcurrentHashMap<Long, Collection<Long>>();
+ this.cache = new ConcurrentHashMap<Long, List<Long>>();
}
- public Collection<Long> getSpanningCuboid(long cuboid) {
+ public List<Long> getSpanningCuboid(long cuboid) {
if (cuboid > max || cuboid < 0) {
throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max);
}
- Collection<Long> result = cache.get(cuboid);
+ List<Long> result = cache.get(cuboid);
if (result != null) {
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
new file mode 100644
index 0000000..4b61591
--- /dev/null
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.ClassUtil;
+
+import java.io.*;
+
+/**
+ * Created by qianzhou on 5/5/15.
+ */
+public final class DictionarySerializer {
+
+ private DictionarySerializer() {}
+
+ public static Dictionary<?> deserialize(InputStream inputStream) {
+ try {
+ final DataInputStream dataInputStream = new DataInputStream(inputStream);
+ final String type = dataInputStream.readUTF();
+ final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance();
+ dictionary.readFields(dataInputStream);
+ return dictionary;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Dictionary<?> deserialize(ImmutableBytesWritable dictBytes) {
+ return deserialize(new ByteArrayInputStream(dictBytes.get(), dictBytes.getOffset(), dictBytes.getLength()));
+ }
+
+ public static void serialize(Dictionary<?> dict, OutputStream outputStream) {
+ try {
+ DataOutputStream out = new DataOutputStream(outputStream);
+ out.writeUTF(dict.getClass().getName());
+ dict.write(out);
+ out.flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static ImmutableBytesWritable serialize(Dictionary<?> dict) {
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ out.writeUTF(dict.getClass().getName());
+ dict.write(out);
+ return new ImmutableBytesWritable(baos.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 8dbaed7..b236879 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionarySerializer;
import org.apache.kylin.invertedindex.index.ColumnValueContainer;
import org.apache.kylin.invertedindex.index.CompressedValueContainer;
import org.apache.kylin.invertedindex.index.Slice;
@@ -32,7 +32,6 @@ import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
import org.apache.kylin.metadata.model.DataType;
-import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -73,31 +72,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
if (dictionary == null) {
return new IIRow(key, value, new ImmutableBytesWritable(BytesUtil.EMPTY_BYTE_ARRAY));
} else {
- return new IIRow(key, value, serialize(dictionary));
- }
- }
-
- private static Dictionary<?> deserialize(ImmutableBytesWritable dictBytes) {
- try {
- final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(dictBytes.get(), dictBytes.getOffset(), dictBytes.getLength()));
- final String type = dataInputStream.readUTF();
- final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance();
- dictionary.readFields(dataInputStream);
- return dictionary;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private static ImmutableBytesWritable serialize(Dictionary<?> dict) {
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
- out.writeUTF(dict.getClass().getName());
- dict.write(out);
- return new ImmutableBytesWritable(baos.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
+ return new IIRow(key, value, DictionarySerializer.serialize(dictionary));
}
}
@@ -221,7 +196,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
} else {
final ImmutableBytesWritable dictBytes = row.getDictionary();
if (dictBytes.getLength() != 0) {
- final Dictionary<?> dictionary = deserialize(dictBytes);
+ final Dictionary<?> dictionary = DictionarySerializer.deserialize(dictBytes);
CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), dictionary.getMaxId() - dictionary.getMinId() + 1, 0);
c.fromBytes(row.getValue());
valueContainers[curCol] = c;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
index 29cdc9a..87ad2d3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Maps;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
@@ -51,10 +52,9 @@ import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.serializer.DataTypeSerializer;
import org.apache.kylin.storage.cube.CubeGridTable;
import org.apache.kylin.storage.gridtable.*;
-import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.storage.util.SizeOfUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +69,7 @@ import java.util.concurrent.BlockingQueue;
@SuppressWarnings("rawtypes")
public class InMemCubeBuilder implements Runnable {
+ private static final double AGGREGATION_CACHE_FACTOR = 3;
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
private BlockingQueue<List<String>> queue;
@@ -78,14 +79,11 @@ public class InMemCubeBuilder implements Runnable {
private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
private CubeJoinedFlatTableDesc intermediateTableDesc;
private MeasureCodec measureCodec;
- private int measureNumber;
private String[] metricsAggrFuncs = null;
private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
public static final LongWritable ONE = new LongWritable(1l);
protected IGTRecordWriter gtRecordWriter;
- private GridTable baseCuboidGT;
- private DataTypeSerializer[] serializers;
/**
@@ -95,30 +93,31 @@ public class InMemCubeBuilder implements Runnable {
* @param gtRecordWriter
*/
public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
+ if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
this.queue = queue;
this.desc = cube.getDescriptor();
this.cuboidScheduler = new CuboidScheduler(desc);
this.dictionaryMap = dictionaryMap;
this.gtRecordWriter = gtRecordWriter;
- baseCuboidId = Cuboid.getBaseCuboidId(desc);
-
- intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- measureCodec = new MeasureCodec(desc.getMeasures());
- measureNumber = desc.getMeasures().size();
-
- dependentMeasures = Maps.newHashMap();
+ this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
+ this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+ this.measureCodec = new MeasureCodec(desc.getMeasures());
- Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+ Map<String, Integer> measureIndexMap = Maps.newHashMap();
List<String> metricsAggrFuncsList = Lists.newArrayList();
- for (int i = 0, n = desc.getMeasures().size(); i < n; i++) {
+ final int measureCount = desc.getMeasures().size();
+ for (int i = 0; i < measureCount; i++) {
MeasureDesc measureDesc = desc.getMeasures().get(i);
metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
measureIndexMap.put(desc.getMeasures().get(i).getName(), i);
}
- metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+ this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
- for (int i = 0; i < measureNumber; i++) {
+ this.dependentMeasures = Maps.newHashMap();
+ for (int i = 0; i < measureCount; i++) {
String depMsrRef = desc.getMeasures().get(i).getDependentMeasureRef();
if (depMsrRef != null) {
int index = measureIndexMap.get(depMsrRef);
@@ -126,21 +125,19 @@ public class InMemCubeBuilder implements Runnable {
}
}
- if (dictionaryMap == null || dictionaryMap.isEmpty())
- throw new IllegalArgumentException();
}
- private GridTable newGridTableByCuboidID(long cuboidID) {
+ private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
- GTSimpleMemStore store = new GTSimpleMemStore(info);
+ GTComboStore store = new GTComboStore(info, memStore);
GridTable gridTable = new GridTable(info, store);
return gridTable;
}
- private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+ private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, boolean inMem) throws IOException {
logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
- Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumBitSet(parentCuboidId);
+ Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
BitSet parentDimensions = columnBitSets.getFirst();
BitSet measureColumns = columnBitSets.getSecond();
BitSet childDimensions = (BitSet) parentDimensions.clone();
@@ -160,14 +157,14 @@ public class InMemCubeBuilder implements Runnable {
mask = mask >> 1;
}
- return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+ return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns, inMem);
}
- private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+ private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns, boolean inMem) throws IOException {
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
IGTScanner scanner = gridTable.scan(req);
- GridTable newGridTable = newGridTableByCuboidID(cuboidId);
+ GridTable newGridTable = newGridTableByCuboidID(cuboidId, inMem);
GTBuilder builder = newGridTable.rebuild();
BitSet allNeededColumns = new BitSet();
@@ -181,7 +178,7 @@ public class InMemCubeBuilder implements Runnable {
try {
BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
for (Integer i : dependentMeasures.keySet()) {
- dependentMetrics.set((allNeededColumns.cardinality() - measureNumber + dependentMeasures.get(i)));
+ dependentMetrics.set((allNeededColumns.cardinality() - desc.getMeasures().size() + dependentMeasures.get(i)));
}
Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
@@ -197,13 +194,13 @@ public class InMemCubeBuilder implements Runnable {
for (Integer i : dependentMeasures.keySet()) {
for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
- if (c == allNeededColumns.cardinality() - measureNumber + dependentMeasures.get(i)) {
+ if (c == allNeededColumns.cardinality() - desc.getMeasures().size() + dependentMeasures.get(i)) {
assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
byteBuffer.clear();
BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
- newRecord.set(allNeededColumns.cardinality() - measureNumber + i, byteArray);
+ newRecord.set(allNeededColumns.cardinality() - desc.getMeasures().size() + i, byteArray);
}
}
@@ -219,16 +216,16 @@ public class InMemCubeBuilder implements Runnable {
return newGridTable;
}
- private Pair<BitSet, BitSet> getDimensionAndMetricColumBitSet(long cuboidId) {
+ private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
BitSet dimension = new BitSet();
dimension.set(0, bitSet.cardinality());
BitSet metrics = new BitSet();
- metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureNumber);
+ metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.desc.getMeasures().size());
return new Pair<BitSet, BitSet>(dimension, metrics);
}
- private Object[] buildKey(List<String> row, DataTypeSerializer[] serializers) {
+ private Object[] buildKey(List<String> row) {
int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
Object[] key = new Object[keySize];
@@ -280,7 +277,8 @@ public class InMemCubeBuilder implements Runnable {
@Override
public void run() {
try {
- createBaseCuboidGT();
+ logger.info("Create base cuboid " + baseCuboidId);
+ final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, true);
GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
@@ -339,7 +337,7 @@ public class InMemCubeBuilder implements Runnable {
}
};
- Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumBitSet(baseCuboidId);
+ Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
@@ -352,8 +350,18 @@ public class InMemCubeBuilder implements Runnable {
aggregationScanner.close();
logger.info("Base cuboid has " + counter + " rows;");
- if (counter > 0)
- createNDCuboidGT(null, -1l, baseCuboidId);
+ SimpleGridTableTree tree = new SimpleGridTableTree();
+ tree.data = baseCuboidGT;
+ tree.id = baseCuboidId;
+ tree.parent = null;
+ if (counter > 0) {
+ List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
+ Collections.sort(children);
+ for (Long childId : children) {
+ createNDCuboidGT(tree, baseCuboidId, childId);
+ }
+ }
+ baseCuboidGT.getStore().drop();
} catch (IOException e) {
logger.error("Fail to build cube", e);
@@ -364,7 +372,7 @@ public class InMemCubeBuilder implements Runnable {
private void buildGTRecord(List<String> row, GTRecord record) {
- Object[] dimensions = buildKey(row, serializers);
+ Object[] dimensions = buildKey(row);
Object[] metricsValues = buildValue(row);
Object[] recordValues = new Object[dimensions.length + metricsValues.length];
System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
@@ -372,43 +380,84 @@ public class InMemCubeBuilder implements Runnable {
record.setValues(recordValues);
}
- private void createBaseCuboidGT() throws IOException {
-
- logger.info("Create base cuboid " + baseCuboidId);
- Cuboid baseCuboid = Cuboid.findById(this.desc, baseCuboidId);
- serializers = new DataTypeSerializer[baseCuboid.getColumns().size()];
-
- for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
- serializers[i] = DataTypeSerializer.create(baseCuboid.getColumns().get(i).getType());
+ private boolean checkMemory(long threshold) {
+ final long freeMemory = Runtime.getRuntime().freeMemory();
+ logger.info("available memory:" + (freeMemory>>10) + " KB");
+ if (freeMemory >= threshold) {
+ logger.info("no need to flush to disk");
+ return true;
+ } else {
+ return false;
}
-
- this.baseCuboidGT = newGridTableByCuboidID(baseCuboidId);
}
+ private boolean gc(TreeNode<GridTable> parentNode) {
+ final long parentCuboidMem = SizeOfUtil.deepSizeOf(parentNode.data.getStore());
+ long threshold = (long) (parentCuboidMem * (AGGREGATION_CACHE_FACTOR + 1));
+ logger.info((threshold >> 10) + " KB is needed to create " + parentNode.id + "'s child");
+ if (checkMemory(threshold)) {
+ return true;
+ }
+ final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
+ for (TreeNode<GridTable> gridTable : gridTables) {
+ logger.info("wait 10 seconds for gc");
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ logger.error("this should not happen", e);
+ }
+ if (checkMemory(threshold)) {
+ return true;
+ } else {
+ logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables));
+ final IGTStore store = gridTable.data.getStore();
+ assert store instanceof GTComboStore;
+ if (store.memoryUsage() > 0) {
+ logger.info("cuboid id:" + gridTable.id + " selected, memory used:" + (SizeOfUtil.deepSizeOf(store)>>10) + " KB");
+ long t = System.currentTimeMillis();
+ ((GTComboStore) store).switchToDiskStore();
+ logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+ }
+ }
+ }
+ logger.info("no store has been flushed to disk");
+ return true;
+ }
- private void createNDCuboidGT(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+ private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
- GridTable thisCuboid;
long startTime = System.currentTimeMillis();
- if (parentCuboidId < 0) {
- thisCuboid = this.baseCuboidGT;
- } else {
- thisCuboid = aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+ assert parentNode.data.getStore() instanceof GTComboStore;
+ if (parentNode.data.getStore().memoryUsage() <= 0) {
+ long t = System.currentTimeMillis();
+ ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+ logger.info("switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
}
- logger.info("Cuboid " + cuboidId + " build takes (second): " + (System.currentTimeMillis() - startTime) / 1000);
-
- ArrayList<Long> children = (ArrayList<Long>) cuboidScheduler.getSpanningCuboid(cuboidId);
- Collections.sort(children); // sort cuboids
- for (Long childId : children) {
- createNDCuboidGT(thisCuboid, cuboidId, childId);
+ boolean inMem = gc(parentNode);
+ GridTable currentCuboid = aggregateCuboid(parentNode.data, parentCuboidId, cuboidId, inMem);
+ SimpleGridTableTree node = new SimpleGridTableTree();
+ node.parent = parentNode;
+ node.data = currentCuboid;
+ node.id = cuboidId;
+ parentNode.children.add(node);
+
+ logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+ List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+ if (!children.isEmpty()) {
+ Collections.sort(children); // sort cuboids
+ for (Long childId : children) {
+ createNDCuboidGT(node, cuboidId, childId);
+ }
}
-
startTime = System.currentTimeMillis();
//output the grid table
- outputGT(cuboidId, thisCuboid);
- logger.info("Cuboid" + cuboidId + " output takes (second) " + (System.currentTimeMillis() - startTime) / 1000);
+ outputGT(cuboidId, currentCuboid);
+ currentCuboid.getStore().drop();
+ parentNode.children.remove(node);
+ logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
@@ -419,4 +468,30 @@ public class InMemCubeBuilder implements Runnable {
this.gtRecordWriter.write(cuboidId, record);
}
}
+
+ private static class TreeNode<T> {
+ T data;
+ long id;
+ TreeNode<T> parent;
+ List<TreeNode<T>> children = Lists.newArrayList();
+
+ List<TreeNode<T>> getAncestorList() {
+ ArrayList<TreeNode<T>> result = Lists.newArrayList();
+ TreeNode<T> parent = this.parent;
+ while (parent != null) {
+ result.add(parent);
+ parent = parent.parent;
+ }
+ return Lists.reverse(result);
+ }
+
+ @Override
+ public String toString() {
+ return id + "";
+ }
+ }
+
+ private static class SimpleGridTableTree extends TreeNode<GridTable> {}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index fc165ea..db690b9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -104,7 +104,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- long timeout = 1000*60*60l; // 1 hour
+ long timeout = 1000*60*60L; // 1 hour
job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
Configuration conf = HBaseConfiguration.create(getConf());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 41237d7..3ba80d1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -72,7 +72,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
try {
mapContext.write(outputKey, outputValue);
} catch (InterruptedException e) {
- throw new IOException(e);
+ throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
index ac6f409..739cde4 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
@@ -20,8 +20,11 @@ package org.apache.kylin.metadata.serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.metadata.model.DataType;
@@ -31,20 +34,23 @@ import org.apache.kylin.metadata.model.DataType;
*/
abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
- final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>();
+ final static Map<String, Class<?>> implementations;
static {
- implementations.put("varchar", StringSerializer.class);
- implementations.put("decimal", BigDecimalSerializer.class);
- implementations.put("double", DoubleSerializer.class);
- implementations.put("float", DoubleSerializer.class);
- implementations.put("bigint", LongSerializer.class);
- implementations.put("long", LongSerializer.class);
- implementations.put("integer", LongSerializer.class);
- implementations.put("int", LongSerializer.class);
- implementations.put("smallint", LongSerializer.class);
- implementations.put("date", DateTimeSerializer.class);
- implementations.put("datetime", DateTimeSerializer.class);
- implementations.put("timestamp", DateTimeSerializer.class);
+ HashMap<String, Class<?>> impl = Maps.newHashMap();
+ impl.put("varchar", StringSerializer.class);
+ impl.put("decimal", BigDecimalSerializer.class);
+ impl.put("double", DoubleSerializer.class);
+ impl.put("float", DoubleSerializer.class);
+ impl.put("bigint", LongSerializer.class);
+ impl.put("long", LongSerializer.class);
+ impl.put("integer", LongSerializer.class);
+ impl.put("int", LongSerializer.class);
+ impl.put("smallint", LongSerializer.class);
+ impl.put("date", DateTimeSerializer.class);
+ impl.put("datetime", DateTimeSerializer.class);
+ impl.put("timestamp", DateTimeSerializer.class);
+ implementations = Collections.unmodifiableMap(impl);
+
}
public static DataTypeSerializer<?> create(String dataType) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 70567e5..9b7550f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -8,6 +8,7 @@ import java.util.Map;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dict.Dictionary;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index f721148..0784587 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -49,13 +49,8 @@ public class CubeHBaseReadonlyStore implements IGTStore {
}
@Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public String getStorageDescription() {
- return cubeSeg.toString();
+ public long memoryUsage() {
+ return 0;
}
@Override
@@ -127,6 +122,11 @@ public class CubeHBaseReadonlyStore implements IGTStore {
};
}
+ @Override
+ public void drop() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
private Scan buildScan(ByteArray pkStart, ByteArray pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
Scan scan = new Scan();
scan.setCaching(SCAN_CACHE);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
index 7195e7f..7552ab3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
@@ -51,7 +51,6 @@ public class GTBuilder implements Closeable, Flushable {
blockWriter.readyForFlush();
storeWriter.write(block);
writtenRowBlockCount++;
-
if (block.isFull()) {
blockWriter.clearForNext();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
new file mode 100644
index 0000000..c7d0c2b
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
@@ -0,0 +1,156 @@
+package org.apache.kylin.storage.gridtable;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.storage.gridtable.diskstore.GTDiskStore;
+import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+/**
+ * An {@link IGTStore} that keeps its row blocks in either a {@link GTSimpleMemStore} or a
+ * {@link GTDiskStore} and can copy the data from one to the other on request. Every
+ * {@link IGTStore} call is forwarded to whichever backing store is currently active.
+ *
+ * Created by qianzhou on 5/6/15.
+ */
+public class GTComboStore implements IGTStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTComboStore.class);
+
+    private final GTInfo gtInfo;
+
+    // Normally exactly one of these is non-null; both may be set during (or after a failed) switch.
+    private GTDiskStore gtDiskStore;
+    private GTSimpleMemStore gtSimpleMemStore;
+
+    /** Creates a combo store that starts out backed by a memory store. */
+    public GTComboStore(GTInfo gtInfo) {
+        this(gtInfo, true);
+    }
+
+    /**
+     * @param gtInfo      table description handed to the backing stores
+     * @param useMemStore true to start on a memory store, false to start on a disk store
+     */
+    public GTComboStore(GTInfo gtInfo, boolean useMemStore) {
+        this.gtInfo = gtInfo;
+        if (useMemStore) {
+            this.gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
+        } else {
+            this.gtDiskStore = new GTDiskStore(gtInfo);
+        }
+    }
+
+    /**
+     * Rebuilds output and copies every row block scanned from input into it.
+     * The scanner is closed even on failure so that a disk-backed input does not keep its
+     * file open. The writer is left open on purpose: GTDiskStore hands out one shared writer.
+     */
+    private void convert(IGTStore input, IGTStore output) throws IOException {
+        final IGTStoreScanner scanner = input.scan(ScanKey.makeScanKey(gtInfo, new GTRecord(gtInfo)), ScanKey.makeScanKey(gtInfo, new GTRecord(gtInfo)), null, null);
+        try {
+            final IGTStoreWriter writer = output.rebuild(-1);
+            while (scanner.hasNext()) {
+                writer.write(scanner.next());
+            }
+        } finally {
+            scanner.close();
+        }
+    }
+
+    /** Returns the active backing store; the memory store wins when both are set. */
+    private IGTStore getCurrent() {
+        if (gtSimpleMemStore != null) {
+            return gtSimpleMemStore;
+        } else {
+            return gtDiskStore;
+        }
+    }
+
+    /**
+     * Moves the data into a memory store and drops the disk store; only logs when already on memory.
+     *
+     * @throws RuntimeException wrapping the IOException if copying or dropping fails
+     */
+    public void switchToMemStore() {
+        try {
+            if (gtSimpleMemStore == null) {
+                // Publish the new store only after the copy succeeded, so a failed copy
+                // leaves the complete disk store as the current one.
+                final GTSimpleMemStore target = new GTSimpleMemStore(gtInfo);
+                convert(gtDiskStore, target);
+                gtSimpleMemStore = target;
+                gtDiskStore.drop();
+                gtDiskStore = null;
+            }
+        } catch (IOException e) {
+            logger.error("fail to switch to mem store", e);
+            throw new RuntimeException(e);
+        }
+        logger.info("switch to mem store");
+    }
+
+    /**
+     * Moves the data into a new disk store and drops the memory store; only logs when already on disk.
+     *
+     * @throws RuntimeException wrapping the IOException if copying or dropping fails
+     */
+    public void switchToDiskStore() {
+        try {
+            if (gtDiskStore == null) {
+                final GTDiskStore target = new GTDiskStore(gtInfo);
+                boolean copied = false;
+                try {
+                    convert(gtSimpleMemStore, target);
+                    copied = true;
+                } finally {
+                    if (!copied) {
+                        // do not leave a half-written temp file behind
+                        target.drop();
+                    }
+                }
+                gtDiskStore = target;
+                gtSimpleMemStore.drop();
+                gtSimpleMemStore = null;
+            }
+        } catch (IOException e) {
+            logger.error("fail to switch to disk store", e);
+            throw new RuntimeException(e);
+        }
+        logger.info("switch to disk store");
+    }
+
+    @Override
+    public long memoryUsage() {
+        return getCurrent().memoryUsage();
+    }
+
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return getCurrent().rebuild(shard);
+    }
+
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return getCurrent().append(shard, fillLast);
+    }
+
+    @Override
+    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return getCurrent().scan(pkStart, pkEnd, selectedColBlocks, additionalPushDown);
+    }
+
+    /** Drops whichever backing stores are currently set. */
+    @Override
+    public void drop() throws IOException {
+        if (gtSimpleMemStore != null) {
+            gtSimpleMemStore.drop();
+        }
+        if (gtDiskStore != null) {
+            gtDiskStore.drop();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index 954e464..fdabb60 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -17,7 +17,6 @@ public class GTInfo {
IGTCodeSystem codeSystem;
// column schema
- int nColumns;
DataType[] colTypes;
BitSet colAll;
BitSet colPreferIndex;
@@ -37,7 +36,7 @@ public class GTInfo {
}
public int getColumnCount() {
- return nColumns;
+ return colTypes.length;
}
public DataType getColumnType(int i) {
@@ -74,7 +73,7 @@ public class GTInfo {
public int getMaxColumnLength() {
int max = 0;
- for (int i = 0; i < nColumns; i++)
+ for (int i = 0; i < colTypes.length; i++)
max = Math.max(max, codeSystem.maxCodeLength(i));
return max;
}
@@ -95,7 +94,7 @@ public class GTInfo {
public TblColRef colRef(int i) {
if (colRefs == null) {
- colRefs = new TblColRef[nColumns];
+ colRefs = new TblColRef[colTypes.length];
}
if (colRefs[i] == null) {
colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
@@ -124,7 +123,7 @@ public class GTInfo {
private void validateColumnBlocks() {
colAll = new BitSet();
- colAll.flip(0, nColumns);
+ colAll.flip(0, colTypes.length);
if (colBlocks == null) {
colBlocks = new BitSet[2];
@@ -185,7 +184,6 @@ public class GTInfo {
/** required */
public Builder setColumns(DataType... colTypes) {
- info.nColumns = colTypes.length;
info.colTypes = colTypes;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
index 7c6abec..2756659 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
@@ -34,7 +34,7 @@ public class GTInvertedIndex {
this.colPreferIndex = info.colPreferIndex;
this.colBlocks = info.selectColumnBlocks(colPreferIndex);
- index = new GTInvertedIndexOfColumn[info.nColumns];
+ index = new GTInvertedIndexOfColumn[info.getColumnCount()];
for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) {
index[i] = new GTInvertedIndexOfColumn(info.codeSystem.getFilterCodeSystem());
}
@@ -43,7 +43,7 @@ public class GTInvertedIndex {
public void add(GTRowBlock block) {
@SuppressWarnings("unchecked")
- Set<ByteArray>[] distinctValues = new Set[info.nColumns];
+ Set<ByteArray>[] distinctValues = new Set[info.getColumnCount()];
for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) {
distinctValues[i] = new HashSet<ByteArray>();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
index 895ccf3..ff97cd5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
@@ -24,34 +24,14 @@ public class GTRawScanner implements IGTScanner {
public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
this.info = info;
- ByteArray start = makeScanKey(req.getPkStart());
- ByteArray end = makeScanKey(req.getPkEnd());
+ ByteArray start = ScanKey.makeScanKey(info, req.getPkStart());
+ ByteArray end = ScanKey.makeScanKey(info, req.getPkEnd());
this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
this.storeScanner = store.scan(start, end, selectedColBlocks, req);
this.oneRecord = new GTRecord(info);
}
- private ByteArray makeScanKey(GTRecord rec) {
- int firstPKCol = info.primaryKey.nextSetBit(0);
- if (rec == null || rec.cols[firstPKCol].array() == null)
- return null;
-
- BitSet selectedColumns = new BitSet();
- int len = 0;
- for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
- if (rec.cols[i].array() == null) {
- break;
- }
- selectedColumns.set(i);
- len += rec.cols[i].length();
- }
-
- ByteArray buf = ByteArray.allocate(len);
- rec.exportColumns(selectedColumns, buf);
- return buf;
- }
-
@Override
public GTInfo getInfo() {
return info;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
index 8516f05..2a38731 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
@@ -16,7 +16,7 @@ public class GTRecord implements Comparable<GTRecord> {
public GTRecord(GTInfo info) {
this.info = info;
- this.cols = new ByteArray[info.nColumns];
+ this.cols = new ByteArray[info.getColumnCount()];
for (int i = 0; i < cols.length; i++)
this.cols[i] = new ByteArray();
this.maskForEqualHashComp = info.colAll;
@@ -55,7 +55,7 @@ public class GTRecord implements Comparable<GTRecord> {
/** decode and return the values of this record */
public Object[] getValues() {
- return getValues(info.colAll, new Object[info.nColumns]);
+ return getValues(info.colAll, new Object[info.getColumnCount()]);
}
/** decode and return the values of this record */
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 7f79948..ec24da6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -1,5 +1,7 @@
package org.apache.kylin.storage.gridtable;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
@@ -54,7 +56,7 @@ public class GTRowBlock {
public Writer getWriter() {
return new Writer();
}
-
+
public class Writer {
ByteBuffer[] cellBlockBuffers;
@@ -181,6 +183,21 @@ public class GTRowBlock {
return len;
}
+ public void export(DataOutputStream dataOutputStream) throws IOException {
+ dataOutputStream.writeInt(seqId);
+ dataOutputStream.writeInt(nRows);
+ export(dataOutputStream, primaryKey);
+ for (ByteArray cb : cellBlocks) {
+ export(dataOutputStream, cb);
+ }
+ }
+
+ public void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException {
+ dataOutputStream.writeInt(array.length());
+ dataOutputStream.write(array.array(), array.offset(), array.length());
+ }
+
+
/** write data to given buffer, like serialize */
public void export(ByteBuffer buf) {
buf.putInt(seqId);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
index cb8698c..9c758fa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
@@ -3,6 +3,7 @@ package org.apache.kylin.storage.gridtable;
import java.nio.ByteBuffer;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.filter.IFilterCodeSystem;
import org.apache.kylin.metadata.measure.MeasureAggregator;
@@ -30,8 +31,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
public void init(GTInfo info) {
this.info = info;
- this.serializers = new DataTypeSerializer[info.nColumns];
- for (int i = 0; i < info.nColumns; i++) {
+ this.serializers = new DataTypeSerializer[info.getColumnCount()];
+ for (int i = 0; i < info.getColumnCount(); i++) {
this.serializers[i] = DataTypeSerializer.create(info.colTypes[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
index 285a301..3d3c3c8 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
@@ -4,10 +4,10 @@ import java.io.Closeable;
public interface IGTScanner extends Iterable<GTRecord>, Closeable {
- public GTInfo getInfo();
+ GTInfo getInfo();
- public int getScannedRowCount();
+ int getScannedRowCount();
- public int getScannedRowBlockCount();
+ int getScannedRowBlockCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
index f5eb077..0152571 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
@@ -8,22 +8,22 @@ import java.util.Iterator;
import org.apache.kylin.common.util.ByteArray;
public interface IGTStore {
+
+ long memoryUsage();
- public GTInfo getInfo();
-
- public String getStorageDescription();
-
- public IGTStoreWriter rebuild(int shard) throws IOException;
+ IGTStoreWriter rebuild(int shard) throws IOException;
- public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
+ IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
- public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
+ IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
+
+ void drop() throws IOException;
- public interface IGTStoreWriter extends Closeable {
+ interface IGTStoreWriter extends Closeable {
void write(GTRowBlock block) throws IOException;
}
- public interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
+ interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java
new file mode 100644
index 0000000..5c0c436
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java
@@ -0,0 +1,42 @@
+package org.apache.kylin.storage.gridtable;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import java.util.BitSet;
+
+/**
+ * Static helper that turns the leading primary-key columns of a {@link GTRecord} into a scan key.
+ *
+ * Created by qianzhou on 5/6/15.
+ */
+public final class ScanKey {
+
+    private ScanKey() {
+    }
+
+    /**
+     * Exports the primary-key columns of rec, in bit order, up to (not including) the first
+     * one whose backing array is null.
+     *
+     * @return the exported key bytes, or null when rec is null or its first primary-key column is unset
+     */
+    static ByteArray makeScanKey(GTInfo info, GTRecord rec) {
+        final int first = info.primaryKey.nextSetBit(0);
+        if (rec == null || rec.cols[first].array() == null) {
+            return null;
+        }
+
+        final BitSet keyColumns = new BitSet();
+        int keyLength = 0;
+        int col = first;
+        while (col >= 0 && rec.cols[col].array() != null) {
+            keyColumns.set(col);
+            keyLength += rec.cols[col].length();
+            col = info.primaryKey.nextSetBit(col + 1);
+        }
+
+        final ByteArray key = ByteArray.allocate(keyLength);
+        rec.exportColumns(keyColumns, key);
+        return key;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
new file mode 100644
index 0000000..2ab2c7e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * File operations used by the grid-table disk store, addressed by path string. Created by qianzhou on 5/4/15.
+ */
+interface FileSystem {
+ /** Reports whether something exists at path. */
+ boolean checkExistence(String path);
+ /** Deletes path. NOTE(review): the implementations in this commit differ on recursion -- confirm which is intended. */
+ boolean delete(String path);
+ /** Creates a directory at path. GTDiskStore retries with a new name until this returns true. */
+ boolean createDirectory(String path);
+ /** Creates a file at path; presumably true means it was newly created -- TODO confirm. */
+ boolean createFile(String path);
+ /** Opens path for writing; the implementations in this commit throw RuntimeException on failure. */
+ OutputStream getWriter(String path);
+ /** Opens path for reading; the implementations in this commit throw RuntimeException on failure. */
+ InputStream getReader(String path);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
new file mode 100644
index 0000000..f48fce3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
@@ -0,0 +1,193 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+/**
+ * An {@link IGTStore} that appends row blocks to one file inside a per-instance directory.
+ * Each block is written as a 4-byte length followed by the block's exported bytes.
+ *
+ * Created by qianzhou on 5/4/15.
+ */
+public class GTDiskStore implements IGTStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTDiskStore.class);
+
+    /** Directory-creation attempts made before giving up instead of retrying forever. */
+    private static final int MAX_IDENTIFIER_ATTEMPTS = 10;
+
+    private final String identifier;
+    private final FileSystem fileSystem;
+    private final DiskStoreWriter writer;
+    private final GTInfo gtInfo;
+
+    /** Creates the store's directory and opens the row block file for writing. */
+    public GTDiskStore(GTInfo gtInfo) {
+        this.gtInfo = gtInfo;
+        this.fileSystem = new LocalFileSystem();
+        this.identifier = generateIdentifier(fileSystem);
+        logger.info("disk store created, identifier:" + identifier);
+        this.writer = new DiskStoreWriter(fileSystem.getWriter(getRowBlockFile(identifier)));
+    }
+
+    /**
+     * Picks a random UUID whose root directory could be newly created.
+     *
+     * @throws IllegalStateException if no directory could be created within the attempt limit
+     */
+    private String generateIdentifier(FileSystem fs) {
+        for (int attempt = 0; attempt < MAX_IDENTIFIER_ATTEMPTS; attempt++) {
+            final String candidate = UUID.randomUUID().toString();
+            if (fs.createDirectory(getRootDirectory(candidate))) {
+                return candidate;
+            }
+        }
+        throw new IllegalStateException("cannot create a directory under " + getRootDirectory("") + " after " + MAX_IDENTIFIER_ATTEMPTS + " attempts");
+    }
+
+    private String getRootDirectory(String identifier) {
+        return "/tmp/kylin/gtdiskstore/" + identifier;
+    }
+
+    private String getRowBlockFile(String identifier) {
+        return getRootDirectory(identifier) + "/rowblock";
+    }
+
+    /** Writes length-prefixed row blocks to the store's file, flushing after each block. */
+    private class DiskStoreWriter implements IGTStoreWriter {
+
+        private final DataOutputStream outputStream;
+
+        DiskStoreWriter(OutputStream outputStream) {
+            this.outputStream = new DataOutputStream(outputStream);
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            final int blockSize = block.exportLength();
+            outputStream.writeInt(blockSize);
+            block.export(outputStream);
+            outputStream.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            outputStream.close();
+        }
+    }
+
+    /** Always 0: the row blocks live on disk. */
+    @Override
+    public long memoryUsage() {
+        return 0;
+    }
+
+    // NOTE(review): rebuild() hands out the same writer as append() and does not truncate the
+    // file, so it appends to whatever is already there -- confirm this is intended.
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return writer;
+    }
+
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return writer;
+    }
+
+    /** Reads the length-prefixed row blocks back in the order they were written. */
+    private class DiskStoreScanner implements IGTStoreScanner {
+
+        private final DataInputStream inputStream;
+        // true once the length of the upcoming block has been read and not yet consumed by next()
+        private boolean sizeRead = false;
+        private int blockSize = 0;
+
+        DiskStoreScanner(InputStream inputStream) {
+            this.inputStream = new DataInputStream(inputStream);
+        }
+
+        @Override
+        public void close() throws IOException {
+            inputStream.close();
+        }
+
+        /** Idempotent: the length prefix is read from the stream at most once per block. */
+        @Override
+        public boolean hasNext() {
+            if (!sizeRead) {
+                blockSize = readBlockSize();
+                sizeRead = true;
+            }
+            return blockSize > 0;
+        }
+
+        /** Returns the next length prefix, or 0 when the end of the file has been reached. */
+        private int readBlockSize() {
+            try {
+                return inputStream.readInt();
+            } catch (EOFException e) {
+                return 0;
+            } catch (IOException e) {
+                logger.error("input stream fail", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** @throws NoSuchElementException if there is no further block */
+        @Override
+        public GTRowBlock next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            sizeRead = false;
+            final byte[] data = new byte[blockSize];
+            try {
+                // readFully loops over short reads and fails with EOFException on a truncated block
+                inputStream.readFully(data);
+            } catch (IOException e) {
+                logger.error("input stream fail", e);
+                throw new RuntimeException(e);
+            }
+            final GTRowBlock block = new GTRowBlock(gtInfo);
+            block.load(ByteBuffer.wrap(data));
+            return block;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /** Opens a fresh scanner over the whole file; the key range and column arguments are ignored. */
+    @Override
+    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return new DiskStoreScanner(fileSystem.getReader(getRowBlockFile(identifier)));
+    }
+
+    /** Closes the writer (best effort) and deletes the row block file and its directory. */
+    @Override
+    public void drop() throws IOException {
+        try {
+            writer.close();
+        } catch (Exception e) {
+            logger.error("error to close writer", e);
+        }
+        fileSystem.delete(getRowBlockFile(identifier));
+        fileSystem.delete(getRootDirectory(identifier));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
new file mode 100644
index 0000000..e1efd1b
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
@@ -0,0 +1,100 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * {@link FileSystem} implementation that delegates to the Hadoop file system obtained from
+ * {@link HadoopUtil#getCurrentConfiguration()}. Every IOException is logged and rethrown
+ * wrapped in a RuntimeException.
+ *
+ * Created by qianzhou on 5/6/15.
+ */
+public class HadoopFileSystem implements FileSystem {
+
+    private static final Logger logger = LoggerFactory.getLogger(HadoopFileSystem.class);
+
+    final org.apache.hadoop.fs.FileSystem fileSystem;
+
+    public HadoopFileSystem() {
+        try {
+            fileSystem = org.apache.hadoop.fs.FileSystem.get(HadoopUtil.getCurrentConfiguration());
+        } catch (IOException ioe) {
+            logger.error("error construct HadoopFileSystem", ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    @Override
+    public boolean checkExistence(String path) {
+        final Path target = new Path(path);
+        try {
+            return fileSystem.exists(target);
+        } catch (IOException ioe) {
+            logger.error("error checkExistence, path:" + path, ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    /** Deletes path with the recursive flag set. */
+    @Override
+    public boolean delete(String path) {
+        final Path target = new Path(path);
+        try {
+            return fileSystem.delete(target, true);
+        } catch (IOException ioe) {
+            logger.error("error delete, path:" + path, ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    @Override
+    public boolean createDirectory(String path) {
+        final Path target = new Path(path);
+        try {
+            return fileSystem.mkdirs(target);
+        } catch (IOException ioe) {
+            logger.error("error createDirectory, path:" + path, ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    @Override
+    public boolean createFile(String path) {
+        final Path target = new Path(path);
+        try {
+            return fileSystem.createNewFile(target);
+        } catch (IOException ioe) {
+            logger.error("error createFile, path:" + path, ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    @Override
+    public OutputStream getWriter(String path) {
+        final Path target = new Path(path);
+        try {
+            return fileSystem.create(target);
+        } catch (IOException ioe) {
+            logger.error("error getWriter, path:" + path, ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    @Override
+    public InputStream getReader(String path) {
+        final Path target = new Path(path);
+        try {
+            return fileSystem.open(target);
+        } catch (IOException ioe) {
+            logger.error("error getReader, path:" + path, ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
new file mode 100644
index 0000000..1c14e3f
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * {@link FileSystem} implementation backed by java.io on the local disk.
+ *
+ * Created by qianzhou on 5/4/15.
+ */
+class LocalFileSystem implements FileSystem {
+
+    private static final Logger logger = LoggerFactory.getLogger(LocalFileSystem.class);
+
+    @Override
+    public boolean checkExistence(String path) {
+        return new File(path).exists();
+    }
+
+    /** Deletes a file or an empty directory; not recursive. */
+    @Override
+    public boolean delete(String path) {
+        return new File(path).delete();
+    }
+
+    /** Creates the directory and any missing parents; false if nothing was created. */
+    @Override
+    public boolean createDirectory(String path) {
+        return new File(path).mkdirs();
+    }
+
+    /** Creates an empty file; returns false if it already exists or creation failed. */
+    @Override
+    public boolean createFile(String path) {
+        try {
+            return new File(path).createNewFile();
+        } catch (IOException e) {
+            logger.warn("create file failed:" + path, e);
+            return false;
+        }
+    }
+
+    /**
+     * Opens path for writing, truncating any existing content.
+     *
+     * @throws RuntimeException wrapping the FileNotFoundException if the file cannot be opened
+     */
+    @Override
+    public OutputStream getWriter(String path) {
+        try {
+            return new FileOutputStream(path);
+        } catch (FileNotFoundException e) {
+            // also raised when the file cannot be created or is a directory, so log the cause
+            logger.error("cannot open file for writing, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Opens path for reading.
+     *
+     * @throws RuntimeException wrapping the FileNotFoundException if the file cannot be opened
+     */
+    @Override
+    public InputStream getReader(String path) {
+        try {
+            return new FileInputStream(path);
+        } catch (FileNotFoundException e) {
+            logger.error("cannot open file for reading, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index 32c7f36..329c048 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -1,44 +1,41 @@
package org.apache.kylin.storage.gridtable.memstore;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.storage.gridtable.GTInfo;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+
public class GTSimpleMemStore implements IGTStore {
- final GTInfo info;
final List<GTRowBlock> rowBlockList;
public GTSimpleMemStore(GTInfo info) {
- this.info = info;
- this.rowBlockList = new ArrayList<GTRowBlock>();
+ this.rowBlockList = Lists.newLinkedList();
if (info.isShardingEnabled())
throw new UnsupportedOperationException();
}
@Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public String getStorageDescription() {
- return this.toString();
+ public long memoryUsage() {
+ if (rowBlockList.size() == 0) {
+ return 0;
+ } else {
+ return rowBlockList.get(0).exportLength() * Long.valueOf(rowBlockList.size());
+ }
}
@Override
public IGTStoreWriter rebuild(int shard) {
rowBlockList.clear();
- return new Writer();
+ return new Writer(rowBlockList);
}
@Override
@@ -47,10 +44,16 @@ public class GTSimpleMemStore implements IGTStore {
GTRowBlock last = rowBlockList.get(rowBlockList.size() - 1);
fillLast.copyFrom(last);
}
- return new Writer();
+ return new Writer(rowBlockList);
}
- private class Writer implements IGTStoreWriter {
+ private static class Writer implements IGTStoreWriter {
+
+ private final List<GTRowBlock> rowBlockList;
+
+ Writer(List<GTRowBlock> rowBlockList) {
+ this.rowBlockList = rowBlockList;
+ }
@Override
public void close() throws IOException {
}
@@ -66,7 +69,7 @@ public class GTSimpleMemStore implements IGTStore {
rowBlockList.add(copy);
}
}
- };
+ }
@Override
public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) {
@@ -95,4 +98,10 @@ public class GTSimpleMemStore implements IGTStore {
};
}
+ @Override
+ public void drop() throws IOException {
+ //will there be any concurrent issue? If yes, the LinkedList behind rowBlockList (created in the constructor) should be replaced
+ rowBlockList.clear(); // empties the in-memory list of row blocks held by this store
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java b/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java
new file mode 100644
index 0000000..d390415
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java
@@ -0,0 +1,21 @@
+package org.apache.kylin.storage.util;
+
+import net.sf.ehcache.pool.sizeof.ReflectionSizeOf;
+
+/**
+ * Created by qianzhou on 5/11/15. Static-only helpers that measure objects through one shared ehcache ReflectionSizeOf.
+ */
+public final class SizeOfUtil {
+ // Not instantiable: every member of this class is static.
+ private SizeOfUtil(){}
+ // Single ReflectionSizeOf instance reused by both helpers below.
+ private static final ReflectionSizeOf DEFAULT_SIZE_OF = new ReflectionSizeOf();
+ /** Returns getCalculated() of a deepSizeOf run over obj; Integer.MAX_VALUE and true are presumably "max depth" and "abort when exceeded" -- TODO confirm against ehcache. */
+ public static final long deepSizeOf(Object obj) {
+ return DEFAULT_SIZE_OF.deepSizeOf(Integer.MAX_VALUE, true, obj).getCalculated();
+ }
+ /** Returns ReflectionSizeOf.sizeOf(obj) unchanged; presumably the size of obj alone, without the objects it references -- TODO confirm. */
+ public static final long sizeOf(Object obj) {
+ return DEFAULT_SIZE_OF.sizeOf(obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 773425b..7c1d435 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -71,10 +71,6 @@ public class IIStreamBuilder extends StreamBuilder {
private StreamingManager streamingManager;
public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
- this(queue, streaming, hTableName, iiDesc, shard, true);
- }
-
- public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard, boolean useLocalDict) {
super(queue, iiDesc.getSliceSize());
this.streaming = streaming;
this.desc = iiDesc;
@@ -86,7 +82,7 @@ public class IIStreamBuilder extends StreamBuilder {
logger.error("cannot open htable name:" + hTableName, e);
throw new RuntimeException("cannot open htable name:" + hTableName, e);
}
- this.sliceBuilder = new SliceBuilder(desc, (short) shard, useLocalDict);
+ this.sliceBuilder = new SliceBuilder(desc, (short) shard, iiDesc.isUseLocalDictionary());
this.streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
}