Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 11:59:58 UTC

incubator-kylin git commit: add bootstrapper for streaming cubing

Repository: incubator-kylin
Updated Branches:
  refs/heads/streaming-cubing 6eb0cfc54 -> 7717e6edc


add bootstrapper for streaming cubing


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7717e6ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7717e6ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7717e6ed

Branch: refs/heads/streaming-cubing
Commit: 7717e6edcee1cc999a6f4013bda34fcd0d41fadb
Parents: 6eb0cfc
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu May 28 17:59:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 17:59:14 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamBuilder.java  | 15 +++---
 .../kylin/job/streaming/StreamingBootstrap.java | 57 +++++++++++++-------
 2 files changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7717e6ed/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index e0cd1ae..846ade5 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -103,7 +103,7 @@ public class CubeStreamBuilder extends StreamBuilder {
         FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
         ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
 
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance.getDescriptor(), parsedStreamMessages);
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
         writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
 
         final HTableInterface hTable = createHTable(cubeSegment);
@@ -219,17 +219,20 @@ public class CubeStreamBuilder extends StreamBuilder {
         }
     }
 
-    private Map<Integer, TblColRef> getTblColRefMap(CubeDesc cubeDesc) {
-        final List<TblColRef> columns = cubeDesc.listDimensionColumnsExcludingDerived();
+    private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) {
+        final List<TblColRef> columns = cubeInstance.getAllColumns();
+        final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
         final HashMap<Integer, TblColRef> result = Maps.newHashMap();
         for (int i = 0; i < columns.size(); i++) {
-            result.put(i, columns.get(i));
+            final TblColRef tblColRef = columns.get(i);
+            if (allDimensions.contains(tblColRef)) {
+                result.put(i, tblColRef);
+            }
         }
         return result;
     }
 
-    private Map<TblColRef, Dictionary<?>> buildDictionary(CubeDesc cubeDesc, List<List<String>> recordList) throws IOException {
-        final Map<Integer, TblColRef> tblColRefMap = getTblColRefMap(cubeDesc);
+    private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException {
         HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
 
         HashMultimap<TblColRef, String> valueMap = HashMultimap.create();

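The hunk above changes where the dictionary column set comes from: instead of taking listDimensionColumnsExcludingDerived() from the CubeDesc, getTblColRefMap(CubeInstance) now walks cubeInstance.getAllColumns() and keeps only the columns that are also dimensions, so each map key is the column's position in the full parsed record. Below is a minimal, self-contained sketch of that idea; plain String stands in for Kylin's TblColRef, and the names (dimensionIndex, buildDictionaries) are illustrative, not the project's API:

import java.util.*;

public class DictColumnFilterSketch {

    // Stand-in for getTblColRefMap: record position -> column, dimensions only.
    static Map<Integer, String> dimensionIndex(List<String> allColumns, Set<String> dimensions) {
        Map<Integer, String> result = new HashMap<>();
        for (int i = 0; i < allColumns.size(); i++) {
            String col = allColumns.get(i);
            if (dimensions.contains(col)) {
                result.put(i, col); // position i of each record feeds this dimension's dictionary
            }
        }
        return result;
    }

    // Stand-in for buildDictionary: collect distinct values per dimension column.
    static Map<String, SortedSet<String>> buildDictionaries(Map<Integer, String> index,
                                                            List<List<String>> records) {
        Map<String, SortedSet<String>> dicts = new HashMap<>();
        for (List<String> record : records) {
            for (Map.Entry<Integer, String> e : index.entrySet()) {
                dicts.computeIfAbsent(e.getValue(), k -> new TreeSet<>())
                     .add(record.get(e.getKey()));
            }
        }
        return dicts;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("SELLER_ID", "ITEM", "PRICE");
        Set<String> dims = new HashSet<>(Arrays.asList("SELLER_ID", "ITEM")); // PRICE is a measure
        List<List<String>> records = Arrays.asList(
                Arrays.asList("s1", "phone", "9.9"),
                Arrays.asList("s2", "phone", "5.0"));
        System.out.println(buildDictionaries(dimensionIndex(columns, dims), records));
        // e.g. {SELLER_ID=[s1, s2], ITEM=[phone]} (map order may vary); PRICE never enters a dictionary
    }
}

One plausible reading of the change: parsed stream messages carry one value per cube column (dimensions and measures alike), so positions must be computed against the full column list even though only dimension columns get dictionaries.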
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7717e6ed/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 95be7fd..adb9ea6 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -42,19 +42,22 @@ import kafka.javaapi.PartitionMetadata;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 /**
  */
@@ -113,25 +116,50 @@ public class StreamingBootstrap {
         final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
         Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
 
+        final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
+        Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
         if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
-            startIIStreaming(kafkaConfig, partitionId);
+            startIIStreaming(kafkaConfig, partitionId, partitionCount);
         } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
-            startIIStreaming(kafkaConfig, partitionId);
+            startCubeStreaming(kafkaConfig, partitionId, partitionCount);
         } else {
             throw new IllegalArgumentException("no cube or ii in kafka config");
         }
     }
 
-    private void startCubeStreaming(KafkaConfig kafkaConfig) {
+    private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
+        final String cubeName = kafkaConfig.getCubeName();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, cubeName);
+        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+
+        long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+        streamingOffset = Math.max(streamingOffset, earliestOffset);
+
+        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
+                streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+        Executors.newSingleThreadExecutor().submit(consumer);
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+        cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+        future.get();
+    }
 
+    private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception {
+        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
+            Class clazz = Class.forName(kafkaConfig.getParserName());
+            Constructor constructor = clazz.getConstructor(List.class);
+            return (StreamParser) constructor.newInstance(columns);
+        } else {
+            return new JsonStreamParser(columns);
+        }
     }
 
-    private void startIIStreaming(KafkaConfig kafkaConfig, int partitionId) throws Exception {
+    private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
         final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
         Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
-
-        final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
-        Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
         Preconditions.checkArgument(ii.getSegments().size() > 0);
         final IISegment iiSegment = ii.getSegments().get(0);
 
@@ -159,19 +187,12 @@ public class StreamingBootstrap {
         KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
         kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
 
-        StreamParser parser;
-        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
-            Class clazz = Class.forName(kafkaConfig.getParserName());
-            Constructor constructor = clazz.getConstructor(List.class);
-            parser = (StreamParser) constructor.newInstance(ii.getDescriptor().listAllColumns());
-        } else {
-            parser = new JsonStreamParser(ii.getDescriptor().listAllColumns());
-        }
+
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
             final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
-            task.setStreamParser(parser);
+            task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();
             } else {
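
The StreamingBootstrap hunks do three things: validate the partition id once up front, give cube configs a real startCubeStreaming path (the old code called startIIStreaming for both branches), and hoist the reflective parser construction into a shared getStreamParser helper used by both the cube and II paths. Below is a self-contained sketch of that factory pattern, with a stand-in StreamParser interface and illustrative names (DefaultParser in place of JsonStreamParser); it is not the project's actual API:

import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;

public class ParserFactorySketch {

    // Stand-in for Kylin's StreamParser interface.
    interface StreamParser {
        List<String> parse(byte[] message);
    }

    // Stand-in default parser, analogous to JsonStreamParser(columns).
    static class DefaultParser implements StreamParser {
        private final List<String> columns;
        DefaultParser(List<String> columns) { this.columns = columns; }
        public List<String> parse(byte[] message) {
            return columns; // a real parser would extract one value per column from the message
        }
    }

    // Same shape as the commit's getStreamParser: a configured class name wins,
    // otherwise fall back to the default parser.
    static StreamParser getStreamParser(String parserName, List<String> columns) throws Exception {
        if (parserName != null && !parserName.isEmpty()) {
            Class<?> clazz = Class.forName(parserName);
            Constructor<?> constructor = clazz.getConstructor(List.class);
            return (StreamParser) constructor.newInstance(columns);
        }
        return new DefaultParser(columns);
    }

    public static void main(String[] args) throws Exception {
        StreamParser p = getStreamParser(null, Arrays.asList("SELLER_ID", "ITEM"));
        System.out.println(p.parse(new byte[0])); // [SELLER_ID, ITEM]
    }
}

Any custom parser named in the Kafka config must expose a public constructor taking a single List (the column list), since that is the constructor the reflection looks up. Note also the offset clamp in startCubeStreaming: the builder resumes from the maximum of the saved streaming offset and the earliest offset Kafka still retains, which keeps the consumer from requesting an offset that has already been aged out of the topic.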