Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/29 09:44:49 UTC
[1/4] incubator-kylin git commit: ci fix: synchronize on project update
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 cf25daaf9 -> 49362dd14
ci fix: synchronize on project update
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/49362dd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/49362dd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/49362dd1
Branch: refs/heads/0.8.0
Commit: 49362dd1403db640b7809944bd302ccde756cc55
Parents: 341cda0
Author: honma <ho...@ebay.com>
Authored: Fri May 29 10:52:31 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/metadata/project/ProjectManager.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/49362dd1/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index bcafcc7..dd146d9 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -19,7 +19,6 @@
package org.apache.kylin.metadata.project;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -225,9 +224,11 @@ public class ProjectManager {
}
private void updateProject(ProjectInstance prj) throws IOException {
- getStore().putResource(prj.getResourcePath(), prj, PROJECT_SERIALIZER);
- projectMap.put(norm(prj.getName()), prj); // triggers update broadcast
- clearL2Cache();
+ synchronized (prj) {
+ getStore().putResource(prj.getResourcePath(), prj, PROJECT_SERIALIZER);
+ projectMap.put(norm(prj.getName()), prj); // triggers update broadcast
+ clearL2Cache();
+ }
}
private void removeProject(ProjectInstance proj) throws IOException {
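Why the synchronized block matters: putResource() and the projectMap.put() that triggers the update broadcast must be atomic per project, otherwise two concurrent updaters can interleave so that the persisted copy and the cached copy come from different versions. A minimal sketch of the pattern, with hypothetical stand-in fields rather than Kylin's real classes:

    // 'stored' stands in for the ResourceStore copy, 'cached' for projectMap.
    class ProjectUpdateSketch {
        private final Object lock = new Object(); // plays the role of the ProjectInstance
        private volatile String stored;
        private volatile String cached;

        void update(String version) {
            synchronized (lock) {   // mirrors synchronized (prj) in the commit
                stored = version;   // getStore().putResource(...)
                cached = version;   // projectMap.put(...) + clearL2Cache()
            }
        }
    }

Locking the ProjectInstance itself keeps updates to different projects concurrent while serializing updates to the same one.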
[3/4] incubator-kylin git commit: streaming cubing
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
new file mode 100644
index 0000000..e5a5b5f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CubeGridTable;
+import org.apache.kylin.storage.gridtable.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Builds all cuboids of a cube in memory: reads flattened rows from a blocking
+ * queue, aggregates the base cuboid, then recursively derives child cuboids and
+ * emits every record through an IGTRecordWriter.
+ */
+public class InMemCubeBuilder implements Runnable {
+
+ private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+ private static final int DEFAULT_TIMEOUT = 25; // seconds, per child-cuboid build attempt
+
+ private BlockingQueue<List<String>> queue;
+ private CubeDesc desc = null;
+ private long baseCuboidId;
+ private CuboidScheduler cuboidScheduler = null;
+ private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
+ private CubeJoinedFlatTableDesc intermediateTableDesc;
+ private MeasureCodec measureCodec;
+ private String[] metricsAggrFuncs = null;
+ private Map<Integer, Integer> dependentMeasures = null; // key: index of the measure that declares a dependent_measure_ref; value: index of the measure it depends on
+ public static final LongWritable ONE = new LongWritable(1L);
+ private int[] hbaseMeasureRefIndex;
+ private MeasureDesc[] measureDescs;
+ private int measureCount;
+
+ protected IGTRecordWriter gtRecordWriter;
+
+
+ /**
+ * @param queue source of flattened rows; an empty list signals end of input
+ * @param cube the cube to build; its dictionaries must be ready
+ * @param dictionaryMap dictionary per dimension column, must not be empty
+ * @param gtRecordWriter sink that receives every aggregated record
+ */
+ public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
+ if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+ throw new IllegalArgumentException("dictionary cannot be empty");
+ }
+ this.queue = queue;
+ this.desc = cube.getDescriptor();
+ this.cuboidScheduler = new CuboidScheduler(desc);
+ this.dictionaryMap = dictionaryMap;
+ this.gtRecordWriter = gtRecordWriter;
+ this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
+ this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+ this.measureCodec = new MeasureCodec(desc.getMeasures());
+
+ Map<String, Integer> measureIndexMap = Maps.newHashMap();
+ List<String> metricsAggrFuncsList = Lists.newArrayList();
+ measureCount = desc.getMeasures().size();
+
+ List<MeasureDesc> measureDescsList = Lists.newArrayList();
+ hbaseMeasureRefIndex = new int[measureCount];
+ int measureRef = 0;
+ for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+ for (int j = 0; j < measureCount; j++) {
+ if (desc.getMeasures().get(j).equals(measure)) {
+ measureDescsList.add(measure);
+ hbaseMeasureRefIndex[measureRef] = j;
+ break;
+ }
+ }
+ measureRef++;
+ }
+ }
+ }
+
+ for (int i = 0; i < measureCount; i++) {
+ MeasureDesc measureDesc = measureDescsList.get(i);
+ metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+ measureIndexMap.put(measureDesc.getName(), i);
+ }
+ this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+
+ this.dependentMeasures = Maps.newHashMap();
+ for (int i = 0; i < measureCount; i++) {
+ String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
+ if (depMsrRef != null) {
+ int index = measureIndexMap.get(depMsrRef);
+ dependentMeasures.put(i, index);
+ }
+ }
+
+ this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
+ }
+
+
+ private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
+ GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
+ GTComboStore store = new GTComboStore(info, memStore);
+ GridTable gridTable = new GridTable(info, store);
+ return gridTable;
+ }
+
+ private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+ Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
+ BitSet parentDimensions = columnBitSets.getFirst();
+ BitSet measureColumns = columnBitSets.getSecond();
+ BitSet childDimensions = (BitSet) parentDimensions.clone();
+
+ long mask = Long.highestOneBit(parentCuboidId);
+ long childCuboidId = cuboidId;
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
+ int index = 0;
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+ if ((mask & parentCuboidId) > 0) {
+ if ((mask & childCuboidId) == 0) {
+ // this dim will be aggregated
+ childDimensions.set(index, false);
+ }
+ index++;
+ }
+ mask = mask >> 1;
+ }
+
+ return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+
+ }
+
+ private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+ GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+ IGTScanner scanner = gridTable.scan(req);
+ GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
+ GTBuilder builder = newGridTable.rebuild();
+
+ BitSet allNeededColumns = new BitSet();
+ allNeededColumns.or(aggregationColumns);
+ allNeededColumns.or(measureColumns);
+
+ GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+ int counter = 0;
+ ByteArray byteArray = new ByteArray(8);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+ try {
+ BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
+ for (Integer i : dependentMeasures.keySet()) {
+ dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
+ }
+
+ Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
+
+ for (GTRecord record : scanner) {
+ counter++;
+ for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
+ newRecord.set(index, record.get(i));
+ }
+
+ if (dependentMeasures.size() > 0) {
+ // update measures which have 'dependent_measure_ref'
+ newRecord.getValues(dependentMetrics, hllObjects);
+
+ for (Integer i : dependentMeasures.keySet()) {
+ for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
+ if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
+ assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
+
+ byteBuffer.clear();
+ BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
+ byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
+ newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
+ }
+ }
+
+ }
+ }
+
+ builder.write(newRecord);
+ }
+ } finally {
+ builder.close();
+ }
+ logger.info("Cuboid " + cuboidId + " has rows: " + counter);
+
+ return newGridTable;
+ }
+
+ private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ BitSet dimension = new BitSet();
+ dimension.set(0, bitSet.cardinality());
+ BitSet metrics = new BitSet();
+ metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+ return new Pair<BitSet, BitSet>(dimension, metrics);
+ }
+
+ private Object[] buildKey(List<String> row) {
+ int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+ Object[] key = new Object[keySize];
+
+ for (int i = 0; i < keySize; i++) {
+ key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ }
+
+ return key;
+ }
+
+ private Object[] buildValue(List<String> row) {
+
+ Object[] values = new Object[measureCount];
+ MeasureDesc measureDesc = null;
+
+ for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+ int i = hbaseMeasureRefIndex[position];
+ measureDesc = measureDescs[i];
+
+ Object value = null;
+ int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+ FunctionDesc function = desc.getMeasures().get(i).getFunction();
+ if (function.isCount() || function.isHolisticCountDistinct()) {
+ // note for holistic count distinct, this value will be ignored
+ value = ONE;
+ } else if (flatTableIdx == null) {
+ value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+ } else if (flatTableIdx.length == 1) {
+ value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+ } else {
+
+ byte[] result = null;
+ for (int x = 0; x < flatTableIdx.length; x++) {
+ byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+ if (result == null) {
+ result = Arrays.copyOf(split, split.length);
+ } else {
+ byte[] newResult = new byte[result.length + split.length];
+ System.arraycopy(result, 0, newResult, 0, result.length);
+ System.arraycopy(split, 0, newResult, result.length, split.length);
+ result = newResult;
+ }
+ }
+ value = measureCodec.getSerializer(i).valueOf(result);
+ }
+ values[position] = value;
+ }
+ return values;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ logger.info("Create base cuboid " + baseCuboidId);
+ final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
+
+ GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
+ final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
+
+ IGTScanner queueScanner = new IGTScanner() {
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ List<String> currentObject = null;
+
+ @Override
+ public boolean hasNext() {
+ try {
+ currentObject = queue.take();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return currentObject != null && currentObject.size() > 0;
+ }
+
+ @Override
+ public GTRecord next() {
+ if (currentObject.size() == 0)
+ throw new IllegalStateException();
+
+ buildGTRecord(currentObject, baseGTRecord);
+ return baseGTRecord;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return baseCuboidGT.getInfo();
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return 0;
+ }
+
+ @Override
+ public int getScannedRowBlockCount() {
+ return 0;
+ }
+ };
+
+ Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+ GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+ IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
+
+ int counter = 0;
+ for (GTRecord r : aggregationScanner) {
+ baseGTBuilder.write(r);
+ counter++;
+ }
+ baseGTBuilder.close();
+ aggregationScanner.close();
+
+ logger.info("Base cuboid has " + counter + " rows;");
+ SimpleGridTableTree tree = new SimpleGridTableTree();
+ tree.data = baseCuboidGT;
+ tree.id = baseCuboidId;
+ tree.parent = null;
+ if (counter > 0) {
+ List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
+ Collections.sort(children);
+ for (Long childId : children) {
+ createNDCuboidGT(tree, baseCuboidId, childId);
+ }
+ }
+ outputGT(baseCuboidId, baseCuboidGT);
+ dropStore(baseCuboidGT);
+
+ } catch (IOException e) {
+ logger.error("Fail to build cube", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private void buildGTRecord(List<String> row, GTRecord record) {
+
+ Object[] dimensions = buildKey(row);
+ Object[] metricsValues = buildValue(row);
+ Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+ System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+ System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+ record.setValues(recordValues);
+ }
+
+ private boolean gc(TreeNode<GridTable> parentNode) {
+ final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
+ logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
+ for (TreeNode<GridTable> gridTable : gridTables) {
+ final GTComboStore store = (GTComboStore) gridTable.data.getStore();
+ if (store.memoryUsage() > 0) {
+ logger.info("cuboid id:" + gridTable.id + " flush to disk");
+ long t = System.currentTimeMillis();
+ store.switchToDiskStore();
+ logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+ waitForGc();
+ return true;
+ }
+ }
+ logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
+ return false;
+
+ }
+
+ private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
+ @Override
+ public GridTable call() throws Exception {
+ return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+ }
+ });
+ try {
+ final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+ return gridTable;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("this should not happen", e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof OutOfMemoryError) {
+ logger.warn("Future.get() OutOfMemory, stop the thread");
+ } else {
+ throw new RuntimeException("this should not happen", e);
+ }
+ } catch (TimeoutException e) {
+ logger.warn("Future.get() timeout, stop the thread");
+ }
+ logger.info("shutdown executor service");
+ final List<Runnable> runnables = executorService.shutdownNow();
+ try {
+ executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+ waitForGc();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("this should not happen", e);
+ }
+ return null;
+
+ }
+
+ private void waitForGc() {
+ System.gc();
+ logger.info("wait 5 seconds for gc");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("should not happen", e);
+ }
+ }
+
+ private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
+
+ long startTime = System.currentTimeMillis();
+
+// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+// if (parentStore.memoryUsage() <= 0) {
+// long t = System.currentTimeMillis();
+// parentStore.switchToMemStore();
+// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+// }
+
+ GridTable currentCuboid;
+ while (true) {
+ logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+ currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
+ if (currentCuboid != null) {
+ break;
+ } else {
+ logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
+ if (gc(parentNode)) {
+ continue;
+ } else {
+ logger.warn("all parent node has been flushed into disk, memory is still insufficient");
+ throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
+ }
+ }
+ }
+ SimpleGridTableTree node = new SimpleGridTableTree();
+ node.parent = parentNode;
+ node.data = currentCuboid;
+ node.id = cuboidId;
+ parentNode.children.add(node);
+
+ logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+ List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+ if (!children.isEmpty()) {
+ Collections.sort(children); // sort cuboids
+ for (Long childId : children) {
+ createNDCuboidGT(node, cuboidId, childId);
+ }
+ }
+
+
+ //output the grid table
+ outputGT(cuboidId, currentCuboid);
+ dropStore(currentCuboid);
+ parentNode.children.remove(node);
+ if (parentNode.children.size() > 0) {
+ logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
+ ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+ }
+ }
+
+ private void dropStore(GridTable gt) throws IOException {
+ ((GTComboStore) gt.getStore()).drop();
+ }
+
+
+ private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
+ long startTime = System.currentTimeMillis();
+ GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+ IGTScanner scanner = gridTable.scan(req);
+ for (GTRecord record : scanner) {
+ this.gtRecordWriter.write(cuboidId, record);
+ }
+ logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+
+ private static class TreeNode<T> {
+ T data;
+ long id;
+ TreeNode<T> parent;
+ List<TreeNode<T>> children = Lists.newArrayList();
+
+ List<TreeNode<T>> getAncestorList() {
+ ArrayList<TreeNode<T>> result = Lists.newArrayList();
+ TreeNode<T> parent = this;
+ while (parent != null) {
+ result.add(parent);
+ parent = parent.parent;
+ }
+ return Lists.reverse(result);
+ }
+
+ @Override
+ public String toString() {
+ return id + "";
+ }
+ }
+
+ private static class SimpleGridTableTree extends TreeNode<GridTable> {
+ }
+
+
+}
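The trickiest part of the new builder is the bit walk in aggregateCuboid(): the set bits of a cuboid id, most significant first, name the dimensions that cuboid keeps, and they map in order onto the parent grid table's dimension columns. A self-contained restatement of that loop with hypothetical values, useful for checking the index bookkeeping:

    import java.util.BitSet;

    public class CuboidMaskDemo {
        // Any bit present in the parent cuboid id but absent in the child marks a
        // dimension column to aggregate away, matching InMemCubeBuilder.aggregateCuboid.
        static BitSet childDimensions(long parentCuboidId, long childCuboidId, int parentDimCount) {
            BitSet dims = new BitSet();
            dims.set(0, parentDimCount);               // start with all parent dimensions
            long mask = Long.highestOneBit(parentCuboidId);
            int index = 0;
            long len = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
            for (long i = 0; i < len; i++) {
                if ((mask & parentCuboidId) > 0) {
                    if ((mask & childCuboidId) == 0) {
                        dims.set(index, false);        // this dim will be aggregated
                    }
                    index++;
                }
                mask >>= 1;
            }
            return dims;
        }

        public static void main(String[] args) {
            // parent 0b1101 keeps three dims in columns 0..2; child 0b0101 drops the highest bit
            System.out.println(childDimensions(0b1101L, 0b0101L, 3)); // prints {1, 2}
        }
    }

Here the child lacks the parent's highest bit, so dimension column 0 is cleared and gets aggregated away while columns 1 and 2 survive.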
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 755b490..56435bd 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -61,16 +61,19 @@ import java.util.concurrent.TimeUnit;
public class IIStreamBuilder extends StreamBuilder {
private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
+ private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000; // 2 minutes, in milliseconds
private final IIDesc desc;
private final HTableInterface hTable;
private final SliceBuilder sliceBuilder;
private final int shardId;
private final String streaming;
+ private final int batchSize;
private StreamingManager streamingManager;
public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
- super(queue, iiDesc.getSliceSize());
+ super(queue);
+ this.batchSize = iiDesc.getSliceSize();
this.streaming = streaming;
this.desc = iiDesc;
this.shardId = shard;
@@ -117,6 +120,16 @@ public class IIStreamBuilder extends StreamBuilder {
}
}
+ @Override
+ protected int batchInterval() {
+ return BATCH_BUILD_INTERVAL_THRESHOLD;
+ }
+
+ @Override
+ protected int batchSize() {
+ return batchSize;
+ }
+
private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
List<Put> data = Lists.newArrayList();
for (IIRow row : codec.encodeKeyValue(slice)) {
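The refactor above turns the slice size into template methods: instead of passing it to the StreamBuilder constructor, subclasses now answer batchSize() and batchInterval(). A hedged sketch of the contract this implies (an assumption about StreamBuilder's consume loop, not its actual code): trigger a build whenever a batch fills up or the interval elapses with pending messages.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    abstract class BatchingConsumerSketch<T> {
        protected abstract int batchSize();      // e.g. iiDesc.getSliceSize()
        protected abstract int batchInterval();  // e.g. BATCH_BUILD_INTERVAL_THRESHOLD
        protected abstract void build(List<T> batch); // subclass-specific slice build

        void loop(BlockingQueue<T> queue) throws InterruptedException {
            List<T> batch = new ArrayList<T>();
            long lastBuild = System.currentTimeMillis();
            while (true) {
                T msg = queue.poll(100, TimeUnit.MILLISECONDS);
                if (msg != null) batch.add(msg);
                boolean full = batch.size() >= batchSize();
                boolean stale = System.currentTimeMillis() - lastBuild >= batchInterval();
                if (full || (stale && !batch.isEmpty())) {
                    build(batch);
                    batch.clear();
                    lastBuild = System.currentTimeMillis();
                }
            }
        }
    }

The interval threshold bounds the latency of a slice even when the stream is slow, while the size threshold bounds memory when it is fast.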
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
new file mode 100644
index 0000000..b530cdc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
@@ -0,0 +1,117 @@
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Feeds up to BENCHMARK_RECORD_LIMIT rows from a local flat-table file into
+ * InMemCubeBuilder and measures the in-memory cubing pass.
+ */
+public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
+
+ private static final int BENCHMARK_RECORD_LIMIT = 2000000;
+ private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
+ final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+ final CubeDesc desc = cubeSegment.getCubeDesc();
+ for (DimensionDesc dim : desc.getDimensions()) {
+ // dictionary
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (desc.getRowkey().isUseDictionary(col)) {
+ Dictionary<?> dict = cubeSegment.getDictionary(col);
+ if (dict == null) {
+ throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+ }
+ logger.info("Dictionary for " + col + " was put into dictionary map.");
+ dictionaryMap.put(col, dict);
+ }
+ }
+ }
+ return dictionaryMap;
+ }
+
+ private static class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+ boolean verbose = false;
+
+ @Override
+ public void write(Long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+ }
+
+ private void loadDataFromLocalFile(LinkedBlockingQueue<List<String>> queue) throws IOException, InterruptedException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+ String line;
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ queue.put(Arrays.asList(line.split("\t")));
+ counter++;
+ if (counter == BENCHMARK_RECORD_LIMIT) {
+ break;
+ }
+ }
+ queue.put(Collections.<String>emptyList());
+ }
+
+ private void loadDataFromRandom(LinkedBlockingQueue<List<String>> queue) throws IOException, InterruptedException {
+ queue.put(Collections.<String>emptyList());
+ }
+
+
+ @Test
+ public void test() throws Exception {
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+ final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
+ final CubeSegment cubeSegment = cube.getFirstSegment();
+
+ LinkedBlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>();
+
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future<?> future = executorService.submit(cubeBuilder);
+ loadDataFromLocalFile(queue);
+ future.get();
+ logger.info("stream build finished");
+ }
+}
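Note the end-of-stream convention the test relies on: loadDataFromLocalFile() finishes by enqueuing an empty list, and the builder's queue-backed iterator treats an empty list as "no more rows" (its hasNext() returns false). The same poison-pill pattern, stripped down to a standalone sketch:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PoisonPillDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>();
            queue.put(Arrays.asList("row1-colA", "row1-colB"));
            queue.put(Collections.<String>emptyList()); // sentinel: no more rows

            List<String> row;
            while (!(row = queue.take()).isEmpty()) {   // mirrors hasNext(): empty ends the scan
                System.out.println(row);
            }
        }
    }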
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
deleted file mode 100644
index b3d7742..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.JsonStreamParser;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- */
-public class PrintOutStreamBuilder extends StreamBuilder {
-
- private final List<TblColRef> allColumns;
-
- public PrintOutStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize, List<TblColRef> allColumns) {
- super(streamMessageQueue, sliceSize);
- setStreamParser(new JsonStreamParser(allColumns));
- this.allColumns = allColumns;
- }
-
- @Override
- protected void build(List<StreamMessage> streamsToBuild) throws Exception {
- for (StreamMessage streamMessage : streamsToBuild) {
- final List<String> row = getStreamParser().parse(streamMessage);
- System.out.println("offset:" + streamMessage.getOffset() + " " + StringUtils.join(row, ","));
- }
- }
-
- @Override
- protected void onStop() {
-
- }
-}
[4/4] incubator-kylin git commit: streaming cubing
Posted by ma...@apache.org.
streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a6a9d940
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a6a9d940
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a6a9d940
Branch: refs/heads/0.8.0
Commit: a6a9d940ca220f9c94d4c4ff6cc3a2cccb617c0b
Parents: cf25daa
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri May 22 17:13:25 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800
----------------------------------------------------------------------
.../common/persistence/HBaseResourceStore.java | 35 +-
.../org/apache/kylin/common/util/TimeUtil.java | 13 +-
.../apache/kylin/common/util/TimeUtilTest.java | 25 +-
.../job/hadoop/cubev2/IGTRecordWriter.java | 11 -
.../job/hadoop/cubev2/InMemCubeBuilder.java | 567 -------------------
.../job/hadoop/cubev2/InMemCuboidMapper.java | 1 +
.../hadoop/cubev2/MapContextGTRecordWriter.java | 1 +
.../kylin/job/streaming/CubeStreamBuilder.java | 403 +++++++++++++
.../kylin/job/streaming/StreamingBootstrap.java | 96 +++-
.../kylin/job/BuildCubeWithStreamTest.java | 4 +-
.../job/ITKafkaBasedIIStreamBuilderTest.java | 32 +-
.../apache/kylin/job/InMemCubeBuilderTest.java | 4 +-
.../cubev2/InMemCubeBuilderBenchmarkTest.java | 119 ----
.../job/streaming/CubeStreamBuilderTest.java | 76 +++
.../kylin/storage/StorageEngineFactory.java | 3 +-
.../org/apache/kylin/streaming/KafkaConfig.java | 12 +-
.../kylin/streaming/SEOJsonStreamParser.java | 100 ++++
.../apache/kylin/streaming/StreamBuilder.java | 20 +-
.../kylin/streaming/cube/IGTRecordWriter.java | 11 +
.../kylin/streaming/cube/InMemCubeBuilder.java | 567 +++++++++++++++++++
.../invertedindex/IIStreamBuilder.java | 15 +-
.../cube/InMemCubeBuilderBenchmarkTest.java | 117 ++++
.../invertedindex/PrintOutStreamBuilder.java | 70 ---
23 files changed, 1437 insertions(+), 865 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index 2868368..e665298 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -18,34 +18,27 @@
package org.apache.kylin.common.persistence;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.util.Bytes;
-
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
public class HBaseResourceStore extends ResourceStore {
private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
@@ -197,8 +190,10 @@ public class HBaseResourceStore extends ResourceStore {
Put put = buildPut(resPath, newTS, row, content, table);
boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
- if (!ok)
- throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + getResourceTimestamp(resPath));
+ if (!ok) {
+ long real = getResourceTimestamp(resPath);
+ throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
+ }
table.flushCommits();
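The change above only improves the conflict message, but it is worth spelling out the optimistic-concurrency scheme it belongs to: checkAndPut() succeeds only if the stored timestamp still equals the one the writer read, so a losing writer gets the IllegalStateException and must re-read before retrying. A hedged sketch of such a retry loop, against a hypothetical interface rather than Kylin's real ResourceStore API:

    import java.io.IOException;

    interface TimestampedStore {
        long getResourceTimestamp(String path) throws IOException;
        // throws IllegalStateException when oldTS no longer matches the stored TS
        long putResource(String path, byte[] content, long oldTS) throws IOException;
    }

    class OptimisticWriter {
        long writeWithRetry(TimestampedStore store, String path, byte[] content) throws IOException {
            for (int attempt = 0; attempt < 3; attempt++) {
                long oldTS = store.getResourceTimestamp(path);      // read current version
                try {
                    return store.putResource(path, content, oldTS); // CAS-style write
                } catch (IllegalStateException conflict) {
                    // another writer won the race; loop to re-read the timestamp
                }
            }
            throw new IOException("gave up after 3 conflicting writes to " + path);
        }
    }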
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 26e3e06..0aa58e4 100644
--- a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -3,8 +3,13 @@ package org.apache.kylin.common.util;
/**
*/
public class TimeUtil {
- private static int ONE_MINUTE_TS = 60 * 1000;
- private static int ONE_HOUR_TS = 60 * 60 * 1000;
+ public enum NormalizedTimeUnit {
+ MINUTE, HOUR, DAY
+ }
+
+ private static long ONE_MINUTE_TS = 60 * 1000;
+ private static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
+ private static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
public static long getMinuteStart(long ts) {
return ts / ONE_MINUTE_TS * ONE_MINUTE_TS;
@@ -13,4 +18,8 @@ public class TimeUtil {
public static long getHourStart(long ts) {
return ts / ONE_HOUR_TS * ONE_HOUR_TS;
}
+
+ public static long getDayStart(long ts) {
+ return ts / ONE_DAY_TS * ONE_DAY_TS;
+ }
}
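The new getDayStart() uses the same flooring trick as the minute and hour variants: integer division truncates, so for non-negative epoch millis, ts / ONE_DAY_TS * ONE_DAY_TS lands on the start of the UTC day. One caveat: a UTC day start generally differs from the local-calendar day start outside UTC, which is likely why the Calendar-based unit test below only covers minute and hour. A quick check of the arithmetic:

    public class TimeUtilDemo {
        static final long ONE_DAY_TS = 24 * 60 * 60 * 1000L;

        public static void main(String[] args) {
            long ts = ONE_DAY_TS + 3600000L + 61000L;     // 1970-01-02T01:01:01Z
            long dayStart = ts / ONE_DAY_TS * ONE_DAY_TS; // same formula as getDayStart()
            System.out.println(dayStart == ONE_DAY_TS);   // true: floored to 1970-01-02T00:00:00Z
        }
    }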
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
index 90a0c40..cfa11d8 100644
--- a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -1,24 +1,25 @@
package org.apache.kylin.common.util;
+import org.apache.kylin.common.util.TimeUtil.NormalizedTimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
-import org.junit.Assert;
-import org.junit.Test;
-
/**
*/
public class TimeUtilTest {
- public static long normalizeTime(long timeMillis, NormalizeUnit unit) {
+ public static long normalizeTime(long timeMillis, NormalizedTimeUnit unit) {
Calendar a = Calendar.getInstance();
Calendar b = Calendar.getInstance();
b.clear();
a.setTimeInMillis(timeMillis);
- if (unit == NormalizeUnit.MINUTE) {
+ if (unit == NormalizedTimeUnit.MINUTE) {
b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), a.get(Calendar.MINUTE));
- } else if (unit == NormalizeUnit.HOUR) {
+ } else if (unit == NormalizedTimeUnit.HOUR) {
b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), 0);
}
return b.getTimeInMillis();
@@ -29,15 +30,13 @@ public class TimeUtilTest {
java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
- Assert.assertEquals(normalizeTime(t1, NormalizeUnit.HOUR), TimeUtil.getHourStart(t1));
- Assert.assertEquals(normalizeTime(t1, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
+ Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
+ Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
- Assert.assertEquals(normalizeTime(t2, NormalizeUnit.HOUR), TimeUtil.getHourStart(t2));
- Assert.assertEquals(normalizeTime(t2, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+ Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
+ Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
}
- public enum NormalizeUnit {
- MINUTE, HOUR
- }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
deleted file mode 100644
index cccc995..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface IGTRecordWriter {
- void write(Long cuboidId, GTRecord record) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
deleted file mode 100644
index 56989a6..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- */
-public class InMemCubeBuilder implements Runnable {
-
- private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
- private static final int DEFAULT_TIMEOUT = 25;
-
- private BlockingQueue<List<String>> queue;
- private CubeDesc desc = null;
- private long baseCuboidId;
- private CuboidScheduler cuboidScheduler = null;
- private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
- private CubeJoinedFlatTableDesc intermediateTableDesc;
- private MeasureCodec measureCodec;
- private String[] metricsAggrFuncs = null;
- private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
- public static final LongWritable ONE = new LongWritable(1l);
- private int[] hbaseMeasureRefIndex;
- private MeasureDesc[] measureDescs;
- private int measureCount;
-
- protected IGTRecordWriter gtRecordWriter;
-
-
- /**
- * @param queue
- * @param cube
- * @param dictionaryMap
- * @param gtRecordWriter
- */
- public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
- if (dictionaryMap == null || dictionaryMap.isEmpty()) {
- throw new IllegalArgumentException();
- }
- this.queue = queue;
- this.desc = cube.getDescriptor();
- this.cuboidScheduler = new CuboidScheduler(desc);
- this.dictionaryMap = dictionaryMap;
- this.gtRecordWriter = gtRecordWriter;
- this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
- this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- this.measureCodec = new MeasureCodec(desc.getMeasures());
-
- Map<String, Integer> measureIndexMap = Maps.newHashMap();
- List<String> metricsAggrFuncsList = Lists.newArrayList();
- measureCount = desc.getMeasures().size();
-
- List<MeasureDesc> measureDescsList = Lists.newArrayList();
- hbaseMeasureRefIndex = new int[measureCount];
- int measureRef = 0;
- for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- for (int j = 0; j < measureCount; j++) {
- if (desc.getMeasures().get(j).equals(measure)) {
- measureDescsList.add(measure);
- hbaseMeasureRefIndex[measureRef] = j;
- break;
- }
- }
- measureRef++;
- }
- }
- }
-
- for (int i = 0; i < measureCount; i++) {
- MeasureDesc measureDesc = measureDescsList.get(i);
- metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
- measureIndexMap.put(measureDesc.getName(), i);
- }
- this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
- this.dependentMeasures = Maps.newHashMap();
- for (int i = 0; i < measureCount; i++) {
- String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
- if (depMsrRef != null) {
- int index = measureIndexMap.get(depMsrRef);
- dependentMeasures.put(i, index);
- }
- }
-
- this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
- }
-
-
- private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
- GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
- GTComboStore store = new GTComboStore(info, memStore);
- GridTable gridTable = new GridTable(info, store);
- return gridTable;
- }
-
- private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
- Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
- BitSet parentDimensions = columnBitSets.getFirst();
- BitSet measureColumns = columnBitSets.getSecond();
- BitSet childDimensions = (BitSet) parentDimensions.clone();
-
- long mask = Long.highestOneBit(parentCuboidId);
- long childCuboidId = cuboidId;
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
- int index = 0;
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & parentCuboidId) > 0) {
- if ((mask & childCuboidId) == 0) {
- // this dim will be aggregated
- childDimensions.set(index, false);
- }
- index++;
- }
- mask = mask >> 1;
- }
-
- return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
-
- }
-
- private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
- IGTScanner scanner = gridTable.scan(req);
- GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
- GTBuilder builder = newGridTable.rebuild();
-
- BitSet allNeededColumns = new BitSet();
- allNeededColumns.or(aggregationColumns);
- allNeededColumns.or(measureColumns);
-
- GTRecord newRecord = new GTRecord(newGridTable.getInfo());
- int counter = 0;
- ByteArray byteArray = new ByteArray(8);
- ByteBuffer byteBuffer = ByteBuffer.allocate(8);
- try {
- BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
- for (Integer i : dependentMeasures.keySet()) {
- dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
- }
-
- Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
-
- for (GTRecord record : scanner) {
- counter++;
- for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
- newRecord.set(index, record.get(i));
- }
-
- if(dependentMeasures.size() > 0) {
- // update measures which have 'dependent_measure_ref'
- newRecord.getValues(dependentMetrics, hllObjects);
-
- for (Integer i : dependentMeasures.keySet()) {
- for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
- if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
- assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
-
- byteBuffer.clear();
- BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
- byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
- newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
- }
- }
-
- }
- }
-
- builder.write(newRecord);
- }
- } finally {
- builder.close();
- }
- logger.info("Cuboid " + cuboidId + " has rows: " + counter);
-
- return newGridTable;
- }
-
- private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
- BitSet dimension = new BitSet();
- dimension.set(0, bitSet.cardinality());
- BitSet metrics = new BitSet();
- metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
- return new Pair<BitSet, BitSet>(dimension, metrics);
- }
-
- private Object[] buildKey(List<String> row) {
- int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
- Object[] key = new Object[keySize];
-
- for (int i = 0; i < keySize; i++) {
- key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
- }
-
- return key;
- }
-
- private Object[] buildValue(List<String> row) {
-
- Object[] values = new Object[measureCount];
- MeasureDesc measureDesc = null;
-
- for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
- int i = hbaseMeasureRefIndex[position];
- measureDesc = measureDescs[i];
-
- Object value = null;
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
- FunctionDesc function = desc.getMeasures().get(i).getFunction();
- if (function.isCount() || function.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- value = ONE;
- } else if (flatTableIdx == null) {
- value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
- } else if (flatTableIdx.length == 1) {
- value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
- } else {
-
- byte[] result = null;
- for (int x = 0; x < flatTableIdx.length; x++) {
- byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
- if (result == null) {
- result = Arrays.copyOf(split, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split, 0, newResult, result.length, split.length);
- result = newResult;
- }
- }
- value = measureCodec.getSerializer(i).valueOf(result);
- }
- values[position] = value;
- }
- return values;
- }
-
-
- @Override
- public void run() {
- try {
- logger.info("Create base cuboid " + baseCuboidId);
- final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
-
- GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
- final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
-
- IGTScanner queueScanner = new IGTScanner() {
-
- @Override
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- List<String> currentObject = null;
-
- @Override
- public boolean hasNext() {
- try {
- currentObject = queue.take();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return currentObject != null && currentObject.size() > 0;
- }
-
- @Override
- public GTRecord next() {
- if (currentObject.size() == 0)
- throw new IllegalStateException();
-
- buildGTRecord(currentObject, baseGTRecord);
- return baseGTRecord;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public GTInfo getInfo() {
- return baseCuboidGT.getInfo();
- }
-
- @Override
- public int getScannedRowCount() {
- return 0;
- }
-
- @Override
- public int getScannedRowBlockCount() {
- return 0;
- }
- };
-
- Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
- GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
- IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
-
- int counter = 0;
- for (GTRecord r : aggregationScanner) {
- baseGTBuilder.write(r);
- counter++;
- }
- baseGTBuilder.close();
- aggregationScanner.close();
-
- logger.info("Base cuboid has " + counter + " rows;");
- SimpleGridTableTree tree = new SimpleGridTableTree();
- tree.data = baseCuboidGT;
- tree.id = baseCuboidId;
- tree.parent = null;
- if (counter > 0) {
- List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
- Collections.sort(children);
- for (Long childId : children) {
- createNDCuboidGT(tree, baseCuboidId, childId);
- }
- }
- outputGT(baseCuboidId, baseCuboidGT);
- dropStore(baseCuboidGT);
-
- } catch (IOException e) {
- logger.error("Fail to build cube", e);
- throw new RuntimeException(e);
- }
-
- }
-
- private void buildGTRecord(List<String> row, GTRecord record) {
-
- Object[] dimensions = buildKey(row);
- Object[] metricsValues = buildValue(row);
- Object[] recordValues = new Object[dimensions.length + metricsValues.length];
- System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
- System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
- record.setValues(recordValues);
- }
-
- private boolean gc(TreeNode<GridTable> parentNode) {
- final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
- logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
- for (TreeNode<GridTable> gridTable : gridTables) {
- final GTComboStore store = (GTComboStore) gridTable.data.getStore();
- if (store.memoryUsage() > 0) {
- logger.info("cuboid id:" + gridTable.id + " flush to disk");
- long t = System.currentTimeMillis();
- store.switchToDiskStore();
- logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
- waitForGc();
- return true;
- }
- }
- logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
- return false;
-
- }
-
- private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
- final ExecutorService executorService = Executors.newSingleThreadExecutor();
- final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
- @Override
- public GridTable call() throws Exception {
- return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
- }
- });
- try {
- final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
- return gridTable;
- } catch (InterruptedException e) {
- throw new RuntimeException("this should not happen", e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof OutOfMemoryError) {
- logger.warn("Future.get() OutOfMemory, stop the thread");
- } else {
- throw new RuntimeException("this should not happen", e);
- }
- } catch (TimeoutException e) {
- logger.warn("Future.get() timeout, stop the thread");
- }
- logger.info("shutdown executor service");
- final List<Runnable> runnables = executorService.shutdownNow();
- try {
- executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
- waitForGc();
- } catch (InterruptedException e) {
- throw new RuntimeException("this should not happen", e);
- }
- return null;
-
- }
-
- private void waitForGc() {
- System.gc();
- logger.info("wait 5 seconds for gc");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException("should not happen", e);
- }
- }
-
- private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
-
- long startTime = System.currentTimeMillis();
-
-// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
-// if (parentStore.memoryUsage() <= 0) {
-// long t = System.currentTimeMillis();
-// parentStore.switchToMemStore();
-// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
-// }
-
- GridTable currentCuboid;
- while (true) {
- logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
- currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
- if (currentCuboid != null) {
- break;
- } else {
- logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
- if (gc(parentNode)) {
- continue;
- } else {
- logger.warn("all parent node has been flushed into disk, memory is still insufficient");
- throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
- }
- }
- }
- SimpleGridTableTree node = new SimpleGridTableTree();
- node.parent = parentNode;
- node.data = currentCuboid;
- node.id = cuboidId;
- parentNode.children.add(node);
-
- logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
-
- List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
- if (!children.isEmpty()) {
- Collections.sort(children); // sort cuboids
- for (Long childId : children) {
- createNDCuboidGT(node, cuboidId, childId);
- }
- }
-
-
- //output the grid table
- outputGT(cuboidId, currentCuboid);
- dropStore(currentCuboid);
- parentNode.children.remove(node);
- if (parentNode.children.size() > 0) {
- logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
- ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
- }
- }
-
- private void dropStore(GridTable gt) throws IOException {
- ((GTComboStore) gt.getStore()).drop();
- }
-
-
- private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
- long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- this.gtRecordWriter.write(cuboidId, record);
- }
- logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
- }
-
- private static class TreeNode<T> {
- T data;
- long id;
- TreeNode<T> parent;
- List<TreeNode<T>> children = Lists.newArrayList();
-
- List<TreeNode<T>> getAncestorList() {
- ArrayList<TreeNode<T>> result = Lists.newArrayList();
- TreeNode<T> parent = this;
- while (parent != null) {
- result.add(parent);
- parent = parent.parent;
- }
- return Lists.reverse(result);
- }
-
- @Override
- public String toString() {
- return id + "";
- }
- }
-
- private static class SimpleGridTableTree extends TreeNode<GridTable> {
- }
-
-
-}
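
For readers following the relocation above: the removed builder bounds each child-cuboid aggregation with a timeout on a single-thread executor (createChildCuboid), treats an OutOfMemoryError or a timeout as a signal to free memory and retry, and gives up only when gc() finds no in-memory ancestor grid table left to flush to disk. A minimal standalone sketch of that back-off pattern, with hypothetical names and no Kylin dependencies:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedAttempt {

    // Runs work on its own thread; returns null if the attempt ran out of
    // memory or time, so the caller can free memory and try again.
    static <T> T attempt(Callable<T> work, long timeoutSeconds) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> task = executor.submit(work);
            try {
                return task.get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof OutOfMemoryError))
                    throw new RuntimeException(e);
                // OOM inside the worker: fall through and report failure
            } catch (TimeoutException e) {
                // a slow attempt is handled the same way as an OOM
            }
            return null;
        } finally {
            executor.shutdownNow();
            executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        }
    }
}

The caller loops on attempt(), flushing one in-memory ancestor to disk per null result, which is exactly the retry loop createNDCuboidGT() implements above.
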
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 4454e43..4efff16 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -20,6 +20,7 @@ import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
import java.io.IOException;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index b8e1ffe..283bed6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -16,6 +16,7 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
new file mode 100644
index 0000000..82892dc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -0,0 +1,403 @@
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
+import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.SEOJsonStreamParser;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class CubeStreamBuilder extends StreamBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilder.class);
+
+ private final CubeManager cubeManager;
+ private final String cubeName;
+ private final KylinConfig kylinConfig;
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+
+ public CubeStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, String cubeName) {
+ super(streamMessageQueue);
+ this.kylinConfig = KylinConfig.getInstanceFromEnv();
+ this.cubeManager = CubeManager.getInstance(kylinConfig);
+ this.cubeName = cubeName;
+ setStreamParser(new SEOJsonStreamParser(cubeManager.getCube(cubeName).getAllColumns()));
+ }
+
+ @Override
+ protected void build(List<StreamMessage> streamMessages) throws Exception {
+ if (CollectionUtils.isEmpty(streamMessages)) {
+ logger.info("nothing to build, skip to next iteration");
+ return;
+ }
+ final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
+ long startOffset = streamMessages.get(0).getOffset();
+ long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+ LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+ blockingQueue.put(Collections.<String>emptyList());
+
+ final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
+ final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+
+ final Configuration conf = HadoopUtil.getCurrentConfiguration();
+ final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
+ FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+ ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+
+ final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
+ writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
+ final HTableInterface hTable = createHTable(cubeSegment);
+
+ final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+ dictionaryMap, gtRecordWriter);
+
+ executorService.submit(inMemCubeBuilder).get();
+ gtRecordWriter.flush();
+ commitSegment(cubeSegment);
+ }
+
+ private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
+ for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+ final TblColRef tblColRef = entry.getKey();
+ final Dictionary<?> dictionary = entry.getValue();
+ TableSignature signature = new TableSignature();
+ signature.setLastModifiedTime(System.currentTimeMillis());
+ signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
+ signature.setSize(endOffset - startOffset);
+ DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
+ tblColRef.getName(),
+ tblColRef.getColumnDesc().getZeroBasedIndex(),
+ tblColRef.getDatatype(),
+ signature,
+ ReadableTable.DELIM_AUTO);
+ logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+ DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+ try {
+ cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
+ } catch (IOException e) {
+ logger.error("error save dictionary for column:" + tblColRef, e);
+ throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+ }
+ }
+ }
+
+ private class CubeStreamRecordWriter implements IGTRecordWriter {
+ final List<InMemKeyValueCreator> keyValueCreators;
+ final int nColumns;
+ final HTableInterface hTable;
+ private final ByteBuffer byteBuffer;
+ private final CubeDesc cubeDesc;
+ private List<Put> puts = Lists.newArrayList();
+
+ private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
+ this.keyValueCreators = Lists.newArrayList();
+ this.cubeDesc = cubeDesc;
+ int startPosition = 0;
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+ startPosition += colDesc.getMeasures().length;
+ }
+ }
+ this.nColumns = keyValueCreators.size();
+ this.hTable = hTable;
+ this.byteBuffer = ByteBuffer.allocate(1 << 20);
+ }
+
+ private byte[] copy(byte[] array, int offset, int length) {
+ byte[] result = new byte[length];
+ System.arraycopy(array, offset, result, 0, length);
+ return result;
+ }
+
+ private ByteBuffer createKey(Long cuboidId, GTRecord record) {
+ byteBuffer.clear();
+ byteBuffer.put(Bytes.toBytes(cuboidId));
+ final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+ for (int i = 0; i < cardinality; i++) {
+ final ByteArray byteArray = record.get(i);
+ byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
+ }
+ return byteBuffer;
+ }
+
+ @Override
+ public void write(Long cuboidId, GTRecord record) throws IOException {
+ final ByteBuffer key = createKey(cuboidId, record);
+ final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
+ final BitSet bitSet = new BitSet();
+ bitSet.set(mapping.getDimensionCount(), mapping.getColumnCount());
+ for (int i = 0; i < nColumns; i++) {
+ final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+ final Put put = new Put(copy(key.array(), 0, key.position()));
+ byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+ byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+ byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+ put.add(family, qualifier, value);
+ puts.add(put);
+ }
+ if (puts.size() >= batchSize()) {
+ flush();
+ }
+ }
+
+ public final void flush() {
+ try {
+ if (!puts.isEmpty()) {
+ long t = System.currentTimeMillis();
+ if (hTable != null) {
+ hTable.put(puts);
+ hTable.flushCommits();
+ }
+ logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+ puts.clear();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) {
+ final List<TblColRef> columns = cubeInstance.getAllColumns();
+ final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
+ final HashMap<Integer, TblColRef> result = Maps.newHashMap();
+ for (int i = 0; i < columns.size(); i++) {
+ final TblColRef tblColRef = columns.get(i);
+ if (allDimensions.contains(tblColRef)) {
+ result.put(i, tblColRef);
+ }
+ }
+ return result;
+ }
+
+ private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException {
+ HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+
+ HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+ for (List<String> row : recordList) {
+ for (int i = 0; i < row.size(); i++) {
+ String cell = row.get(i);
+ if (tblColRefMap.containsKey(i)) {
+ valueMap.put(tblColRefMap.get(i), cell);
+ }
+ }
+ }
+ for (TblColRef tblColRef : valueMap.keySet()) {
+ final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+ @Nullable
+ @Override
+ public byte[] apply(String input) {
+ return input == null ? null : input.getBytes();
+ }
+ });
+ final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+ result.put(tblColRef, dict);
+ }
+ return result;
+ }
+
+ private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
+ CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+ final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+
+ final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+ for (Long cuboidId : allCuboidIds) {
+ result.put(cuboidId, new HyperLogLogPlusCounter(14));
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < rowkeyLength; i++) {
+ if ((mask & cuboidId) > 0) {
+ cuboidBitSet[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+ }
+
+ HashFunction hf = Hashing.murmur3_32();
+ ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
+ for (int i = 0; i < rowkeyLength; i++) {
+ row_hashcodes[i] = new ByteArray();
+ }
+ for (List<String> row : streams) {
+ //generate hash for each row key column
+ for (int i = 0; i < rowkeyLength; i++) {
+ Hasher hc = hf.newHasher();
+ final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ if (cell != null) {
+ row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+ }
+ }
+
+ for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
+ Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
+ HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
+ Hasher hc = hf.newHasher();
+ final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+ for (int position = 0; position < cuboidBitSet.length; position++) {
+ hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+ }
+ counter.add(hc.hash().asBytes());
+ }
+ }
+ return result;
+ }
+
+ private void commitSegment(CubeSegment cubeSegment) throws IOException {
+ cubeSegment.setStatus(SegmentStatusEnum.READY);
+ CubeManager.getInstance(kylinConfig).updateCube(cubeSegment.getCubeInstance(), true);
+ }
+
+ private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ List<Long> result = Lists.newArrayList();
+ CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+ getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
+ return result;
+ }
+
+ private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
+ result.add(parentCuboidId);
+ for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
+ getSubCuboidIds(cuboidScheduler, cuboidId, result);
+ }
+ }
+
+
+ private List<List<String>> parseStream(List<StreamMessage> streamMessages) {
+ return Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
+ @Nullable
+ @Override
+ public List<String> apply(StreamMessage input) {
+ return getStreamParser().parse(input);
+ }
+ });
+ }
+
+ private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+ final String hTableName = cubeSegment.getStorageLocationIdentifier();
+ String[] args = new String[]{"-cubename", cubeName,
+ "-segmentname", cubeSegment.getName(),
+ "-input", "/empty",
+ "-htablename", hTableName,
+ "-statisticsenabled", "true"};
+ ToolRunner.run(new CreateHTableJob(), args);
+ final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+ logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+ return hTable;
+ }
+
+ private void loadToHTable(String hTableName) throws IOException {
+ final HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+
+ }
+
+ @Override
+ protected void onStop() {
+
+ }
+
+ @Override
+ protected int batchInterval() {
+ return 30 * 60 * 1000;//30 min
+ }
+
+ @Override
+ protected int batchSize() {
+ return 1000;
+ }
+}
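
One step of the new file worth restating: sampling() walks each cuboid id's bits, from the base cuboid's highest row-key bit downward, to find the row-key column indexes that participate in that cuboid. A self-contained restatement of that loop (illustrative names, no Kylin types), using != 0 so the sign bit is handled safely:

static Integer[] cuboidColumns(long cuboidId, long baseCuboidId, int rowkeyLength) {
    Integer[] result = new Integer[Long.bitCount(cuboidId)];
    long mask = Long.highestOneBit(baseCuboidId);
    int position = 0;
    for (int i = 0; i < rowkeyLength; i++) {
        if ((mask & cuboidId) != 0) {
            result[position++] = i; // column i is part of this cuboid
        }
        mask >>= 1;
    }
    return result;
}

With a 3-column row key the base cuboid id is 0b111, and cuboid 0b101 maps to columns {0, 2}. For each input row, sampling() then hashes exactly those columns into the cuboid's HyperLogLogPlusCounter, yielding a per-cuboid distinct-row estimate.
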
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 7854fd5..3929098 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -35,6 +35,7 @@
package org.apache.kylin.job.streaming;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
@@ -42,20 +43,22 @@ import kafka.javaapi.PartitionMetadata;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
/**
*/
@@ -65,14 +68,12 @@ public class StreamingBootstrap {
private KylinConfig kylinConfig;
private StreamingManager streamingManager;
- private IIManager iiManager;
private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
private StreamingBootstrap(KylinConfig kylinConfig) {
this.kylinConfig = kylinConfig;
this.streamingManager = StreamingManager.getInstance(kylinConfig);
- this.iiManager = IIManager.getInstance(kylinConfig);
}
public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
@@ -115,10 +116,73 @@ public class StreamingBootstrap {
public void start(String streaming, int partitionId) throws Exception {
final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
- final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
- Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
+
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
+ if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
+ startIIStreaming(kafkaConfig, partitionId, partitionCount);
+ } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
+ startCubeStreaming(kafkaConfig, partitionId, partitionCount);
+ } else {
+ throw new IllegalArgumentException("no cube or ii in kafka config");
+ }
+ }
+
+ private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
+ List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
+ for (int partitionId = 0 ; partitionId < partitionCount && partitionId < 10; ++partitionId) {
+ final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+ long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+ final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
+ streamingOffset = Math.max(streamingOffset, latestOffset);
+ KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
+ streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+ Executors.newSingleThreadExecutor().submit(consumer);
+ result.add(consumer.getStreamQueue(0));
+ }
+ return result;
+ }
+
+ private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
+ final String cubeName = kafkaConfig.getCubeName();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+ final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount);
+ final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>();
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ try {
+ streamQueue.put(queue.take());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ });
+ CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
+ cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+ future.get();
+ }
+
+ private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception {
+ if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
+ Class clazz = Class.forName(kafkaConfig.getParserName());
+ Constructor constructor = clazz.getConstructor(List.class);
+ return (StreamParser) constructor.newInstance(columns);
+ } else {
+ return new JsonStreamParser(columns);
+ }
+ }
+
+ private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
+ final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
+ Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
Preconditions.checkArgument(ii.getSegments().size() > 0);
final IISegment iiSegment = ii.getSegments().get(0);
@@ -129,7 +193,8 @@ public class StreamingBootstrap {
final int parallelism = shard / partitionCount;
final int startShard = partitionId * parallelism;
final int endShard = startShard + parallelism;
- long streamingOffset = getEarliestStreamingOffset(streaming, startShard, endShard);
+
+ long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard);
streamingOffset = streamingOffset - (streamingOffset % parallelism);
logger.info("offset from ii desc is " + streamingOffset);
final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
@@ -143,21 +208,14 @@ public class StreamingBootstrap {
}
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
- kafkaConsumers.put(getKey(streaming, partitionId), consumer);
+ kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
+
- StreamParser parser;
- if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
- Class clazz = Class.forName(kafkaConfig.getParserName());
- Constructor constructor = clazz.getConstructor(List.class);
- parser = (StreamParser) constructor.newInstance(ii.getDescriptor().listAllColumns());
- } else {
- parser = new JsonStreamParser(ii.getDescriptor().listAllColumns());
- }
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streaming, iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
- task.setStreamParser(parser);
+ final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+ task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
} else {
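
A note on startCubeStreaming() above: it merges the per-partition queues into one deque with a single round-robin thread, and because take() blocks, one idle partition stalls consumption from all the others. A hedged sketch of the same fan-in with a bounded poll instead (generic types, not the Kylin classes):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class FanIn<T> implements Runnable {
    private final List<BlockingQueue<T>> sources;
    private final BlockingQueue<T> sink;

    FanIn(List<BlockingQueue<T>> sources, BlockingQueue<T> sink) {
        this.sources = sources;
        this.sink = sink;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            for (BlockingQueue<T> source : sources) {
                try {
                    // bounded wait so an idle source cannot stall the others
                    T message = source.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        sink.put(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
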
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index a437bba..319d7fa 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -55,11 +55,11 @@ import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
index 2432220..71c9644 100644
--- a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
@@ -34,19 +34,19 @@
package org.apache.kylin.job;
-import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.junit.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
/**
*/
@@ -63,30 +63,6 @@ public class ITKafkaBasedIIStreamBuilderTest {
System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
}
- @AfterClass
- public static void afterClass() throws Exception {
-// backup();
- }
-
- private static void backup() throws Exception {
- int exitCode = cleanupOldStorage();
- if (exitCode == 0) {
- exportHBaseData();
- }
- }
-
- private static int cleanupOldStorage() throws Exception {
- String[] args = {"--delete", "true"};
-
- int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
- return exitCode;
- }
-
- private static void exportHBaseData() throws IOException {
- ExportHBaseData export = new ExportHBaseData();
- export.exportTables();
- }
-
@Before
public void before() throws Exception {
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
index c4bda5f..781273d 100644
--- a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
@@ -38,10 +38,10 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.lookup.FileTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
deleted file mode 100644
index f61aa66..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-@Ignore
-public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
-
- private static final int BENCHMARK_RECORD_LIMIT = 2000000;
- private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
-
- @Before
- public void setUp() throws Exception {
- this.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
- final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
- final CubeDesc desc = cubeSegment.getCubeDesc();
- for (DimensionDesc dim : desc.getDimensions()) {
- // dictionary
- for (TblColRef col : dim.getColumnRefs()) {
- if (desc.getRowkey().isUseDictionary(col)) {
- Dictionary dict = cubeSegment.getDictionary(col);
- if (dict == null) {
- throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
- }
- logger.info("Dictionary for " + col + " was put into dictionary map.");
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
- }
- }
- return dictionaryMap;
- }
-
- private static class ConsoleGTRecordWriter implements IGTRecordWriter {
-
- boolean verbose = false;
-
- @Override
- public void write(Long cuboidId, GTRecord record) throws IOException {
- if (verbose)
- System.out.println(record.toString());
- }
- }
-
- private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
- BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
- String line;
- int counter = 0;
- while ((line = br.readLine()) != null) {
- queue.put(Arrays.asList(line.split("\t")));
- counter++;
- if (counter == BENCHMARK_RECORD_LIMIT) {
- break;
- }
- }
- queue.put(Collections.emptyList());
- }
-
- private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
- queue.put(Collections.emptyList());
- }
-
-
- @Test
- public void test() throws Exception {
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
- final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
- final CubeSegment cubeSegment = cube.getFirstSegment();
-
- LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
-
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cubeBuilder);
- loadDataFromLocalFile(queue);
- future.get();
- logger.info("stream build finished");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
new file mode 100644
index 0000000..9d62fda
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ */
+public class CubeStreamBuilderTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilderTest.class);
+
+ private KylinConfig kylinConfig;
+
+ private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ @Before
+ public void before() throws Exception {
+ HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
+ DeployUtil.overrideJobJarLocations();
+ final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+ cube.getSegments().clear();
+ CubeManager.getInstance(kylinConfig).updateCube(cube, true);
+
+ }
+
+ @Test
+ public void test() throws Exception {
+ LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+ CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, CUBE_NAME);
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+ loadDataFromLocalFile(queue, 100000);
+ future.get();
+ }
+
+ private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+ String line;
+ int count = 0;
+ while ((line = br.readLine()) != null && count++ < maxCount) {
+ final List<String> strings = Arrays.asList(line.split("\t"));
+ queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+ }
+ queue.put(StreamMessage.EOF);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index f880b73..0d720ab 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
+import org.apache.kylin.storage.hbase.CubeStorageEngine;
import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.apache.kylin.storage.hybrid.HybridStorageEngine;
@@ -51,7 +52,7 @@ public class StorageEngineFactory {
return ret;
}
} else if (realization.getType() == RealizationType.CUBE) {
- ICachableStorageEngine ret = new org.apache.kylin.storage.cube.CubeStorageEngine((CubeInstance) realization);
+ ICachableStorageEngine ret = new CubeStorageEngine((CubeInstance) realization);
if (allowStorageLayerCache) {
return wrapWithCache(ret, realization);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index 9949c96..b6f5025 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -75,6 +75,9 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("iiName")
private String iiName;
+ @JsonProperty("cubeName")
+ private String cubeName;
+
@JsonProperty("parserName")
private String parserName;
@@ -86,7 +89,6 @@ public class KafkaConfig extends RootPersistentEntity {
this.parserName = parserName;
}
-
public int getTimeout() {
return timeout;
}
@@ -133,6 +135,14 @@ public class KafkaConfig extends RootPersistentEntity {
});
}
+ public String getCubeName() {
+ return cubeName;
+ }
+
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
+ }
+
public String getIiName() {
return iiName;
}
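
With this change a single kafka config routes to either engine: start() dispatches to II streaming when iiName is set and to cube streaming when cubeName is set, and throws IllegalArgumentException when both are empty. A sketch of the JSON fragment, restricted to the @JsonProperty names visible in this diff (the cube and parser values are invented, and the topic/broker properties a real config also needs are outside this hunk):

{
  "cubeName": "my_streaming_cube",
  "parserName": "org.apache.kylin.streaming.SEOJsonStreamParser"
}
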
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
new file mode 100644
index 0000000..5dab7f9
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
@@ -0,0 +1,100 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public final class SEOJsonStreamParser implements StreamParser {
+
+ private static final Logger logger = LoggerFactory.getLogger(SEOJsonStreamParser.class);
+
+ private final List<TblColRef> allColumns;
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+ public SEOJsonStreamParser(List<TblColRef> allColumns) {
+ this.allColumns = allColumns;
+ }
+
+ @Override
+ public List<String> parse(StreamMessage stream) {
+ try {
+ Map<String, String> root = mapper.readValue(stream.getRawData(), mapType);
+ String trafficSource = root.get("trafficsourceid");
+ if ("20".equals(trafficSource) || "21".equals(trafficSource) || "22".equals(trafficSource) || "23".equals(trafficSource)) {
+ ArrayList<String> result = Lists.newArrayList();
+ for (TblColRef column : allColumns) {
+ String columnName = column.getName();
+ if (columnName.equalsIgnoreCase("minute_start")) {
+ result.add(String.valueOf(TimeUtil.getMinuteStart(Long.valueOf(root.get("timestamp")))));
+ } else if (columnName.equalsIgnoreCase("hour_start")) {
+ result.add(String.valueOf(TimeUtil.getHourStart(Long.valueOf(root.get("timestamp")))));
+ } else if (columnName.equalsIgnoreCase("day")) {
+ // for the day column we add a yyyy-MM-dd date string derived from the day start
+ long ts = TimeUtil.getDayStart(Long.valueOf(root.get("timestamp")));
+ result.add(DateFormat.formatToDateStr(ts));
+ } else {
+ String x = root.get(columnName.toLowerCase());
+ result.add(x);
+ }
+ }
+
+ return result;
+ } else {
+ return null;
+ }
+ } catch (IOException e) {
+ logger.error("error parsing:" + new String(stream.getRawData()), e);
+ return null;
+ }
+ }
+}
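
To make the filter concrete: the parser reads each message as a flat string-to-string JSON map, keeps it only when trafficsourceid is 20, 21, 22 or 23, and derives minute_start, hour_start and day from the epoch-millisecond timestamp field; every other column is looked up by its lower-cased name. A hypothetical message it would accept (all fields except trafficsourceid and timestamp are invented for illustration):

{"trafficsourceid": "21", "timestamp": "1432885200000", "keyword": "kylin", "site": "ebay"}

Messages with any other trafficsourceid parse to null, which the reworked StreamBuilder below treats as a cue to consider flushing the current batch.
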
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 67cb109..e9cb046 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -48,16 +48,14 @@ public abstract class StreamBuilder implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
- private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
- private final int sliceSize;
+
private StreamParser streamParser = StringStreamParser.instance;
private BlockingQueue<StreamMessage> streamMessageQueue;
private long lastBuildTime = System.currentTimeMillis();
- public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize) {
+ public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
this.streamMessageQueue = streamMessageQueue;
- this.sliceSize = sliceSize;
}
protected abstract void build(List<StreamMessage> streamsToBuild) throws Exception;
@@ -81,10 +79,11 @@ public abstract class StreamBuilder implements Runnable {
logger.warn("stream queue interrupted", e);
continue;
}
- if (streamMessage == null) {
-
- logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
- if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
+ if (streamMessage == null || getStreamParser().parse(streamMessage) == null) {
+ if (streamMessage == null) {
+ logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
+ }
+ if ((System.currentTimeMillis() - lastBuildTime) > batchInterval()) {
build(streamMessageToBuild);
clearCounter();
streamMessageToBuild.clear();
@@ -98,7 +97,7 @@ public abstract class StreamBuilder implements Runnable {
}
}
streamMessageToBuild.add(streamMessage);
- if (streamMessageToBuild.size() >= this.sliceSize) {
+ if (streamMessageToBuild.size() >= batchSize()) {
build(streamMessageToBuild);
clearCounter();
streamMessageToBuild.clear();
@@ -117,4 +116,7 @@ public abstract class StreamBuilder implements Runnable {
public final void setStreamParser(StreamParser streamParser) {
this.streamParser = streamParser;
}
+
+ protected abstract int batchInterval();
+ protected abstract int batchSize();
}
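
The contract after this change: subclasses no longer pass a sliceSize to the constructor; instead batchSize() caps a batch by message count and batchInterval() caps it by elapsed time. A minimal subclass sketch (onStop() is assumed abstract, as the CubeStreamBuilder override earlier in this commit suggests; the body here is illustrative only):

import java.util.List;
import java.util.concurrent.BlockingQueue;

import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;

public class LoggingStreamBuilder extends StreamBuilder {

    public LoggingStreamBuilder(BlockingQueue<StreamMessage> queue) {
        super(queue);
    }

    @Override
    protected void build(List<StreamMessage> streamsToBuild) {
        // a real implementation would index or persist the batch here
        System.out.println("flushing a batch of " + streamsToBuild.size() + " messages");
    }

    @Override
    protected void onStop() {
        // release resources held by build(), if any
    }

    @Override
    protected int batchInterval() {
        return 60 * 1000; // flush at least once a minute
    }

    @Override
    protected int batchSize() {
        return 500; // or whenever 500 messages have accumulated
    }
}
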
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
new file mode 100644
index 0000000..2d1e97e
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.streaming.cube;
+
+import org.apache.kylin.storage.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public interface IGTRecordWriter {
+ void write(Long cuboidId, GTRecord record) throws IOException;
+}
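
IGTRecordWriter is the seam between cube building and storage; CubeStreamBuilder plugs in an HBase-backed writer, while tests can use something as small as the console writer that the deleted benchmark above used. A sketch along those lines:

import java.io.IOException;

import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.streaming.cube.IGTRecordWriter;

// Prints each aggregated row instead of persisting it; handy for smoke tests.
public class ConsoleGTRecordWriter implements IGTRecordWriter {
    @Override
    public void write(Long cuboidId, GTRecord record) throws IOException {
        System.out.println("cuboid " + cuboidId + ": " + record.toString());
    }
}
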
[2/4] incubator-kylin git commit: streaming cubing: fix demo
Posted by ma...@apache.org.
streaming cubing: fix demo
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/341cda00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/341cda00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/341cda00
Branch: refs/heads/0.8.0
Commit: 341cda005b5805ffa678f18fa85d354a2a4a1529
Parents: a6a9d94
Author: honma <ho...@ebay.com>
Authored: Thu May 28 23:06:12 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800
----------------------------------------------------------------------
.../main/java/org/apache/kylin/common/KylinConfig.java | 3 +--
.../java/org/apache/kylin/common/util/BasicTest.java | 1 +
.../apache/kylin/job/streaming/CubeStreamBuilder.java | 3 +--
.../apache/kylin/job/streaming/StreamingBootstrap.java | 13 ++++++-------
.../org/apache/kylin/storage/StorageEngineFactory.java | 2 +-
5 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cbb049d..b049fd0 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -19,7 +19,6 @@
package org.apache.kylin.common;
import com.google.common.collect.Sets;
-import jodd.util.StringUtil;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.IOUtils;
@@ -496,7 +495,7 @@ public class KylinConfig {
private String[] getOptionalStringArray(String prop) {
final String property = System.getProperty(prop);
- if (!StringUtil.isBlank(property))
+ if (!StringUtils.isBlank(property))
return property.split("\\s*,\\s*");
return kylinConfig.getStringArray(prop);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 98ee807..daab36f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -62,6 +62,7 @@ public class BasicTest {
@Test
public void test0() throws Exception {
+ System.out.println(Long.MAX_VALUE);
IdentityHashMap<String, Void> a = new IdentityHashMap<>();
IdentityHashMap<String, Void> b = new IdentityHashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 82892dc..0bd2792 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -9,7 +9,6 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -393,7 +392,7 @@ public class CubeStreamBuilder extends StreamBuilder {
@Override
protected int batchInterval() {
- return 30 * 60 * 1000;//30 min
+ return 5 * 60 * 1000;//5 min
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 3929098..0c67d78 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@@ -131,13 +130,14 @@ public class StreamingBootstrap {
private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
- for (int partitionId = 0 ; partitionId < partitionCount && partitionId < 10; ++partitionId) {
+ for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) {
final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
- long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+
final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
- streamingOffset = Math.max(streamingOffset, latestOffset);
- KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
- streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+ long streamingOffset = latestOffset;
+ logger.info("submitting offset:" + streamingOffset);
+
+ KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
Executors.newSingleThreadExecutor().submit(consumer);
result.add(consumer.getStreamQueue(0));
}
@@ -210,7 +210,6 @@ public class StreamingBootstrap {
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
-
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 0d720ab..5fc757a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
* @author xjiang
*/
public class StorageEngineFactory {
- private static boolean allowStorageLayerCache = true;
+ private static boolean allowStorageLayerCache = false;
public static IStorageEngine getStorageEngine(IRealization realization) {