You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 11:59:58 UTC
incubator-kylin git commit: add bootstrapper for streaming cubing
Repository: incubator-kylin
Updated Branches:
refs/heads/streaming-cubing 6eb0cfc54 -> 7717e6edc
add bootstrapper for streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7717e6ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7717e6ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7717e6ed
Branch: refs/heads/streaming-cubing
Commit: 7717e6edcee1cc999a6f4013bda34fcd0d41fadb
Parents: 6eb0cfc
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu May 28 17:59:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 17:59:14 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/CubeStreamBuilder.java | 15 +++---
.../kylin/job/streaming/StreamingBootstrap.java | 57 +++++++++++++-------
2 files changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7717e6ed/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index e0cd1ae..846ade5 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -103,7 +103,7 @@ public class CubeStreamBuilder extends StreamBuilder {
FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
- final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance.getDescriptor(), parsedStreamMessages);
+ final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
final HTableInterface hTable = createHTable(cubeSegment);
@@ -219,17 +219,20 @@ public class CubeStreamBuilder extends StreamBuilder {
}
}
- private Map<Integer, TblColRef> getTblColRefMap(CubeDesc cubeDesc) {
- final List<TblColRef> columns = cubeDesc.listDimensionColumnsExcludingDerived();
+ private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) {
+ final List<TblColRef> columns = cubeInstance.getAllColumns();
+ final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
final HashMap<Integer, TblColRef> result = Maps.newHashMap();
for (int i = 0; i < columns.size(); i++) {
- result.put(i, columns.get(i));
+ final TblColRef tblColRef = columns.get(i);
+ if (allDimensions.contains(tblColRef)) {
+ result.put(i, tblColRef);
+ }
}
return result;
}
- private Map<TblColRef, Dictionary<?>> buildDictionary(CubeDesc cubeDesc, List<List<String>> recordList) throws IOException {
- final Map<Integer, TblColRef> tblColRefMap = getTblColRefMap(cubeDesc);
+ private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException {
HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7717e6ed/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 95be7fd..adb9ea6 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -42,19 +42,22 @@ import kafka.javaapi.PartitionMetadata;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
/**
*/
@@ -113,25 +116,50 @@ public class StreamingBootstrap {
final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
+ Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
- startIIStreaming(kafkaConfig, partitionId);
+ startIIStreaming(kafkaConfig, partitionId, partitionCount);
} else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
- startIIStreaming(kafkaConfig, partitionId);
+ startCubeStreaming(kafkaConfig, partitionId, partitionCount);
} else {
throw new IllegalArgumentException("no cube or ii in kafka config");
}
}
- private void startCubeStreaming(KafkaConfig kafkaConfig) {
+ private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
+ final String cubeName = kafkaConfig.getCubeName();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+ CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, cubeName);
+ final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+
+ long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+ final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+ streamingOffset = Math.max(streamingOffset, earliestOffset);
+
+ KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
+ streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+ Executors.newSingleThreadExecutor().submit(consumer);
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+ cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+ future.get();
+ }
+ private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception {
+ if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
+ Class clazz = Class.forName(kafkaConfig.getParserName());
+ Constructor constructor = clazz.getConstructor(List.class);
+ return (StreamParser) constructor.newInstance(columns);
+ } else {
+ return new JsonStreamParser(columns);
+ }
}
- private void startIIStreaming(KafkaConfig kafkaConfig, int partitionId) throws Exception {
+ private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
-
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
- Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
Preconditions.checkArgument(ii.getSegments().size() > 0);
final IISegment iiSegment = ii.getSegments().get(0);
@@ -159,19 +187,12 @@ public class StreamingBootstrap {
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
- StreamParser parser;
- if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
- Class clazz = Class.forName(kafkaConfig.getParserName());
- Constructor constructor = clazz.getConstructor(List.class);
- parser = (StreamParser) constructor.newInstance(ii.getDescriptor().listAllColumns());
- } else {
- parser = new JsonStreamParser(ii.getDescriptor().listAllColumns());
- }
+
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
- task.setStreamParser(parser);
+ task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
} else {