You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/28 10:54:21 UTC
incubator-kylin git commit: streaming cubing: half way
Repository: incubator-kylin
Updated Branches:
refs/heads/streaming-cubing 399176bf2 -> 6eb0cfc54
streaming cubing: half way
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6eb0cfc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6eb0cfc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6eb0cfc5
Branch: refs/heads/streaming-cubing
Commit: 6eb0cfc54e93442e37b20461ccb1189db9a1f826
Parents: 399176b
Author: honma <ho...@ebay.com>
Authored: Thu May 28 16:54:10 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu May 28 16:54:10 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 27 +++++++++++++++-----
.../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++++-
2 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6eb0cfc5/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 534a261..95be7fd 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -64,14 +64,12 @@ public class StreamingBootstrap {
private KylinConfig kylinConfig;
private StreamingManager streamingManager;
- private IIManager iiManager;
private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
private StreamingBootstrap(KylinConfig kylinConfig) {
this.kylinConfig = kylinConfig;
this.streamingManager = StreamingManager.getInstance(kylinConfig);
- this.iiManager = IIManager.getInstance(kylinConfig);
}
public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
@@ -114,8 +112,24 @@ public class StreamingBootstrap {
public void start(String streaming, int partitionId) throws Exception {
final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
- final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
+
+ if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
+ startIIStreaming(kafkaConfig, partitionId);
+ } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
+ startIIStreaming(kafkaConfig, partitionId);
+ } else {
+ throw new IllegalArgumentException("no cube or ii in kafka config");
+ }
+ }
+
+ private void startCubeStreaming(KafkaConfig kafkaConfig) {
+
+ }
+
+ private void startIIStreaming(KafkaConfig kafkaConfig, int partitionId) throws Exception {
+ final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
+
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
Preconditions.checkArgument(ii.getSegments().size() > 0);
@@ -128,7 +142,8 @@ public class StreamingBootstrap {
final int parallelism = shard / partitionCount;
final int startShard = partitionId * parallelism;
final int endShard = startShard + parallelism;
- long streamingOffset = getEarliestStreamingOffset(streaming, startShard, endShard);
+
+ long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard);
streamingOffset = streamingOffset - (streamingOffset % parallelism);
logger.info("offset from ii desc is " + streamingOffset);
final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
@@ -142,7 +157,7 @@ public class StreamingBootstrap {
}
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
- kafkaConsumers.put(getKey(streaming, partitionId), consumer);
+ kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
StreamParser parser;
if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
@@ -155,7 +170,7 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streaming, iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+ final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
task.setStreamParser(parser);
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6eb0cfc5/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index 9949c96..b6f5025 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -75,6 +75,9 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("iiName")
private String iiName;
+ @JsonProperty("cubeName")
+ private String cubeName;
+
@JsonProperty("parserName")
private String parserName;
@@ -86,7 +89,6 @@ public class KafkaConfig extends RootPersistentEntity {
this.parserName = parserName;
}
-
public int getTimeout() {
return timeout;
}
@@ -133,6 +135,14 @@ public class KafkaConfig extends RootPersistentEntity {
});
}
+ public String getCubeName() {
+ return cubeName;
+ }
+
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
+ }
+
public String getIiName() {
return iiName;
}