Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/03 12:04:26 UTC
[2/2] incubator-kylin git commit: KYLIN-808
KYLIN-808
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3eee24f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3eee24f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3eee24f9
Branch: refs/heads/0.8.0
Commit: 3eee24f945b2a770d6acb9b60113a21ccaba83fc
Parents: 30acdb9
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jun 2 16:48:23 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jun 3 18:01:33 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/CubeStreamBuilder.java | 383 -------------------
.../kylin/job/streaming/CubeStreamConsumer.java | 370 ++++++++++++++++++
.../kylin/job/streaming/StreamingBootstrap.java | 37 +-
.../apache/kylin/job/BuildIIWithStreamTest.java | 6 +-
.../kylin/job/hadoop/invertedindex/IITest.java | 35 +-
.../job/streaming/CubeStreamBuilderTest.java | 81 ----
.../job/streaming/CubeStreamConsumerTest.java | 83 ++++
.../kylin/streaming/JsonStreamParser.java | 2 +
.../kylin/streaming/MicroBatchCondition.java | 22 ++
.../kylin/streaming/MicroStreamBatch.java | 45 ++-
.../streaming/MicroStreamBatchConsumer.java | 11 +
.../apache/kylin/streaming/StreamBuilder.java | 199 ++++++----
.../invertedindex/IIStreamBuilder.java | 164 --------
.../invertedindex/IIStreamConsumer.java | 148 +++++++
14 files changed, 828 insertions(+), 758 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
deleted file mode 100644
index 6914b73..0000000
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ /dev/null
@@ -1,383 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableSignature;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
-import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
-import org.apache.kylin.job.inmemcubing.ICuboidWriter;
-import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-public class CubeStreamBuilder extends StreamBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilder.class);
-
- private final CubeManager cubeManager;
- private final String cubeName;
- private final KylinConfig kylinConfig;
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-
- public CubeStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, String cubeName) {
- super(streamMessageQueue);
- this.kylinConfig = KylinConfig.getInstanceFromEnv();
- this.cubeManager = CubeManager.getInstance(kylinConfig);
- this.cubeName = cubeName;
- }
-
- @Override
- protected void build(MicroStreamBatch microStreamBatch) throws Exception {
- if (microStreamBatch.size() == 0) {
- logger.info("nothing to build, skip to next iteration");
- return;
- }
- final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
- long startOffset = microStreamBatch.getOffset().getFirst();
- long endOffset = microStreamBatch.getOffset().getSecond();
- LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
- blockingQueue.put(Collections.<String>emptyList());
-
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final CubeDesc cubeDesc = cubeInstance.getDescriptor();
- final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
- final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
-
- final Configuration conf = HadoopUtil.getCurrentConfiguration();
- final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
- ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
-
- final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
- writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
-
- final HTableInterface hTable = createHTable(cubeSegment);
-
- final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
- dictionaryMap, gtRecordWriter);
-
- executorService.submit(inMemCubeBuilder).get();
- gtRecordWriter.flush();
- commitSegment(cubeSegment);
- }
-
- private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
- for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
- final TblColRef tblColRef = entry.getKey();
- final Dictionary<?> dictionary = entry.getValue();
- TableSignature signature = new TableSignature();
- signature.setLastModifiedTime(System.currentTimeMillis());
- signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
- signature.setSize(endOffset - startOffset);
- DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
- tblColRef.getName(),
- tblColRef.getColumnDesc().getZeroBasedIndex(),
- tblColRef.getDatatype(),
- signature,
- ReadableTable.DELIM_AUTO);
- logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
- DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
- try {
- cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
- } catch (IOException e) {
- logger.error("error save dictionary for column:" + tblColRef, e);
- throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
- }
- }
- }
-
- private class CubeStreamRecordWriter implements ICuboidWriter {
- final List<InMemKeyValueCreator> keyValueCreators;
- final int nColumns;
- final HTableInterface hTable;
- private final ByteBuffer byteBuffer;
- private final CubeDesc cubeDesc;
- private List<Put> puts = Lists.newArrayList();
-
- private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
- this.keyValueCreators = Lists.newArrayList();
- this.cubeDesc = cubeDesc;
- int startPosition = 0;
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
- startPosition += colDesc.getMeasures().length;
- }
- }
- this.nColumns = keyValueCreators.size();
- this.hTable = hTable;
- this.byteBuffer = ByteBuffer.allocate(1<<20);
- }
-
- private byte[] copy(byte[] array, int offset, int length) {
- byte[] result = new byte[length];
- System.arraycopy(array, offset, result, 0, length);
- return result;
- }
-
- private ByteBuffer createKey(Long cuboidId, GTRecord record) {
- byteBuffer.clear();
- byteBuffer.put(Bytes.toBytes(cuboidId));
- final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
- for (int i = 0; i < cardinality; i++) {
- final ByteArray byteArray = record.get(i);
- byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
- }
- return byteBuffer;
- }
-
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- final ByteBuffer key = createKey(cuboidId, record);
- final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
- final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
- for (int i = 0; i < nColumns; i++) {
- final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
- final Put put = new Put(copy(key.array(), 0, key.position()));
- byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
- byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
- byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
- put.add(family, qualifier, value);
- puts.add(put);
- }
- if (puts.size() >= batchSize()) {
- flush();
- }
- }
-
- public final void flush() {
- try {
- if (!puts.isEmpty()) {
- long t = System.currentTimeMillis();
- if (hTable != null) {
- hTable.put(puts);
- hTable.flushCommits();
- }
- logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
- puts.clear();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) {
- final List<TblColRef> columns = cubeInstance.getAllColumns();
- final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
- final HashMap<Integer, TblColRef> result = Maps.newHashMap();
- for (int i = 0; i < columns.size(); i++) {
- final TblColRef tblColRef = columns.get(i);
- if (allDimensions.contains(tblColRef)) {
- result.put(i, tblColRef);
- }
- }
- return result;
- }
-
- private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException {
- HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
-
- HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
- for (List<String> row : recordList) {
- for (int i = 0; i < row.size(); i++) {
- String cell = row.get(i);
- if (tblColRefMap.containsKey(i)) {
- valueMap.put(tblColRefMap.get(i), cell);
- }
- }
- }
- for (TblColRef tblColRef : valueMap.keySet()) {
- final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
- @Nullable
- @Override
- public byte[] apply(String input) {
- return input == null ? null : input.getBytes();
- }
- });
- final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
- result.put(tblColRef, dict);
- }
- return result;
- }
-
- private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
- CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
- final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
- final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
- final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
-
-
- Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
- @Nullable
- @Override
- public Integer[] apply(@Nullable Long cuboidId) {
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
- Integer[] result = new Integer[bitSet.cardinality()];
-
- long mask = Long.highestOneBit(baseCuboidId);
- int position = 0;
- for (int i = 0; i < rowkeyLength; i++) {
- if ((mask & cuboidId) > 0) {
- result[position] = i;
- position++;
- }
- mask = mask >> 1;
- }
- return result;
- }
- });
- final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
- for (Long cuboidId : allCuboidIds) {
- result.put(cuboidId, new HyperLogLogPlusCounter(14));
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
- Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
-
- long mask = Long.highestOneBit(baseCuboidId);
- int position = 0;
- for (int i = 0; i < rowkeyLength; i++) {
- if ((mask & cuboidId) > 0) {
- cuboidBitSet[position] = i;
- position++;
- }
- mask = mask >> 1;
- }
- allCuboidsBitSet.put(cuboidId, cuboidBitSet);
- }
-
- HashFunction hf = Hashing.murmur3_32();
- ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
- for (int i = 0; i < rowkeyLength; i++) {
- row_hashcodes[i] = new ByteArray();
- }
- for (List<String> row : streams) {
- //generate hash for each row key column
- for (int i = 0; i < rowkeyLength; i++) {
- Hasher hc = hf.newHasher();
- final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
- if (cell != null) {
- row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
- } else {
- row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
- }
- }
-
- for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
- Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
- HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
- Hasher hc = hf.newHasher();
- final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
- for (int position = 0; position < cuboidBitSet.length; position++) {
- hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
- }
- counter.add(hc.hash().asBytes());
- }
- }
- return result;
- }
-
- //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
- private void commitSegment(CubeSegment cubeSegment) throws IOException {
- cubeSegment.setStatus(SegmentStatusEnum.READY);
- CubeBuilder cubeBuilder = new CubeBuilder(cubeSegment.getCubeInstance());
- cubeBuilder.setToAddSegs(cubeSegment);
- CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
- }
-
- private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
- final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- List<Long> result = Lists.newArrayList();
- CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
- getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
- return result;
- }
-
- private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
- result.add(parentCuboidId);
- for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
- getSubCuboidIds(cuboidScheduler, cuboidId, result);
- }
- }
-
-
- private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
- final String hTableName = cubeSegment.getStorageLocationIdentifier();
- CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
- final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
- logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
- return hTable;
- }
-
- @Override
- protected void onStop() {
-
- }
-
- @Override
- protected int batchInterval() {
- return 5 * 60 * 1000;//5 min
- }
-
- @Override
- protected int batchSize() {
- return 1000;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
new file mode 100644
index 0000000..a229c65
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -0,0 +1,370 @@
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
+import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class CubeStreamConsumer implements MicroStreamBatchConsumer {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumer.class);
+
+ private final CubeManager cubeManager;
+ private final String cubeName;
+ private final KylinConfig kylinConfig;
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ private static final int BATCH_PUT_THRESHOLD = 10000;
+
+
+ public CubeStreamConsumer(String cubeName) {
+ this.kylinConfig = KylinConfig.getInstanceFromEnv();
+ this.cubeManager = CubeManager.getInstance(kylinConfig);
+ this.cubeName = cubeName;
+ }
+
+ @Override
+ public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+ if (microStreamBatch.size() == 0) {
+ logger.info("nothing to build, skip to next iteration");
+ return;
+ }
+ final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
+ long startOffset = microStreamBatch.getOffset().getFirst();
+ long endOffset = microStreamBatch.getOffset().getSecond();
+ LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+ blockingQueue.put(Collections.<String>emptyList());
+
+ final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
+ final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+
+ final Configuration conf = HadoopUtil.getCurrentConfiguration();
+ final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());
+ FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+ ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+
+ final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance, parsedStreamMessages);
+ writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
+ final HTableInterface hTable = createHTable(cubeSegment);
+
+ final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
+ dictionaryMap, gtRecordWriter);
+
+ executorService.submit(inMemCubeBuilder).get();
+ gtRecordWriter.flush();
+ commitSegment(cubeSegment);
+ }
+
+ private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
+ for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+ final TblColRef tblColRef = entry.getKey();
+ final Dictionary<?> dictionary = entry.getValue();
+ TableSignature signature = new TableSignature();
+ signature.setLastModifiedTime(System.currentTimeMillis());
+ signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
+ signature.setSize(endOffset - startOffset);
+ DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
+ tblColRef.getName(),
+ tblColRef.getColumnDesc().getZeroBasedIndex(),
+ tblColRef.getDatatype(),
+ signature,
+ ReadableTable.DELIM_AUTO);
+ logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+ DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+ try {
+ cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
+ } catch (IOException e) {
+ logger.error("error save dictionary for column:" + tblColRef, e);
+ throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+ }
+ }
+ }
+
+ private class CubeStreamRecordWriter implements ICuboidWriter {
+ final List<InMemKeyValueCreator> keyValueCreators;
+ final int nColumns;
+ final HTableInterface hTable;
+ private final ByteBuffer byteBuffer;
+ private final CubeDesc cubeDesc;
+ private List<Put> puts = Lists.newArrayList();
+
+ private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
+ this.keyValueCreators = Lists.newArrayList();
+ this.cubeDesc = cubeDesc;
+ int startPosition = 0;
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+ startPosition += colDesc.getMeasures().length;
+ }
+ }
+ this.nColumns = keyValueCreators.size();
+ this.hTable = hTable;
+ this.byteBuffer = ByteBuffer.allocate(1<<20);
+ }
+
+ private byte[] copy(byte[] array, int offset, int length) {
+ byte[] result = new byte[length];
+ System.arraycopy(array, offset, result, 0, length);
+ return result;
+ }
+
+ private ByteBuffer createKey(Long cuboidId, GTRecord record) {
+ byteBuffer.clear();
+ byteBuffer.put(Bytes.toBytes(cuboidId));
+ final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+ for (int i = 0; i < cardinality; i++) {
+ final ByteArray byteArray = record.get(i);
+ byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
+ }
+ return byteBuffer;
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ final ByteBuffer key = createKey(cuboidId, record);
+ final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
+ final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
+ for (int i = 0; i < nColumns; i++) {
+ final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+ final Put put = new Put(copy(key.array(), 0, key.position()));
+ byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+ byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+ byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+ put.add(family, qualifier, value);
+ puts.add(put);
+ }
+ if (puts.size() >= BATCH_PUT_THRESHOLD) {
+ flush();
+ }
+ }
+
+ public final void flush() {
+ try {
+ if (!puts.isEmpty()) {
+ long t = System.currentTimeMillis();
+ if (hTable != null) {
+ hTable.put(puts);
+ hTable.flushCommits();
+ }
+ logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+ puts.clear();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, List<List<String>> recordList) throws IOException {
+ final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
+ final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
+ final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+ for (TblColRef column: columnsNeedToBuildDictionary) {
+ final int index = allDimensions.indexOf(column);
+ Preconditions.checkArgument(index >= 0);
+ tblColRefMap.put(index, column);
+ }
+
+ HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+
+ HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+ for (List<String> row : recordList) {
+ for (int i = 0; i < row.size(); i++) {
+ String cell = row.get(i);
+ if (tblColRefMap.containsKey(i)) {
+ valueMap.put(tblColRefMap.get(i), cell);
+ }
+ }
+ }
+ for (TblColRef tblColRef : valueMap.keySet()) {
+ final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+ @Nullable
+ @Override
+ public byte[] apply(String input) {
+ return input == null ? null : input.getBytes();
+ }
+ });
+ final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+ result.put(tblColRef, dict);
+ }
+ return result;
+ }
+
+ private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
+ CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+ final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+
+ Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
+ @Nullable
+ @Override
+ public Integer[] apply(@Nullable Long cuboidId) {
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ Integer[] result = new Integer[bitSet.cardinality()];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < rowkeyLength; i++) {
+ if ((mask & cuboidId) > 0) {
+ result[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ return result;
+ }
+ });
+ final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+ for (Long cuboidId : allCuboidIds) {
+ result.put(cuboidId, new HyperLogLogPlusCounter(14));
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < rowkeyLength; i++) {
+ if ((mask & cuboidId) > 0) {
+ cuboidBitSet[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+ }
+
+ HashFunction hf = Hashing.murmur3_32();
+ ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
+ for (int i = 0; i < rowkeyLength; i++) {
+ row_hashcodes[i] = new ByteArray();
+ }
+ for (List<String> row : streams) {
+ //generate hash for each row key column
+ for (int i = 0; i < rowkeyLength; i++) {
+ Hasher hc = hf.newHasher();
+ final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ if (cell != null) {
+ row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+ }
+ }
+
+ for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
+ Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
+ HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
+ Hasher hc = hf.newHasher();
+ final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+ for (int position = 0; position < cuboidBitSet.length; position++) {
+ hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+ }
+ counter.add(hc.hash().asBytes());
+ }
+ }
+ return result;
+ }
+
+ //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
+ private void commitSegment(CubeSegment cubeSegment) throws IOException {
+ cubeSegment.setStatus(SegmentStatusEnum.READY);
+ CubeBuilder cubeBuilder = new CubeBuilder(cubeSegment.getCubeInstance());
+ cubeBuilder.setToAddSegs(cubeSegment);
+ CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+ }
+
+ private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ List<Long> result = Lists.newArrayList();
+ CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+ getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
+ return result;
+ }
+
+ private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
+ result.add(parentCuboidId);
+ for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
+ getSubCuboidIds(cuboidScheduler, cuboidId, result);
+ }
+ }
+
+
+ private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+ final String hTableName = cubeSegment.getStorageLocationIdentifier();
+ CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
+ final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+ logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+ return hTable;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+}
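Note on the sampling() method above: for every cuboid id it derives the rowkey column positions whose bits are set, scanning from the base cuboid's highest bit downward, and those columns are then hashed into that cuboid's HyperLogLog counter. A condensed, standalone restatement of just that mapping (illustrative only; the helper name "cuboidColumns" is hypothetical and not part of this commit):

    // Sketch: map a cuboid id to the rowkey column indexes whose bits are set,
    // walking from the base cuboid's highest bit downward.
    static Integer[] cuboidColumns(long cuboidId, long baseCuboidId, int rowkeyLength) {
        Integer[] columns = new Integer[Long.bitCount(cuboidId)];
        long mask = Long.highestOneBit(baseCuboidId);
        int position = 0;
        for (int i = 0; i < rowkeyLength; i++) {
            if ((mask & cuboidId) > 0) {
                columns[position++] = i;
            }
            mask = mask >> 1;
        }
        return columns;
        // e.g. baseCuboidId = 0b111 (3 rowkey columns) and cuboidId = 0b101
        // give {0, 2}: only the first and third rowkey columns feed the
        // HyperLogLog counter for that cuboid.
    }
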
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 36f7dcf..cdade80 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -48,9 +48,10 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.streaming.*;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,7 +145,7 @@ public class StreamingBootstrap {
private void startCubeStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
List<KafkaClusterConfig> kafkaClusterConfigs = streamingConfig.getKafkaClusterConfigs();
- final List<List<BlockingQueue<StreamMessage>>> allClustersData = Lists.newArrayList();
+ final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
@@ -152,35 +153,13 @@ public class StreamingBootstrap {
final List<BlockingQueue<StreamMessage>> oneClusterData = consume(kafkaClusterConfig, partitionCount);
logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
- allClustersData.add(oneClusterData);
+ allClustersData.addAll(oneClusterData);
}
- final LinkedBlockingDeque<StreamMessage> alldata = new LinkedBlockingDeque<>();
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- int totalMessage = 0;
- while (true) {
- for (List<BlockingQueue<StreamMessage>> oneCluster : allClustersData) {
- for (BlockingQueue<StreamMessage> onePartition : oneCluster) {
- try {
- alldata.put(onePartition.take());
- if (totalMessage++ % 10000 == 0) {
- logger.info("Total stream message count: " + totalMessage);
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
- }
- });
-
final String cubeName = streamingConfig.getCubeName();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(alldata, cubeName);
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000), new CubeStreamConsumer(cubeName));
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
@@ -246,10 +225,14 @@ public class StreamingBootstrap {
KafkaConsumer consumer = new KafkaConsumer(kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, parallelism);
kafkaConsumers.put(getKey(streamingConfig.getName(), partitionId), consumer);
+ final IIDesc iiDesc = iiSegment.getIIDesc();
+
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+ final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism),
+ new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
+ new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i));
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index a73c5b9..43dc769 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -58,8 +58,10 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.streaming.MicroBatchCondition;
+import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -212,7 +214,7 @@ public class BuildIIWithStreamTest {
ToolRunner.run(new IICreateHTableJob(), args);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
+ final StreamBuilder streamBuilder = new StreamBuilder(queue, new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0));
List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
int count = sorted.size();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 7915080..080a2fd 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -63,10 +63,10 @@ public class IITest extends LocalFileMetadataTestCase {
List<IIRow> iiRows;
- final String[] inputData = new String[] { //
- "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+ final String[] inputData = new String[]{ //
+ "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
"ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
- "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+ "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0"};
@Before
public void setUp() throws Exception {
@@ -85,34 +85,15 @@ public class IITest extends LocalFileMetadataTestCase {
List<List<String>> parsedStreamMessages = Lists.newArrayList();
StreamParser parser = StringStreamParser.instance;
StreamFilter filter = DefaultStreamFilter.instance;
- long startOffset = Long.MAX_VALUE;
- long endOffset = Long.MIN_VALUE;
- long startTimestamp = Long.MAX_VALUE;
- long endTimestamp = Long.MIN_VALUE;
- for(StreamMessage message: streamMessages)
- {
+ MicroStreamBatch batch = new MicroStreamBatch();
+ for (StreamMessage message : streamMessages) {
ParsedStreamMessage parsedStreamMessage = parser.parse(message);
- if(filter.filter(parsedStreamMessage))
- {
- if (startOffset > parsedStreamMessage.getOffset()) {
- startOffset = parsedStreamMessage.getOffset();
- }
- if (endOffset < parsedStreamMessage.getOffset()) {
- endOffset = parsedStreamMessage.getOffset();
- }
- if (startTimestamp > parsedStreamMessage.getTimestamp()) {
- startTimestamp = parsedStreamMessage.getTimestamp();
- }
- if (endTimestamp < parsedStreamMessage.getTimestamp()) {
- endTimestamp = parsedStreamMessage.getTimestamp();
- }
- parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+ if (filter.filter(parsedStreamMessage)) {
+ batch.add(parsedStreamMessage);
}
}
- MicroStreamBatch batch = new MicroStreamBatch(parsedStreamMessages, org.apache.kylin.common.util.Pair.newPair(startTimestamp, endTimestamp), org.apache.kylin.common.util.Pair.newPair(startOffset, endOffset));
-
iiRows = Lists.newArrayList();
final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
@@ -267,7 +248,7 @@ public class IITest extends LocalFileMetadataTestCase {
@Nullable
@Override
public Pair<ImmutableBytesWritable, Result> apply(@Nullable IIRow input) {
- return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
+ return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[]{1}), Result.create(input.makeCells()));
}
})));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
deleted file mode 100644
index 3ef542d..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-/**
- */
-@Ignore
-public class CubeStreamBuilderTest {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilderTest.class);
-
- private KylinConfig kylinConfig;
-
- private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
- cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
- // remove all existing segments
- CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
- }
-
- @Test
- public void test() throws Exception {
- LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, CUBE_NAME);
- final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
- loadDataFromLocalFile(queue, 100000);
- future.get();
- }
-
- private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
- BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
- String line;
- int count = 0;
- while ((line = br.readLine()) != null && count++ < maxCount) {
- final List<String> strings = Arrays.asList(line.split("\t"));
- queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
- }
- queue.put(StreamMessage.EOF);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
new file mode 100644
index 0000000..8377851
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.streaming.MicroBatchCondition;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ */
+@Ignore
+public class CubeStreamConsumerTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
+
+ private KylinConfig kylinConfig;
+
+ private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ @Before
+ public void before() throws Exception {
+ HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
+ DeployUtil.overrideJobJarLocations();
+ final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+ CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+ // remove all existing segments
+ CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+
+ }
+
+ @Test
+ public void test() throws Exception {
+ LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME));
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+ loadDataFromLocalFile(queue, 100000);
+ future.get();
+ }
+
+ private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+ String line;
+ int count = 0;
+ while ((line = br.readLine()) != null && count++ < maxCount) {
+ final List<String> strings = Arrays.asList(line.split("\t"));
+ queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+ }
+ queue.put(StreamMessage.EOF);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index 78de231..1ca881e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -72,8 +72,10 @@ public final class JsonStreamParser implements StreamParser {
for (Map.Entry<String, String> entry : json.entrySet()) {
if (entry.getKey().equalsIgnoreCase(column.getName())) {
result.add(entry.getValue());
+ continue;
}
}
+ result.add(null);
}
return new ParsedStreamMessage(result, streamMessage.getOffset(), streamMessage.getOffset());
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
new file mode 100644
index 0000000..baf7b04
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public final class MicroBatchCondition {
+
+ private final int batchSize;
+ private final int batchInterval;
+
+ public MicroBatchCondition(int batchSize, int batchInterval) {
+ this.batchSize = batchSize;
+ this.batchInterval = batchInterval;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getBatchInterval() {
+ return batchInterval;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index 0adcee2..268c98c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -1,5 +1,6 @@
package org.apache.kylin.streaming;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.Pair;
import java.util.List;
@@ -14,10 +15,16 @@ public final class MicroStreamBatch {
private final Pair<Long, Long> offset;
- public MicroStreamBatch(List<List<String>> streams, Pair<Long, Long> timestamp, Pair<Long, Long> offset) {
- this.streams = streams;
- this.timestamp = timestamp;
- this.offset = offset;
+ public MicroStreamBatch() {
+ this.streams = Lists.newLinkedList();
+ this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
+ this.offset = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
+ }
+
+ private MicroStreamBatch(MicroStreamBatch batch) {
+ this.streams = Lists.newLinkedList(batch.streams);
+ this.timestamp = Pair.newPair(batch.timestamp.getFirst(), batch.timestamp.getSecond());
+ this.offset = Pair.newPair(batch.offset.getFirst(), batch.offset.getSecond());
}
public final List<List<String>> getStreams() {
@@ -35,4 +42,34 @@ public final class MicroStreamBatch {
public final int size() {
return streams.size();
}
+
+ public final void add(ParsedStreamMessage parsedStreamMessage) {
+ if (offset.getFirst() > parsedStreamMessage.getOffset()) {
+ offset.setFirst(parsedStreamMessage.getOffset());
+ }
+ if (offset.getSecond() < parsedStreamMessage.getOffset()) {
+ offset.setSecond(parsedStreamMessage.getOffset());
+ }
+ if (timestamp.getFirst() > parsedStreamMessage.getTimestamp()) {
+ timestamp.setFirst(parsedStreamMessage.getTimestamp());
+ }
+ if (timestamp.getSecond() < parsedStreamMessage.getTimestamp()) {
+ timestamp.setSecond(parsedStreamMessage.getTimestamp());
+ }
+ this.streams.add(parsedStreamMessage.getStreamMessage());
+ }
+
+ public static MicroStreamBatch union(MicroStreamBatch one, MicroStreamBatch another) {
+ MicroStreamBatch result = new MicroStreamBatch(one);
+ result.streams.addAll(another.streams);
+ result.offset.setFirst(Math.min(result.offset.getFirst(), another.offset.getFirst()));
+ result.offset.setSecond(Math.min(result.offset.getSecond(), another.offset.getSecond()));
+ result.timestamp.setFirst(Math.min(result.timestamp.getFirst(), another.timestamp.getFirst()));
+ result.timestamp.setSecond(Math.min(result.timestamp.getSecond(), another.timestamp.getSecond()));
+ return result;
+ }
+
+ public boolean isEmpty() {
+ return streams.isEmpty();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java
new file mode 100644
index 0000000..37d6076
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public interface MicroStreamBatchConsumer {
+
+ void consume(MicroStreamBatch microStreamBatch) throws Exception;
+
+ void stop();
+
+}
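With this interface, a consumer is wired into the reworked StreamBuilder (below) through its constructor together with a MicroBatchCondition, mirroring the usages in StreamingBootstrap and the tests above. A minimal sketch of a hypothetical consumer that only logs what it receives (illustrative only, assuming the classes introduced in this commit; LoggingConsumerExample itself is not part of the patch):

    package org.apache.kylin.streaming;

    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical example, for illustration only.
    public class LoggingConsumerExample {

        static class LoggingConsumer implements MicroStreamBatchConsumer {
            @Override
            public void consume(MicroStreamBatch batch) throws Exception {
                // report the size and offset range of each micro batch
                System.out.println("consumed " + batch.size() + " rows, offsets "
                        + batch.getOffset().getFirst() + ".." + batch.getOffset().getSecond());
            }

            @Override
            public void stop() {
                // nothing to release in this sketch
            }
        }

        public static void main(String[] args) throws Exception {
            LinkedBlockingQueue<StreamMessage> queue = new LinkedBlockingQueue<>();
            // cut a micro batch every 30 seconds regardless of message count,
            // as in CubeStreamConsumerTest above
            StreamBuilder builder = new StreamBuilder(queue,
                    new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000),
                    new LoggingConsumer());
            Executors.newSingleThreadExecutor().submit(builder);
            // producers put StreamMessage objects onto "queue";
            // StreamMessage.EOF signals the end of the stream
            queue.put(StreamMessage.EOF);
        }
    }
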
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 07b8616..c9d2795 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -34,18 +34,18 @@
package org.apache.kylin.streaming;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
*/
-public abstract class StreamBuilder implements Runnable {
+public class StreamBuilder implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
@@ -53,97 +53,159 @@ public abstract class StreamBuilder implements Runnable {
private StreamFilter streamFilter = DefaultStreamFilter.instance;
- private BlockingQueue<StreamMessage> streamMessageQueue;
- private long lastBuildTime = System.currentTimeMillis();
+ private final List<BlockingQueue<StreamMessage>> streamMessageQueues;
- private long startOffset;
- private long endOffset;
+ private final MicroStreamBatchConsumer consumer;
- private long startTimestamp;
- private long endTimestamp;
+ private final MicroBatchCondition condition;
- public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
- this.streamMessageQueue = streamMessageQueue;
+ public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer) {
+ Preconditions.checkArgument(inputs.size() > 0);
+ this.streamMessageQueues = Lists.newArrayList();
+ this.consumer = Preconditions.checkNotNull(consumer);
+ this.condition = condition;
+ init(inputs);
}
- protected abstract void build(MicroStreamBatch microStreamBatch) throws Exception;
+ public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer) {
+ this.streamMessageQueues = Lists.newArrayList();
+ this.consumer = Preconditions.checkNotNull(consumer);
+ this.condition = condition;
+ init(Preconditions.checkNotNull(input));
+ }
- protected abstract void onStop();
+ private void init(BlockingQueue<StreamMessage> input) {
+ this.streamMessageQueues.add(input);
+ }
- private void clearCounter() {
- lastBuildTime = System.currentTimeMillis();
- startOffset = Long.MAX_VALUE;
- endOffset = Long.MIN_VALUE;
- startTimestamp = Long.MAX_VALUE;
- endTimestamp = Long.MIN_VALUE;
+ private void init(List<BlockingQueue<StreamMessage>> inputs) {
+ this.streamMessageQueues.addAll(inputs);
}
@Override
public void run() {
try {
- List<List<String>> parsedStreamMessages = null;
- int filteredMsgCount = 0;
+ final int inputCount = streamMessageQueues.size();
+ final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
while (true) {
- if (parsedStreamMessages == null) {
- parsedStreamMessages = Lists.newLinkedList();
- clearCounter();
- }
- StreamMessage streamMessage;
- try {
- streamMessage = streamMessageQueue.poll(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.warn("stream queue should not be interrupted", e);
- continue;
+ CountDownLatch countDownLatch = new CountDownLatch(inputCount);
+ ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
+ for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
+ futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch)));
}
- if (streamMessage == null) {
- logger.info("The stream queue is drained, current available stream count: " + parsedStreamMessages.size());
- if ((System.currentTimeMillis() - lastBuildTime) > batchInterval() && !parsedStreamMessages.isEmpty()) {
- logger.info("Building batch due to time threshold, batch size: " + parsedStreamMessages.size());
- build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
- parsedStreamMessages = null;
+ countDownLatch.await();
+ ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
+ for (Future<MicroStreamBatch> future : futures) {
+ if (future.get() != null) {
+ batches.add(future.get());
+ } else {
+ //EOF occurs, stop consumer
+ consumer.stop();
+ return;
}
- continue;
}
- if (streamMessage.getOffset() < 0) {
- onStop();
- logger.warn("streaming encountered EOF, stop building. The remaining {} filtered messages will be discarded", filteredMsgCount);
- break;
+ MicroStreamBatch batch = batches.get(0);
+ if (batches.size() > 1) {
+ for (int i = 1; i < inputCount; i++) {
+ batch = MicroStreamBatch.union(batch, batches.get(i));
+ }
}
+ consumer.consume(batch);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("stream fetcher thread should not be interrupted", e);
+ } catch (ExecutionException e) {
+ logger.error("stream fetch thread encountered exception", e);
+ throw new RuntimeException("stream fetch thread encountered exception", e);
+ } catch (Exception e) {
+ logger.error("consumer encountered exception", e);
+ throw new RuntimeException("consumer encountered exception", e);
+ }
+ }
- final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
+ private class StreamFetcher implements Callable<MicroStreamBatch> {
- if (getStreamFilter().filter(parsedStreamMessage)) {
+ private final BlockingQueue<StreamMessage> streamMessageQueue;
+ private final CountDownLatch countDownLatch;
+ private long lastBuildTime = System.currentTimeMillis();
+ private long lastBatchTimestamp = -1;
- if (filteredMsgCount++ % 10000 == 0) {
- logger.info("Total filtered stream message count: " + filteredMsgCount);
- }
+ public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch) {
+ this.streamMessageQueue = streamMessageQueue;
+ this.countDownLatch = countDownLatch;
+ }
+
+ private void clearCounter() {
+ lastBuildTime = System.currentTimeMillis();
+ }
- if (startOffset > parsedStreamMessage.getOffset()) {
- startOffset = parsedStreamMessage.getOffset();
+ private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
+ long t = System.currentTimeMillis();
+ while (true) {
+ final StreamMessage peek = queue.peek();
+ if (peek != null) {
+ return peek;
+ } else {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("stream queue should not be interrupted", e);
+ return null;
+ }
+ if (System.currentTimeMillis() - t > timeout) {
+ break;
}
- if (endOffset < parsedStreamMessage.getOffset()) {
- endOffset = parsedStreamMessage.getOffset();
+ }
+ }
+ return queue.peek();
+ }
+
+ @Override
+ public MicroStreamBatch call() throws Exception {
+ try {
+ MicroStreamBatch microStreamBatch = null;
+ while (true) {
+ if (microStreamBatch == null) {
+ microStreamBatch = new MicroStreamBatch();
+ clearCounter();
}
- if (startTimestamp > parsedStreamMessage.getTimestamp()) {
- startTimestamp = parsedStreamMessage.getTimestamp();
+ StreamMessage streamMessage = peek(streamMessageQueue, 30000);
+ if (streamMessage == null) {
+ logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
+ if (!microStreamBatch.isEmpty()) {
+ return microStreamBatch;
+ } else {
+ continue;
+ }
}
- if (endTimestamp < parsedStreamMessage.getTimestamp()) {
- endTimestamp = parsedStreamMessage.getTimestamp();
+ if (streamMessage.getOffset() < 0) {
+ consumer.stop();
+ logger.warn("streaming encountered EOF, stop building");
+ return null;
}
- parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
- if (parsedStreamMessages.size() >= batchSize()) {
- logger.info("Building batch due to size threshold, batch size: " + parsedStreamMessages.size());
- build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
- parsedStreamMessages = null;
+
+ final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
+ if (parsedStreamMessage.getTimestamp() - microStreamBatch.getTimestamp().getFirst() > condition.getBatchInterval()) {
+ //this message belongs to the next micro batch, leave it in the queue and close the current one
+ return microStreamBatch;
+ } else {
+ streamMessageQueue.take();
+ if (getStreamFilter().filter(parsedStreamMessage)) {
+ microStreamBatch.add(parsedStreamMessage);
+ if (microStreamBatch.size() >= condition.getBatchSize()) {
+ return microStreamBatch;
+ }
+ } else {
+ //message rejected by the filter, ignore it
+ }
+ }
- } else {
- //ignore unfiltered stream message
}
-
+ } catch (Exception e) {
+ logger.error("build stream error, stop building", e);
+ throw new RuntimeException("build stream error, stop building", e);
+ } finally {
+ countDownLatch.countDown();
}
- } catch (Exception e) {
- logger.error("build stream error, stop building", e);
- throw new RuntimeException("build stream error, stop building", e);
}
}
@@ -163,7 +225,4 @@ public abstract class StreamBuilder implements Runnable {
this.streamFilter = streamFilter;
}
- protected abstract int batchInterval();
-
- protected abstract int batchSize();
}
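
StreamBuilder is no longer subclassed; it is constructed with one BlockingQueue per input partition, a MicroBatchCondition (size and time thresholds) and a MicroStreamBatchConsumer, and runs one StreamFetcher per queue. A minimal wiring sketch, assuming a MicroBatchCondition(batchSize, batchInterval) constructor (that file is not shown in this excerpt) and the LoggingBatchConsumer sketched earlier; the thresholds are illustrative only:

    // two partition queues feeding one builder
    BlockingQueue<StreamMessage> partition0 = new LinkedBlockingQueue<StreamMessage>();
    BlockingQueue<StreamMessage> partition1 = new LinkedBlockingQueue<StreamMessage>();
    List<BlockingQueue<StreamMessage>> inputs = Lists.newArrayList(partition0, partition1);

    // close a batch after 10000 accepted messages or a 5 minute timestamp span (illustrative values)
    MicroBatchCondition condition = new MicroBatchCondition(10000, 5 * 60 * 1000);

    StreamBuilder streamBuilder = new StreamBuilder(inputs, condition, new LoggingBatchConsumer());
    new Thread(streamBuilder, "stream-builder").start();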
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
deleted file mode 100644
index cf86b44..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamingManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class IIStreamBuilder extends StreamBuilder {
-
- private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
- private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
-
- private final IIDesc desc;
- private final HTableInterface hTable;
- private final SliceBuilder sliceBuilder;
- private final int shardId;
- private final String streaming;
- private final int batchSize;
- private StreamingManager streamingManager;
-
- public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
- super(queue);
- this.batchSize = iiDesc.getSliceSize();
- this.streaming = streaming;
- this.desc = iiDesc;
- this.shardId = shard;
- try {
- this.hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
- this.hTable.setAutoFlushTo(true);
- } catch (IOException e) {
- logger.error("cannot open htable name:" + hTableName, e);
- throw new RuntimeException("cannot open htable name:" + hTableName, e);
- }
- this.sliceBuilder = new SliceBuilder(desc, (short) shard, iiDesc.isUseLocalDictionary());
- this.streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
- }
-
- @Override
- protected void build(MicroStreamBatch microStreamBatch) throws IOException {
- if (microStreamBatch.size() > 0) {
- long offset = microStreamBatch.getOffset().getFirst();
- if (offset < streamingManager.getOffset(streaming, shardId)) {
- logger.info("this batch has already been built, skip building");
- return;
- }
- logger.info("stream build start, size:" + microStreamBatch.size());
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- final Slice slice = sliceBuilder.buildSlice(microStreamBatch);
- logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
-
- loadToHBase(hTable, slice, new IIKeyValueCodec(slice.getInfo()));
- submitOffset(offset);
- stopwatch.stop();
- logger.info("stream build finished, size:" + microStreamBatch.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
- } else {
- logger.info("nothing to build, skip building");
- }
- }
-
- @Override
- protected void onStop() {
- try {
- this.hTable.close();
- } catch (IOException e) {
- logger.error("onStop throw exception", e);
- }
- }
-
- @Override
- protected int batchInterval() {
- return BATCH_BUILD_INTERVAL_THRESHOLD;
- }
-
- @Override
- protected int batchSize() {
- return batchSize;
- }
-
- private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
- List<Put> data = Lists.newArrayList();
- for (IIRow row : codec.encodeKeyValue(slice)) {
- final byte[] key = row.getKey().get();
- final byte[] value = row.getValue().get();
- Put put = new Put(key);
- put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
- final ImmutableBytesWritable dictionary = row.getDictionary();
- final byte[] dictBytes = dictionary.get();
- if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
- put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
- } else {
- throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
- }
- data.add(put);
- }
- hTable.put(data);
- //omit hTable.flushCommits(), because htable is auotflush
- }
-
- private void submitOffset(long offset) {
- try {
- streamingManager.updateOffset(streaming, shardId, offset);
- logger.info("submit offset:" + offset);
- } catch (Exception e) {
- logger.warn("error submit offset: " + offset + " retrying", e);
- throw new RuntimeException(e);
- }
- }
-
-}
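
The responsibilities of the deleted IIStreamBuilder (skip already-built offsets, build a Slice, load it into HBase, submit the new offset, close the HTable on stop) presumably move into the new IIStreamConsumer added by this commit, which plugs into StreamBuilder through the MicroStreamBatchConsumer interface. A rough skeleton of that mapping, not the actual code from the commit; method bodies are elided:

    public class IIStreamConsumer implements MicroStreamBatchConsumer {

        @Override
        public void consume(MicroStreamBatch microStreamBatch) throws Exception {
            // same steps the deleted build() performed: check the already-submitted
            // offset, build a Slice with SliceBuilder, encode it with IIKeyValueCodec,
            // load it into HBase, then submit the new offset
        }

        @Override
        public void stop() {
            // close the HTable handle, as onStop() did before
        }
    }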