You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/03 11:34:07 UTC
[1/3] incubator-kylin git commit: KYLIN-809 streaming cubing support multiple kafka clusters
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 f2fa12708 -> 30acdb901
KYLIN-809 streaming cubing support multiple kafka clusters
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/473b1745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/473b1745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/473b1745
Branch: refs/heads/0.8.0
Commit: 473b174571bafbf84b7b2db5411b3d31970d0645
Parents: f2fa127
Author: honma <ho...@ebay.com>
Authored: Wed Jun 3 11:00:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 3 15:28:55 2015 +0800
----------------------------------------------------------------------
.../localmeta/streaming/kafka_test.json | 30 +--
.../streaming/test_streaming_table.json | 12 +-
.../kylin/job/streaming/KafkaDataLoader.java | 6 +-
.../kylin/job/streaming/StreamingBootstrap.java | 114 +++++++-----
.../kylin/streaming/KafkaClusterConfig.java | 54 ++++++
.../org/apache/kylin/streaming/KafkaConfig.java | 185 -------------------
.../apache/kylin/streaming/KafkaConsumer.java | 16 +-
.../apache/kylin/streaming/KafkaRequester.java | 33 ++--
.../apache/kylin/streaming/StreamBuilder.java | 2 +-
.../apache/kylin/streaming/StreamingConfig.java | 172 +++++++++++++++++
.../kylin/streaming/StreamingManager.java | 8 +-
.../kylin/streaming/ITKafkaConsumerTest.java | 10 +-
.../kylin/streaming/ITKafkaRequesterTest.java | 16 +-
.../kylin/streaming/OneOffStreamProducer.java | 7 +-
14 files changed, 369 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/examples/test_case_data/localmeta/streaming/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/kafka_test.json b/examples/test_case_data/localmeta/streaming/kafka_test.json
index 4582dd8..5445417 100644
--- a/examples/test_case_data/localmeta/streaming/kafka_test.json
+++ b/examples/test_case_data/localmeta/streaming/kafka_test.json
@@ -1,14 +1,20 @@
{
- "uuid" : "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
- "name" : "kafka_test",
- "topic" : "kafka_stream_test",
- "timeout" : 60000,
- "maxReadCount" : 1000,
- "bufferSize" : 65536,
- "last_modified" : 0,
- "brokers" : [ {
- "id" : 0,
- "host" : "sandbox.hortonworks.com",
- "port" : 6667
- } ]
+ "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
+ "name": "kafka_test",
+ "topic": "kafka_stream_test",
+ "timeout": 60000,
+ "maxReadCount": 1000,
+ "bufferSize": 65536,
+ "last_modified": 0,
+ "clusters": [
+ {
+ "brokers": [
+ {
+ "id": 0,
+ "host": "sandbox.hortonworks.com",
+ "port": 6667
+ }
+ ]
+ }
+ ]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/examples/test_case_data/localmeta/streaming/test_streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table.json b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
index 537919a..92ec1d6 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
@@ -10,11 +10,15 @@
"parserName": "org.apache.kylin.streaming.JsonStreamParser",
"partition": 1,
"last_modified": 0,
- "brokers": [
+ "clusters": [
{
- "id": 0,
- "host": "sandbox",
- "port": 6667
+ "brokers": [
+ {
+ "id": 0,
+ "host": "sandbox",
+ "port": 6667
+ }
+ ]
}
]
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index d1be49a..8db89d3 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -5,7 +5,7 @@ import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.StreamingConfig;
import org.apache.kylin.streaming.StreamingManager;
import java.io.File;
@@ -23,7 +23,7 @@ public class KafkaDataLoader {
*/
public static void main(String[] args) throws IOException {
StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
- KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(args[1]);
+ StreamingConfig streamingConfig = streamingManager.getKafkaConfig(args[1]);
List<String> alldata = FileUtils.readLines(new File(args[0]));
@@ -37,7 +37,7 @@ public class KafkaDataLoader {
Producer<String, String> producer = new Producer<String, String>(config);
for (int i = 0; i < alldata.size(); ++i) {
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(kafkaConfig.getTopic(), String.valueOf(i), alldata.get(i));
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), alldata.get(i));
producer.send(data);
}
producer.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index dcfa774..1c96e16 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -86,8 +86,8 @@ public class StreamingBootstrap {
return bootstrap;
}
- private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+ private static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
+ final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
return partitionMetadata.leader();
} else {
@@ -113,71 +113,83 @@ public class StreamingBootstrap {
}
public void start(String streaming, int partitionId) throws Exception {
- final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
- Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
+ final StreamingConfig streamingConfig = streamingManager.getKafkaConfig(streaming);
+ Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
- Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
-
- if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
- startIIStreaming(kafkaConfig, partitionId, partitionCount);
- } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
- startCubeStreaming(kafkaConfig, partitionId, partitionCount);
+ if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
+ startIIStreaming(streamingConfig, partitionId);
+ } else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
+ startCubeStreaming(streamingConfig, partitionId);
} else {
throw new IllegalArgumentException("no cube or ii in kafka config");
}
}
- private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
+ private List<BlockingQueue<StreamMessage>> consume(KafkaClusterConfig kafkaClusterConfig, final int partitionCount) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
- final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+ final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
- final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
+ final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
long streamingOffset = latestOffset;
logger.info("submitting offset:" + streamingOffset);
- KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+ KafkaConsumer consumer = new KafkaConsumer(kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, 1);
Executors.newSingleThreadExecutor().submit(consumer);
result.add(consumer.getStreamQueue(0));
}
return result;
}
- private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
- final String cubeName = kafkaConfig.getCubeName();
- final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ private void startCubeStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
+ List<KafkaClusterConfig> kafkaClusterConfigs = streamingConfig.getKafkaClusterConfigs();
+
+ final List<List<BlockingQueue<StreamMessage>>> allClustersData = Lists.newArrayList();
- final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount);
- final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>();
+ for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
+ final List<BlockingQueue<StreamMessage>> oneClusterData = consume(kafkaClusterConfig, partitionCount);
+ logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
+ allClustersData.add(oneClusterData);
+ }
+
+ final LinkedBlockingDeque<StreamMessage> alldata = new LinkedBlockingDeque<>();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
int totalMessage = 0;
while (true) {
- for (BlockingQueue<StreamMessage> queue : queues) {
- try {
- streamQueue.put(queue.take());
- if (totalMessage++ % 10000 == 1) {
- logger.info("Total stream message count: " + totalMessage);
+ for (List<BlockingQueue<StreamMessage>> oneCluster : allClustersData) {
+ for (BlockingQueue<StreamMessage> onePartition : oneCluster) {
+ try {
+ alldata.put(onePartition.take());
+ if (totalMessage++ % 10000 == 1) {
+ logger.info("Total stream message count: " + totalMessage);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
}
}
}
}
});
- CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
- cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
- cubeStreamBuilder.setStreamFilter(getStreamFilter(kafkaConfig));
+
+ final String cubeName = streamingConfig.getCubeName();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+ CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(alldata, cubeName);
+ cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
+ cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
future.get();
}
- private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception {
- if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
- Class clazz = Class.forName(kafkaConfig.getParserName());
+ private StreamParser getStreamParser(StreamingConfig streamingConfig, List<TblColRef> columns) throws Exception {
+ if (!StringUtils.isEmpty(streamingConfig.getParserName())) {
+ Class clazz = Class.forName(streamingConfig.getParserName());
Constructor constructor = clazz.getConstructor(List.class);
return (StreamParser) constructor.newInstance(columns);
} else {
@@ -185,22 +197,32 @@ public class StreamingBootstrap {
}
}
- private StreamFilter getStreamFilter(KafkaConfig kafkaConfig) throws Exception {
- if (!StringUtils.isEmpty(kafkaConfig.getFilterName())) {
- Class clazz = Class.forName(kafkaConfig.getFilterName());
+ private StreamFilter getStreamFilter(StreamingConfig streamingConfig) throws Exception {
+ if (!StringUtils.isEmpty(streamingConfig.getFilterName())) {
+ Class clazz = Class.forName(streamingConfig.getFilterName());
return (StreamFilter) clazz.newInstance();
} else {
return DefaultStreamFilter.instance;
}
}
- private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
- final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
- Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
+ private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
+
+ List<KafkaClusterConfig> allClustersConfigs = streamingConfig.getKafkaClusterConfigs();
+ if (allClustersConfigs.size() != 1) {
+ throw new RuntimeException("II streaming only support one kafka cluster");
+ }
+ KafkaClusterConfig kafkaClusterConfig = allClustersConfigs.get(0);
+
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
+ final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(streamingConfig.getIiName());
+ Preconditions.checkNotNull(ii, "cannot find ii name:" + streamingConfig.getIiName());
Preconditions.checkArgument(ii.getSegments().size() > 0);
final IISegment iiSegment = ii.getSegments().get(0);
- final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+ final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
Preconditions.checkState(leadBroker != null, "cannot find lead broker");
final int shard = ii.getDescriptor().getSharding();
Preconditions.checkArgument(shard % partitionCount == 0);
@@ -208,10 +230,10 @@ public class StreamingBootstrap {
final int startShard = partitionId * parallelism;
final int endShard = startShard + parallelism;
- long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard);
+ long streamingOffset = getEarliestStreamingOffset(streamingConfig.getName(), startShard, endShard);
streamingOffset = streamingOffset - (streamingOffset % parallelism);
logger.info("offset from ii desc is " + streamingOffset);
- final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+ final long earliestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
logger.info("offset from KafkaRequester is " + earliestOffset);
streamingOffset = Math.max(streamingOffset, earliestOffset);
logger.info("starting offset is " + streamingOffset);
@@ -221,14 +243,14 @@ public class StreamingBootstrap {
throw new IllegalStateException("please create htable:" + iiSegment.getStorageLocationIdentifier() + " first");
}
- KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
- kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
+ KafkaConsumer consumer = new KafkaConsumer(kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, parallelism);
+ kafkaConsumers.put(getKey(streamingConfig.getName(), partitionId), consumer);
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
- task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns()));
+ final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+ task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
new file mode 100644
index 0000000..0661498
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 6/3/15.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class KafkaClusterConfig extends RootPersistentEntity {
+ public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(KafkaClusterConfig.class);
+
+ @JsonProperty("brokers")
+ private List<BrokerConfig> brokerConfigs;
+
+ @JsonBackReference
+ private StreamingConfig streamingConfig;
+
+ public int getBufferSize() {
+ return streamingConfig.getBufferSize();
+ }
+
+ public String getTopic() {
+ return streamingConfig.getTopic();
+ }
+
+ public int getTimeout() {
+ return streamingConfig.getTimeout();
+ }
+
+ public int getMaxReadCount() {
+ return streamingConfig.getMaxReadCount();
+ }
+
+ public List<Broker> getBrokers() {
+ return Lists.transform(brokerConfigs, new Function<BrokerConfig, Broker>() {
+ @Nullable
+ @Override
+ public Broker apply(BrokerConfig input) {
+ return new Broker(input.getId(), input.getHost(), input.getPort());
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
deleted file mode 100644
index f0f3b6f..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-
-import javax.annotation.Nullable;
-import java.io.*;
-import java.util.List;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class KafkaConfig extends RootPersistentEntity {
-
- public static Serializer<KafkaConfig> SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("brokers")
- private List<BrokerConfig> brokerConfigs;
-
- @JsonProperty("topic")
- private String topic;
-
- @JsonProperty("timeout")
- private int timeout;
-
- @JsonProperty("maxReadCount")
- private int maxReadCount;
-
- @JsonProperty("bufferSize")
- private int bufferSize;
-
- @JsonProperty("iiName")
- private String iiName;
-
- @JsonProperty("cubeName")
- private String cubeName;
-
- @JsonProperty("parserName")
- private String parserName;
-
- @JsonProperty("filterName")
- private String filterName;
-
- public String getFilterName() {
- return filterName;
- }
-
- public void setFilterName(String filterName) {
- this.filterName = filterName;
- }
-
-
- public String getParserName() {
- return parserName;
- }
-
- public void setParserName(String parserName) {
- this.parserName = parserName;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getMaxReadCount() {
- return maxReadCount;
- }
-
- public void setMaxReadCount(int maxReadCount) {
- this.maxReadCount = maxReadCount;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public void setBrokerConfigs(List<BrokerConfig> brokerConfigs) {
- this.brokerConfigs = brokerConfigs;
- }
-
- public List<Broker> getBrokers() {
- return Lists.transform(brokerConfigs, new Function<BrokerConfig, Broker>() {
- @Nullable
- @Override
- public Broker apply(BrokerConfig input) {
- return new Broker(input.getId(), input.getHost(), input.getPort());
- }
- });
- }
-
- public String getCubeName() {
- return cubeName;
- }
-
- public void setCubeName(String cubeName) {
- this.cubeName = cubeName;
- }
-
- public String getIiName() {
- return iiName;
- }
-
- public void setIiName(String iiName) {
- this.iiName = iiName;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public KafkaConfig clone() {
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SERIALIZER.serialize(this, new DataOutputStream(baos));
- return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
- } catch (IOException e) {
- throw new RuntimeException(e);//in mem, should not happen
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 70d37ef..cfb8d82 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -54,7 +54,7 @@ public class KafkaConsumer implements Runnable {
private final String topic;
private final int partitionId;
- private final KafkaConfig kafkaConfig;
+ private final KafkaClusterConfig streamingConfig;
private final transient int parallelism;
private final LinkedBlockingQueue<StreamMessage>[] streamQueue;
@@ -65,22 +65,22 @@ public class KafkaConsumer implements Runnable {
private volatile boolean isRunning = true;
- public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
- this(topic, partitionId, startOffset, initialBrokers, kafkaConfig, 1);
+ public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaClusterConfig kafkaClusterConfig) {
+ this(topic, partitionId, startOffset, initialBrokers, kafkaClusterConfig, 1);
}
- public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig, int parallelism) {
+ public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaClusterConfig kafkaClusterConfig, int parallelism) {
Preconditions.checkArgument(parallelism > 0);
this.topic = topic;
this.partitionId = partitionId;
- this.kafkaConfig = kafkaConfig;
+ this.streamingConfig = kafkaClusterConfig;
this.offset = startOffset;
this.replicaBrokers = initialBrokers;
this.logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
this.parallelism = parallelism;
this.streamQueue = new LinkedBlockingQueue[parallelism];
for (int i = 0; i < parallelism; ++i) {
- streamQueue[i] = new LinkedBlockingQueue<StreamMessage>(kafkaConfig.getMaxReadCount());
+ streamQueue[i] = new LinkedBlockingQueue<StreamMessage>(kafkaClusterConfig.getMaxReadCount());
}
}
@@ -89,7 +89,7 @@ public class KafkaConsumer implements Runnable {
}
private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
+ final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, streamingConfig);
if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
replicaBrokers = partitionMetadata.replicas();
return partitionMetadata.leader();
@@ -124,7 +124,7 @@ public class KafkaConsumer implements Runnable {
logger.info("fetching topic {} partition id {} offset {} leader {}", new String[] { topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString() });
- final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaConfig);
+ final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
if (fetchResponse.errorCode(topic, partitionId) != 0) {
logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
Thread.sleep(30000);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
index 02e2759..c21974f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
@@ -84,11 +84,11 @@ public final class KafkaRequester {
return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
}
- public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
+ public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
SimpleConsumer consumer;
- for (Broker broker : kafkaConfig.getBrokers()) {
- consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+ for (Broker broker : kafkaClusterConfig.getBrokers()) {
+ consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic());
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
@@ -106,16 +106,16 @@ public final class KafkaRequester {
return partitionMetadata.partitionId();
}
});
- return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+ return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds);
}
- logger.debug("cannot find topic:" + kafkaConfig.getTopic());
+ logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic());
return null;
}
- public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
+ public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
SimpleConsumer consumer;
for (Broker broker : brokers) {
- consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
@@ -139,25 +139,21 @@ public final class KafkaRequester {
return null;
}
- public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
+ public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
final String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId(clientName)
- .addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell
+ SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
+ kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell
.build();
return consumer.fetch(req);
}
- public static long getLastOffset(String topic, int partitionId,
- long whichTime, Broker broker, KafkaConfig kafkaConfig) {
+ public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+ SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
@@ -174,5 +170,4 @@ public final class KafkaRequester {
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index ce5e3d7..8f3ce36 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -115,7 +115,7 @@ public abstract class StreamBuilder implements Runnable {
if (getStreamFilter().filter(parsedStreamMessage)) {
if (filteredMsgCount++ % 10000 == 1) {
- logger.info("Total stream message count: " + filteredMsgCount);
+ logger.info("Total filtered stream message count: " + filteredMsgCount);
}
if (startOffset > parsedStreamMessage.getOffset()) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
new file mode 100644
index 0000000..68aaf6b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
@@ -0,0 +1,172 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import java.io.*;
+import java.util.List;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class StreamingConfig extends RootPersistentEntity {
+
+ public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonManagedReference
+ @JsonProperty("clusters")
+ private List<KafkaClusterConfig> kafkaClusterConfigs;
+
+ @JsonProperty("topic")
+ private String topic;
+
+ @JsonProperty("timeout")
+ private int timeout;
+
+ @JsonProperty("maxReadCount")
+ private int maxReadCount;
+
+ @JsonProperty("bufferSize")
+ private int bufferSize;
+
+ @JsonProperty("iiName")
+ private String iiName;
+
+ @JsonProperty("cubeName")
+ private String cubeName;
+
+ @JsonProperty("parserName")
+ private String parserName;
+
+ @JsonProperty("filterName")
+ private String filterName;
+
+ public List<KafkaClusterConfig> getKafkaClusterConfigs() {
+ return kafkaClusterConfigs;
+ }
+
+ public String getFilterName() {
+ return filterName;
+ }
+
+ public void setFilterName(String filterName) {
+ this.filterName = filterName;
+ }
+
+ public String getParserName() {
+ return parserName;
+ }
+
+ public void setParserName(String parserName) {
+ this.parserName = parserName;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMaxReadCount() {
+ return maxReadCount;
+ }
+
+ public void setMaxReadCount(int maxReadCount) {
+ this.maxReadCount = maxReadCount;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getCubeName() {
+ return cubeName;
+ }
+
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
+ }
+
+ public String getIiName() {
+ return iiName;
+ }
+
+ public void setIiName(String iiName) {
+ this.iiName = iiName;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public StreamingConfig clone() {
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SERIALIZER.serialize(this, new DataOutputStream(baos));
+ return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+ } catch (IOException e) {
+ throw new RuntimeException(e);//in mem, should not happen
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index 46c34fa..3da35f1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -96,9 +96,9 @@ public class StreamingManager {
}
- public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
+ public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
try {
- getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+ getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
return true;
} catch (IOException e) {
logger.error("error save resource name:" + name, e);
@@ -106,9 +106,9 @@ public class StreamingManager {
}
}
- public KafkaConfig getKafkaConfig(String name) {
+ public StreamingConfig getKafkaConfig(String name) {
try {
- return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
+ return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
} catch (IOException e) {
logger.error("error get resource name:" + name, e);
throw new RuntimeException("error get resource name:" + name, e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
index 3008314..db1a27e 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -57,13 +57,15 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
private static final int TOTAL_SEND_COUNT = 100;
- private KafkaConfig kafkaConfig;
+ private StreamingConfig streamingConfig;
+ private KafkaClusterConfig kafkaClusterConfig;
@Before
public void before() throws IOException {
producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
producer.start();
- kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
}
@After
@@ -84,11 +86,11 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
@Test
@Ignore("since ci does not have the topic")
public void test() throws InterruptedException {
- final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+ final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig);
final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig);
+ KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
queues.add(consumer.getStreamQueue(0));
executorService.execute(consumer);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
index d8853f6..e3cd4a6 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
@@ -47,12 +47,13 @@ import static org.junit.Assert.*;
public class ITKafkaRequesterTest extends KafkaBaseTest {
private static final String NON_EXISTED_TOPIC = "non_existent_topic";
- private KafkaConfig kafkaConfig;
-
+ private StreamingConfig streamingConfig;
+ private KafkaClusterConfig kafkaClusterConfig;
@Before
public void before() {
- kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
}
@AfterClass
@@ -62,15 +63,16 @@ public class ITKafkaRequesterTest extends KafkaBaseTest {
@Test
@Ignore("since ci does not enable kafka")
public void testTopicMeta() throws Exception {
- TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+ TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig);
assertNotNull(kafkaTopicMeta);
assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
- assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+ assertEquals(streamingConfig.getTopic(), kafkaTopicMeta.getName());
- KafkaConfig anotherTopicConfig = kafkaConfig.clone();
+ StreamingConfig anotherTopicConfig = streamingConfig.clone();
+ KafkaClusterConfig anotherClusterConfig = anotherTopicConfig.getKafkaClusterConfigs().get(0);
anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
- kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherTopicConfig);
+ kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherClusterConfig);
assertTrue(kafkaTopicMeta == null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index c17a3fc..6630256 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -69,10 +69,11 @@ public class OneOffStreamProducer {
public void start() throws IOException {
final Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- final KafkaConfig kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ final KafkaClusterConfig kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
Properties props = new Properties();
- props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+ props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaClusterConfig.getBrokers().iterator(), new Function<Broker, String>() {
@Nullable
@Override
public String apply(@Nullable Broker broker) {
@@ -89,7 +90,7 @@ public class OneOffStreamProducer {
public void run() {
int count = 0;
while (!stopped && count < sendCount) {
- final KeyedMessage<String, String> message = new KeyedMessage<String, String>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
+ final KeyedMessage<String, String> message = new KeyedMessage<String, String>(streamingConfig.getTopic(), "current time is:" + System.currentTimeMillis());
producer.send(message);
count++;
try {
[2/3] incubator-kylin git commit: KYLIN-809 renaming
Posted by ma...@apache.org.
KYLIN-809 renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e2c73de2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e2c73de2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e2c73de2
Branch: refs/heads/0.8.0
Commit: e2c73de291ffb7d8cd272a408e00a935a46f36f4
Parents: 473b174
Author: honma <ho...@ebay.com>
Authored: Wed Jun 3 17:07:30 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 3 17:07:30 2015 +0800
----------------------------------------------------------------------
dictionary/.settings/org.eclipse.core.resources.prefs | 1 -
.../java/org/apache/kylin/job/streaming/KafkaDataLoader.java | 2 +-
.../org/apache/kylin/job/streaming/StreamingBootstrap.java | 2 +-
job/src/test/java/org/apache/kylin/job/DeployUtil.java | 7 +++----
.../java/org/apache/kylin/streaming/StreamingManager.java | 2 +-
.../java/org/apache/kylin/streaming/ITKafkaConsumerTest.java | 2 +-
.../java/org/apache/kylin/streaming/ITKafkaRequesterTest.java | 2 +-
.../java/org/apache/kylin/streaming/OneOffStreamProducer.java | 2 +-
.../java/org/apache/kylin/streaming/StreamingManagerTest.java | 2 +-
9 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/dictionary/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/dictionary/.settings/org.eclipse.core.resources.prefs b/dictionary/.settings/org.eclipse.core.resources.prefs
index 04cfa2c..8bc0e1c 100644
--- a/dictionary/.settings/org.eclipse.core.resources.prefs
+++ b/dictionary/.settings/org.eclipse.core.resources.prefs
@@ -1,6 +1,5 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
-encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding/<project>=UTF-8
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 8db89d3..eb47d15 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -23,7 +23,7 @@ public class KafkaDataLoader {
*/
public static void main(String[] args) throws IOException {
StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
- StreamingConfig streamingConfig = streamingManager.getKafkaConfig(args[1]);
+ StreamingConfig streamingConfig = streamingManager.getStreamingConfig(args[1]);
List<String> alldata = FileUtils.readLines(new File(args[0]));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 1c96e16..cad8423 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -113,7 +113,7 @@ public class StreamingBootstrap {
}
public void start(String streaming, int partitionId) throws Exception {
- final StreamingConfig streamingConfig = streamingManager.getKafkaConfig(streaming);
+ final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9a4673f..cff78dc 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -27,8 +28,8 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HiveClient;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.dataGen.FactTableGenerator;
import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
@@ -37,8 +38,6 @@ import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
-import org.apache.tools.ant.filters.StringInputStream;
-import org.codehaus.plexus.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,7 +169,7 @@ public class DeployUtil {
// Write to resource store
ResourceStore store = ResourceStore.getStore(config());
- InputStream in = new StringInputStream(factTableContent);
+ InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"));
String factTablePath = "/data/" + factTableName + ".csv";
store.deleteResource(factTablePath);
store.putResource(factTablePath, in, System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index 3da35f1..f025216 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -106,7 +106,7 @@ public class StreamingManager {
}
}
- public StreamingConfig getKafkaConfig(String name) {
+ public StreamingConfig getStreamingConfig(String name) {
try {
return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
index db1a27e..2a65206 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -64,7 +64,7 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
public void before() throws IOException {
producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
producer.start();
- streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig("kafka_test");
kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
index e3cd4a6..4bc9e72 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
@@ -52,7 +52,7 @@ public class ITKafkaRequesterTest extends KafkaBaseTest {
@Before
public void before() {
- streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig("kafka_test");
kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index 6630256..93da99f 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -69,7 +69,7 @@ public class OneOffStreamProducer {
public void start() throws IOException {
final Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig("kafka_test");
final KafkaClusterConfig kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
index ea76a93..772643b 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
@@ -65,7 +65,7 @@ public class StreamingManagerTest extends LocalFileMetadataTestCase {
@Test
public void test() {
- assertNotNull(streamingManager.getKafkaConfig("kafka_test"));
+ assertNotNull(streamingManager.getStreamingConfig("kafka_test"));
}
@Test
[3/3] incubator-kylin git commit: KYLIN-809 fix logs
Posted by ma...@apache.org.
KYLIN-809 fix logs
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/30acdb90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/30acdb90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/30acdb90
Branch: refs/heads/0.8.0
Commit: 30acdb9012cc87bcae4b40cf9b87af8b2bc5aaf2
Parents: e2c73de
Author: honma <ho...@ebay.com>
Authored: Wed Jun 3 17:33:50 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 3 17:33:50 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/streaming/CubeStreamBuilder.java | 2 +-
.../java/org/apache/kylin/job/streaming/StreamingBootstrap.java | 2 +-
.../src/main/java/org/apache/kylin/streaming/StreamBuilder.java | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/30acdb90/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 9ff0b20..6914b73 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -378,6 +378,6 @@ public class CubeStreamBuilder extends StreamBuilder {
@Override
protected int batchSize() {
- return 10000;
+ return 1000;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/30acdb90/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index cad8423..36f7dcf 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -165,7 +165,7 @@ public class StreamingBootstrap {
for (BlockingQueue<StreamMessage> onePartition : oneCluster) {
try {
alldata.put(onePartition.take());
- if (totalMessage++ % 10000 == 1) {
+ if (totalMessage++ % 10000 == 0) {
logger.info("Total stream message count: " + totalMessage);
}
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/30acdb90/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 8f3ce36..07b8616 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -106,7 +106,7 @@ public abstract class StreamBuilder implements Runnable {
}
if (streamMessage.getOffset() < 0) {
onStop();
- logger.warn("streaming encountered EOF, stop building");
+ logger.warn("streaming encountered EOF, stop building. The remaining {} filtered messages will be discarded", filteredMsgCount);
break;
}
@@ -114,7 +114,7 @@ public abstract class StreamBuilder implements Runnable {
if (getStreamFilter().filter(parsedStreamMessage)) {
- if (filteredMsgCount++ % 10000 == 1) {
+ if (filteredMsgCount++ % 10000 == 0) {
logger.info("Total filtered stream message count: " + filteredMsgCount);
}