You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 14:35:56 UTC
[2/3] incubator-kylin git commit: streaming cubing
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be5b1c21/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
new file mode 100644
index 0000000..e5a5b5f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
@@ -0,0 +1,567 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CubeGridTable;
+import org.apache.kylin.storage.gridtable.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ */
+public class InMemCubeBuilder implements Runnable {
+
+ private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+ private static final int DEFAULT_TIMEOUT = 25;
+
+ private BlockingQueue<List<String>> queue;
+ private CubeDesc desc = null;
+ private long baseCuboidId;
+ private CuboidScheduler cuboidScheduler = null;
+ private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
+ private CubeJoinedFlatTableDesc intermediateTableDesc;
+ private MeasureCodec measureCodec;
+ private String[] metricsAggrFuncs = null;
+ private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
+ public static final LongWritable ONE = new LongWritable(1l);
+ private int[] hbaseMeasureRefIndex;
+ private MeasureDesc[] measureDescs;
+ private int measureCount;
+
+ protected IGTRecordWriter gtRecordWriter;
+
+
+ /**
+  * Creates a builder that reads flat-table rows from {@code queue} and writes the records of
+  * every cuboid it builds to {@code gtRecordWriter}.
+  * <p>
+  * Measures are re-ordered to follow the HBase column-family mapping of the cube descriptor;
+  * {@code hbaseMeasureRefIndex[position]} holds the index of that measure in the descriptor's
+  * own measure list.
+  *
+  * @param queue          source of rows; an empty list marks the end of the stream (see {@link #run()})
+  * @param cube           cube whose descriptor drives the build
+  * @param dictionaryMap  dictionaries keyed by column, must not be null or empty
+  * @param gtRecordWriter sink for the generated grid-table records
+  * @throws IllegalArgumentException if {@code dictionaryMap} is null or empty
+  * @throws IllegalStateException    if the HBase mapping does not cover every measure, or a
+  *                                  measure references an unknown dependent measure
+  */
+ public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
+     if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+         throw new IllegalArgumentException("dictionary cannot be empty");
+     }
+     this.queue = queue;
+     this.desc = cube.getDescriptor();
+     this.cuboidScheduler = new CuboidScheduler(desc);
+     this.dictionaryMap = dictionaryMap;
+     this.gtRecordWriter = gtRecordWriter;
+     this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
+     this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+     this.measureCodec = new MeasureCodec(desc.getMeasures());
+
+     Map<String, Integer> measureIndexMap = Maps.newHashMap();
+     List<String> metricsAggrFuncsList = Lists.newArrayList();
+     measureCount = desc.getMeasures().size();
+
+     // Walk the HBase mapping and remember, for each mapped measure, where it sits in the descriptor.
+     List<MeasureDesc> measureDescsList = Lists.newArrayList();
+     hbaseMeasureRefIndex = new int[measureCount];
+     int measureRef = 0;
+     for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
+         for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+             for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                 for (int j = 0; j < measureCount; j++) {
+                     if (desc.getMeasures().get(j).equals(measure)) {
+                         measureDescsList.add(measure);
+                         hbaseMeasureRefIndex[measureRef] = j;
+                         break;
+                     }
+                 }
+                 measureRef++;
+             }
+         }
+     }
+
+     // The loops below index measureDescsList by [0, measureCount); fail with a clear message
+     // instead of an IndexOutOfBoundsException when the mapping is incomplete.
+     if (measureDescsList.size() != measureCount) {
+         throw new IllegalStateException("HBase mapping resolves " + measureDescsList.size() + " measures but the cube defines " + measureCount);
+     }
+
+     for (int i = 0; i < measureCount; i++) {
+         MeasureDesc measureDesc = measureDescsList.get(i);
+         metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+         measureIndexMap.put(measureDesc.getName(), i);
+     }
+     this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+
+     this.dependentMeasures = Maps.newHashMap();
+     for (int i = 0; i < measureCount; i++) {
+         MeasureDesc measureDesc = measureDescsList.get(i);
+         String depMsrRef = measureDesc.getDependentMeasureRef();
+         if (depMsrRef != null) {
+             // Look up as Integer first: unboxing a missing entry would throw a bare NullPointerException.
+             Integer index = measureIndexMap.get(depMsrRef);
+             if (index == null) {
+                 throw new IllegalStateException("Measure " + measureDesc.getName() + " depends on unknown measure " + depMsrRef);
+             }
+             dependentMeasures.put(i, index);
+         }
+     }
+
+     this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
+ }
+
+
+ private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
+ GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
+ GTComboStore store = new GTComboStore(info, memStore);
+ GridTable gridTable = new GridTable(info, store);
+ return gridTable;
+ }
+
+ private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+ Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
+ BitSet parentDimensions = columnBitSets.getFirst();
+ BitSet measureColumns = columnBitSets.getSecond();
+ BitSet childDimensions = (BitSet) parentDimensions.clone();
+
+ long mask = Long.highestOneBit(parentCuboidId);
+ long childCuboidId = cuboidId;
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
+ int index = 0;
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+ if ((mask & parentCuboidId) > 0) {
+ if ((mask & childCuboidId) == 0) {
+ // this dim will be aggregated
+ childDimensions.set(index, false);
+ }
+ index++;
+ }
+ mask = mask >> 1;
+ }
+
+ return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+
+ }
+
+ private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+ GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+ IGTScanner scanner = gridTable.scan(req);
+ GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
+ GTBuilder builder = newGridTable.rebuild();
+
+ BitSet allNeededColumns = new BitSet();
+ allNeededColumns.or(aggregationColumns);
+ allNeededColumns.or(measureColumns);
+
+ GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+ int counter = 0;
+ ByteArray byteArray = new ByteArray(8);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+ try {
+ BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
+ for (Integer i : dependentMeasures.keySet()) {
+ dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
+ }
+
+ Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
+
+ for (GTRecord record : scanner) {
+ counter++;
+ for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
+ newRecord.set(index, record.get(i));
+ }
+
+ if(dependentMeasures.size() > 0) {
+ // update measures which have 'dependent_measure_ref'
+ newRecord.getValues(dependentMetrics, hllObjects);
+
+ for (Integer i : dependentMeasures.keySet()) {
+ for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
+ if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
+ assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
+
+ byteBuffer.clear();
+ BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
+ byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
+ newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
+ }
+ }
+
+ }
+ }
+
+ builder.write(newRecord);
+ }
+ } finally {
+ builder.close();
+ }
+ logger.info("Cuboid " + cuboidId + " has rows: " + counter);
+
+ return newGridTable;
+ }
+
+ private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ BitSet dimension = new BitSet();
+ dimension.set(0, bitSet.cardinality());
+ BitSet metrics = new BitSet();
+ metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+ return new Pair<BitSet, BitSet>(dimension, metrics);
+ }
+
+ private Object[] buildKey(List<String> row) {
+ int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+ Object[] key = new Object[keySize];
+
+ for (int i = 0; i < keySize; i++) {
+ key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ }
+
+ return key;
+ }
+
+ private Object[] buildValue(List<String> row) {
+
+ Object[] values = new Object[measureCount];
+ MeasureDesc measureDesc = null;
+
+ for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+ int i = hbaseMeasureRefIndex[position];
+ measureDesc = measureDescs[i];
+
+ Object value = null;
+ int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+ FunctionDesc function = desc.getMeasures().get(i).getFunction();
+ if (function.isCount() || function.isHolisticCountDistinct()) {
+ // note for holistic count distinct, this value will be ignored
+ value = ONE;
+ } else if (flatTableIdx == null) {
+ value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+ } else if (flatTableIdx.length == 1) {
+ value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+ } else {
+
+ byte[] result = null;
+ for (int x = 0; x < flatTableIdx.length; x++) {
+ byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+ if (result == null) {
+ result = Arrays.copyOf(split, split.length);
+ } else {
+ byte[] newResult = new byte[result.length + split.length];
+ System.arraycopy(result, 0, newResult, 0, result.length);
+ System.arraycopy(split, 0, newResult, result.length, split.length);
+ result = newResult;
+ }
+ }
+ value = measureCodec.getSerializer(i).valueOf(result);
+ }
+ values[position] = value;
+ }
+ return values;
+ }
+
+
+ /**
+  * Builds the whole cube: drains the input queue into the base cuboid, recursively derives
+  * every spanning cuboid from it, and hands each cuboid's records to the record writer.
+  * <p>
+  * Reading stops at the first empty row taken from the queue.
+  *
+  * @throws RuntimeException wrapping any {@link IOException} raised during the build, or the
+  *                          {@link InterruptedException} raised while waiting on the queue
+  */
+ @Override
+ public void run() {
+     try {
+         logger.info("Create base cuboid " + baseCuboidId);
+         final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
+
+         GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
+         final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
+
+         // Adapts the blocking queue to a scanner so it can feed the aggregate scanner below.
+         IGTScanner queueScanner = new IGTScanner() {
+
+             @Override
+             public Iterator<GTRecord> iterator() {
+                 return new Iterator<GTRecord>() {
+
+                     // Row fetched by hasNext() and not yet handed out by next().
+                     List<String> currentObject = null;
+                     // Set once the empty end-of-stream row has been taken.
+                     boolean endOfStream = false;
+
+                     @Override
+                     public boolean hasNext() {
+                         if (endOfStream) {
+                             return false;
+                         }
+                         // Only take from the queue when nothing is pending, so repeated
+                         // hasNext() calls do not silently drop rows.
+                         if (currentObject == null) {
+                             try {
+                                 currentObject = queue.take();
+                             } catch (InterruptedException e) {
+                                 Thread.currentThread().interrupt();
+                                 throw new RuntimeException(e);
+                             }
+                             if (currentObject == null || currentObject.size() == 0) {
+                                 endOfStream = true;
+                                 currentObject = null;
+                             }
+                         }
+                         return !endOfStream;
+                     }
+
+                     @Override
+                     public GTRecord next() {
+                         if (!hasNext()) {
+                             throw new NoSuchElementException();
+                         }
+
+                         // The same record instance is refilled and returned for every row.
+                         buildGTRecord(currentObject, baseGTRecord);
+                         currentObject = null;
+                         return baseGTRecord;
+                     }
+
+                     @Override
+                     public void remove() {
+                         throw new UnsupportedOperationException();
+                     }
+                 };
+             }
+
+             @Override
+             public void close() throws IOException {
+             }
+
+             @Override
+             public GTInfo getInfo() {
+                 return baseCuboidGT.getInfo();
+             }
+
+             @Override
+             public int getScannedRowCount() {
+                 return 0;
+             }
+
+             @Override
+             public int getScannedRowBlockCount() {
+                 return 0;
+             }
+         };
+
+         Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+         GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+         IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
+
+         int counter = 0;
+         try {
+             for (GTRecord r : aggregationScanner) {
+                 baseGTBuilder.write(r);
+                 counter++;
+             }
+         } finally {
+             // Release builder and scanner even when writing fails half-way.
+             try {
+                 baseGTBuilder.close();
+             } finally {
+                 aggregationScanner.close();
+             }
+         }
+
+         logger.info("Base cuboid has " + counter + " rows;");
+         SimpleGridTableTree tree = new SimpleGridTableTree();
+         tree.data = baseCuboidGT;
+         tree.id = baseCuboidId;
+         tree.parent = null;
+         if (counter > 0) {
+             List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
+             Collections.sort(children);
+             for (Long childId : children) {
+                 createNDCuboidGT(tree, baseCuboidId, childId);
+             }
+         }
+         outputGT(baseCuboidId, baseCuboidGT);
+         dropStore(baseCuboidGT);
+
+     } catch (IOException e) {
+         logger.error("Fail to build cube", e);
+         throw new RuntimeException(e);
+     }
+
+ }
+
+ private void buildGTRecord(List<String> row, GTRecord record) {
+
+ Object[] dimensions = buildKey(row);
+ Object[] metricsValues = buildValue(row);
+ Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+ System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+ System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+ record.setValues(recordValues);
+ }
+
+ /**
+  * Frees memory by switching one ancestor's store to disk: the first node on the path from
+  * the root down to {@code parentNode} that still reports memory usage.
+  *
+  * @param parentNode node whose ancestor chain (itself included) is searched
+  * @return {@code true} if a store was switched to disk, {@code false} if none reported memory usage
+  */
+ private boolean gc(TreeNode<GridTable> parentNode) {
+     final List<TreeNode<GridTable>> candidates = parentNode.getAncestorList();
+     logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", candidates));
+
+     for (TreeNode<GridTable> candidate : candidates) {
+         final GTComboStore comboStore = (GTComboStore) candidate.data.getStore();
+         if (comboStore.memoryUsage() <= 0) {
+             continue;
+         }
+         logger.info("cuboid id:" + candidate.id + " flush to disk");
+         final long flushStart = System.currentTimeMillis();
+         comboStore.switchToDiskStore();
+         logger.info("switch to disk store cost:" + (System.currentTimeMillis() - flushStart) + "ms");
+         waitForGc();
+         return true;
+     }
+
+     logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
+     return false;
+ }
+
+ /**
+  * Aggregates a child cuboid on a dedicated worker thread, giving up after
+  * {@code DEFAULT_TIMEOUT} seconds.
+  *
+  * @param parentCuboid   grid table of the parent cuboid
+  * @param parentCuboidId id of the parent cuboid
+  * @param cuboidId       id of the cuboid to build
+  * @return the built grid table, or {@code null} when the attempt timed out or ran out of
+  *         memory (the caller may free memory and retry)
+  * @throws RuntimeException if the calling thread is interrupted or the worker fails with
+  *                          anything other than an {@link OutOfMemoryError}
+  */
+ private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
+     final ExecutorService executorService = Executors.newSingleThreadExecutor();
+     final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
+         @Override
+         public GridTable call() throws Exception {
+             return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+         }
+     });
+     try {
+         return task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         throw new RuntimeException("this should not happen", e);
+     } catch (ExecutionException e) {
+         if (e.getCause() instanceof OutOfMemoryError) {
+             logger.warn("Future.get() OutOfMemory, stop the thread");
+         } else {
+             throw new RuntimeException("this should not happen", e);
+         }
+     } catch (TimeoutException e) {
+         logger.warn("Future.get() timeout, stop the thread");
+     } finally {
+         // Every path, including success, must release the worker thread; one executor is
+         // created per call, so leaving it running leaks a thread for each cuboid.
+         executorService.shutdownNow();
+     }
+
+     // Only the recoverable failures (timeout / out of memory) reach this point.
+     logger.info("shutdown executor service");
+     try {
+         executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+         waitForGc();
+     } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         throw new RuntimeException("this should not happen", e);
+     }
+     return null;
+ }
+
+ /**
+  * Requests a garbage collection via {@code System.gc()} and then sleeps five seconds.
+  *
+  * @throws RuntimeException if the sleep is interrupted; the thread's interrupt flag is
+  *                          restored before throwing
+  */
+ private void waitForGc() {
+     System.gc();
+     logger.info("wait 5 seconds for gc");
+     try {
+         Thread.sleep(5000);
+     } catch (InterruptedException e) {
+         // Keep the interruption visible to callers further up the stack.
+         Thread.currentThread().interrupt();
+         throw new RuntimeException("should not happen", e);
+     }
+ }
+
+ /**
+  * Builds the cuboid {@code cuboidId} from its parent, recursively builds every cuboid the
+  * scheduler spans from it, then writes the cuboid out and drops its store.
+  * <p>
+  * When building fails ({@code createChildCuboid} returns null), one ancestor store is flushed
+  * to disk via {@code gc} and the build is retried; if nothing is left to flush a
+  * RuntimeException is thrown.
+  *
+  * @param parentNode     tree node holding the parent cuboid's grid table
+  * @param parentCuboidId id of the parent cuboid
+  * @param cuboidId       id of the cuboid to build
+  * @throws IOException if writing out or dropping a grid table fails
+  */
+ private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
+
+ long startTime = System.currentTimeMillis();
+
+// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+// if (parentStore.memoryUsage() <= 0) {
+// long t = System.currentTimeMillis();
+// parentStore.switchToMemStore();
+// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+// }
+
+ GridTable currentCuboid;
+ // Retry until the child is built; each failed attempt flushes one more ancestor to disk.
+ while (true) {
+ logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+ currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
+ if (currentCuboid != null) {
+ break;
+ } else {
+ logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
+ if (gc(parentNode)) {
+ continue;
+ } else {
+ logger.warn("all parent node has been flushed into disk, memory is still insufficient");
+ throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
+ }
+ }
+ }
+ // Attach the new cuboid to the tree so that gc() can reach it through getAncestorList().
+ SimpleGridTableTree node = new SimpleGridTableTree();
+ node.parent = parentNode;
+ node.data = currentCuboid;
+ node.id = cuboidId;
+ parentNode.children.add(node);
+
+ logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+ // Depth-first: build all descendants before this cuboid is written out and dropped.
+ List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+ if (!children.isEmpty()) {
+ Collections.sort(children); // sort cuboids
+ for (Long childId : children) {
+ createNDCuboidGT(node, cuboidId, childId);
+ }
+ }
+
+
+ //output the grid table
+ outputGT(cuboidId, currentCuboid);
+ dropStore(currentCuboid);
+ parentNode.children.remove(node);
+ // NOTE(review): in this class a child is added and removed again before its next sibling is
+ // started, so the list looks empty here and this branch may never run - confirm intent.
+ if (parentNode.children.size() > 0) {
+ logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
+ ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+ }
+ }
+
+ private void dropStore(GridTable gt) throws IOException {
+ ((GTComboStore) gt.getStore()).drop();
+ }
+
+
+ /**
+  * Scans a finished cuboid and passes each of its records to the record writer.
+  *
+  * @param cuboidId  id passed to the writer with every record
+  * @param gridTable grid table of the cuboid
+  * @throws IOException if scanning, writing or closing the scanner fails
+  */
+ private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
+     long startTime = System.currentTimeMillis();
+     GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+     IGTScanner scanner = gridTable.scan(req);
+     try {
+         for (GTRecord record : scanner) {
+             this.gtRecordWriter.write(cuboidId, record);
+         }
+     } finally {
+         // The scanner was previously left open; close it even when the writer fails.
+         scanner.close();
+     }
+     logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+
+ /**
+  * Minimal tree node carrying a payload, an id, a parent link and a child list.
+  */
+ private static class TreeNode<T> {
+     T data;
+     long id;
+     TreeNode<T> parent;
+     List<TreeNode<T>> children = Lists.newArrayList();
+
+     /**
+      * @return this node and all of its ancestors, ordered from the root down to this node
+      */
+     List<TreeNode<T>> getAncestorList() {
+         final List<TreeNode<T>> chain = Lists.newArrayList();
+         // collect leaf-to-root by following the parent links ...
+         for (TreeNode<T> cursor = this; cursor != null; cursor = cursor.parent) {
+             chain.add(cursor);
+         }
+         // ... then present it root-first
+         return Lists.reverse(chain);
+     }
+
+     @Override
+     public String toString() {
+         return String.valueOf(id);
+     }
+ }
+
+ /** Tree node whose payload is a {@code GridTable}; adds nothing to {@code TreeNode}. */
+ private static class SimpleGridTableTree extends TreeNode<GridTable> {
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be5b1c21/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 755b490..56435bd 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -61,16 +61,19 @@ import java.util.concurrent.TimeUnit;
public class IIStreamBuilder extends StreamBuilder {
private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
+ private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
private final IIDesc desc;
private final HTableInterface hTable;
private final SliceBuilder sliceBuilder;
private final int shardId;
private final String streaming;
+ private final int batchSize;
private StreamingManager streamingManager;
public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
- super(queue, iiDesc.getSliceSize());
+ super(queue);
+ this.batchSize = iiDesc.getSliceSize();
this.streaming = streaming;
this.desc = iiDesc;
this.shardId = shard;
@@ -117,6 +120,16 @@ public class IIStreamBuilder extends StreamBuilder {
}
}
+ @Override
+ protected int batchInterval() {
+ return BATCH_BUILD_INTERVAL_THRESHOLD;
+ }
+
+ @Override
+ protected int batchSize() {
+ return batchSize;
+ }
+
private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
List<Put> data = Lists.newArrayList();
for (IIRow row : codec.encodeKeyValue(slice)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be5b1c21/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
new file mode 100644
index 0000000..b530cdc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
@@ -0,0 +1,117 @@
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
+
+ private static final int BENCHMARK_RECORD_LIMIT = 2000000;
+ private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+
+ /** Runs before each test and calls {@code createTestMetadata()}. */
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ /** Runs after each test and calls {@code cleanupTestMetadata()}. */
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ /**
+  * Collects the dictionaries of every dimension column that the row key encodes with a dictionary.
+  *
+  * @param cubeSegment segment to read the cube descriptor and dictionaries from
+  * @return dictionaries keyed by column
+  * @throws IllegalArgumentException if a dictionary-encoded column has no dictionary in the segment
+  */
+ private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
+     final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+     final CubeDesc desc = cubeSegment.getCubeDesc();
+     for (DimensionDesc dim : desc.getDimensions()) {
+         // dictionary
+         for (TblColRef col : dim.getColumnRefs()) {
+             if (desc.getRowkey().isUseDictionary(col)) {
+                 // Look the dictionary up once and reuse it; no raw type needed.
+                 final Dictionary<?> dict = cubeSegment.getDictionary(col);
+                 if (dict == null) {
+                     throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+                 }
+                 logger.info("Dictionary for " + col + " was put into dictionary map.");
+                 dictionaryMap.put(col, dict);
+             }
+         }
+     }
+     return dictionaryMap;
+ }
+
+ /** Record writer for the benchmark: prints each record to stdout when {@code verbose} is set, otherwise discards it. */
+ private static class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+ // off by default, so records are dropped without being printed
+ boolean verbose = false;
+
+ @Override
+ public void write(Long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+ }
+
+ /**
+  * Feeds up to {@code BENCHMARK_RECORD_LIMIT} tab-separated lines of {@code ../table.txt}
+  * into the queue, followed by an empty list that marks the end of the stream.
+  *
+  * @param queue queue read by the cube builder
+  * @throws IOException          if the file cannot be opened or read
+  * @throws InterruptedException if interrupted while putting into the queue
+  */
+ private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+     try {
+         // try-with-resources: the reader was previously never closed
+         try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")))) {
+             String line;
+             int counter = 0;
+             while ((line = br.readLine()) != null) {
+                 queue.put(Arrays.asList(line.split("\t")));
+                 counter++;
+                 if (counter == BENCHMARK_RECORD_LIMIT) {
+                     break;
+                 }
+             }
+         }
+     } finally {
+         // Always send the end marker, otherwise a read failure leaves the consumer blocked on the queue.
+         queue.put(Collections.emptyList());
+     }
+ }
+
+ /** Puts only an empty list into the queue; no random rows are generated here. */
+ private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+ queue.put(Collections.emptyList());
+ }
+
+
+ /**
+  * Runs the in-memory cube builder on a background thread while the test thread feeds it
+  * rows from the local data file, then waits for the build to finish.
+  */
+ @Test
+ public void test() throws Exception {
+     final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+     final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+     final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
+     final CubeSegment cubeSegment = cube.getFirstSegment();
+
+     final LinkedBlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>();
+
+     InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
+     ExecutorService executorService = Executors.newSingleThreadExecutor();
+     try {
+         Future<?> future = executorService.submit(cubeBuilder);
+         loadDataFromLocalFile(queue);
+         future.get();
+         logger.info("stream build finished");
+     } finally {
+         // Stop the worker thread whether the build succeeded or not, so it cannot outlive the test.
+         executorService.shutdownNow();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be5b1c21/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
deleted file mode 100644
index b3d7742..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.JsonStreamParser;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- */
-public class PrintOutStreamBuilder extends StreamBuilder {
-
- private final List<TblColRef> allColumns;
-
- public PrintOutStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize, List<TblColRef> allColumns) {
- super(streamMessageQueue, sliceSize);
- setStreamParser(new JsonStreamParser(allColumns));
- this.allColumns = allColumns;
- }
-
- @Override
- protected void build(List<StreamMessage> streamsToBuild) throws Exception {
- for (StreamMessage streamMessage : streamsToBuild) {
- final List<String> row = getStreamParser().parse(streamMessage);
- System.out.println("offset:" + streamMessage.getOffset() + " " + StringUtils.join(row, ","));
- }
- }
-
- @Override
- protected void onStop() {
-
- }
-}