You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/28 10:54:21 UTC

incubator-kylin git commit: streaming cubing: half way

Repository: incubator-kylin
Updated Branches:
  refs/heads/streaming-cubing 399176bf2 -> 6eb0cfc54


streaming cubing: half way


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6eb0cfc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6eb0cfc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6eb0cfc5

Branch: refs/heads/streaming-cubing
Commit: 6eb0cfc54e93442e37b20461ccb1189db9a1f826
Parents: 399176b
Author: honma <ho...@ebay.com>
Authored: Thu May 28 16:54:10 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu May 28 16:54:10 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 27 +++++++++++++++-----
 .../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++++-
 2 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6eb0cfc5/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 534a261..95be7fd 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -64,14 +64,12 @@ public class StreamingBootstrap {
 
     private KylinConfig kylinConfig;
     private StreamingManager streamingManager;
-    private IIManager iiManager;
 
     private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
 
     private StreamingBootstrap(KylinConfig kylinConfig) {
         this.kylinConfig = kylinConfig;
         this.streamingManager = StreamingManager.getInstance(kylinConfig);
-        this.iiManager = IIManager.getInstance(kylinConfig);
     }
 
     public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
@@ -114,8 +112,24 @@ public class StreamingBootstrap {
     public void start(String streaming, int partitionId) throws Exception {
         final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
         Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
-        final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
+
+        if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
+            startIIStreaming(kafkaConfig, partitionId);
+        } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
+            startIIStreaming(kafkaConfig, partitionId);
+        } else {
+            throw new IllegalArgumentException("no cube or ii in kafka config");
+        }
+    }
+
+    private void startCubeStreaming(KafkaConfig kafkaConfig) {
+
+    }
+
+    private void startIIStreaming(KafkaConfig kafkaConfig, int partitionId) throws Exception {
+        final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
         Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
+
         final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
         Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
         Preconditions.checkArgument(ii.getSegments().size() > 0);
@@ -128,7 +142,8 @@ public class StreamingBootstrap {
         final int parallelism = shard / partitionCount;
         final int startShard = partitionId * parallelism;
         final int endShard = startShard + parallelism;
-        long streamingOffset = getEarliestStreamingOffset(streaming, startShard, endShard);
+
+        long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard);
         streamingOffset = streamingOffset - (streamingOffset % parallelism);
         logger.info("offset from ii desc is " + streamingOffset);
         final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
@@ -142,7 +157,7 @@ public class StreamingBootstrap {
         }
 
         KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
-        kafkaConsumers.put(getKey(streaming, partitionId), consumer);
+        kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
 
         StreamParser parser;
         if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
@@ -155,7 +170,7 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streaming, iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
             task.setStreamParser(parser);
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6eb0cfc5/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index 9949c96..b6f5025 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -75,6 +75,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("iiName")
     private String iiName;
 
+    @JsonProperty("cubeName")
+    private String cubeName;
+
     @JsonProperty("parserName")
     private String parserName;
 
@@ -86,7 +89,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.parserName = parserName;
     }
 
-
     public int getTimeout() {
         return timeout;
     }
@@ -133,6 +135,14 @@ public class KafkaConfig extends RootPersistentEntity {
         });
     }
 
+    public String getCubeName() {
+        return cubeName;
+    }
+
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
+    }
+
     public String getIiName() {
         return iiName;
     }