You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/03 11:34:07 UTC

[1/3] incubator-kylin git commit: KYLIN-809 streaming cubing support multiple kafka clusters

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 f2fa12708 -> 30acdb901


KYLIN-809 streaming cubing support multiple kafka clusters


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/473b1745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/473b1745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/473b1745

Branch: refs/heads/0.8.0
Commit: 473b174571bafbf84b7b2db5411b3d31970d0645
Parents: f2fa127
Author: honma <ho...@ebay.com>
Authored: Wed Jun 3 11:00:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 3 15:28:55 2015 +0800

----------------------------------------------------------------------
 .../localmeta/streaming/kafka_test.json         |  30 +--
 .../streaming/test_streaming_table.json         |  12 +-
 .../kylin/job/streaming/KafkaDataLoader.java    |   6 +-
 .../kylin/job/streaming/StreamingBootstrap.java | 114 +++++++-----
 .../kylin/streaming/KafkaClusterConfig.java     |  54 ++++++
 .../org/apache/kylin/streaming/KafkaConfig.java | 185 -------------------
 .../apache/kylin/streaming/KafkaConsumer.java   |  16 +-
 .../apache/kylin/streaming/KafkaRequester.java  |  33 ++--
 .../apache/kylin/streaming/StreamBuilder.java   |   2 +-
 .../apache/kylin/streaming/StreamingConfig.java | 172 +++++++++++++++++
 .../kylin/streaming/StreamingManager.java       |   8 +-
 .../kylin/streaming/ITKafkaConsumerTest.java    |  10 +-
 .../kylin/streaming/ITKafkaRequesterTest.java   |  16 +-
 .../kylin/streaming/OneOffStreamProducer.java   |   7 +-
 14 files changed, 369 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/examples/test_case_data/localmeta/streaming/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/kafka_test.json b/examples/test_case_data/localmeta/streaming/kafka_test.json
index 4582dd8..5445417 100644
--- a/examples/test_case_data/localmeta/streaming/kafka_test.json
+++ b/examples/test_case_data/localmeta/streaming/kafka_test.json
@@ -1,14 +1,20 @@
 {
-  "uuid" : "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
-  "name" : "kafka_test",
-  "topic" : "kafka_stream_test",
-  "timeout" : 60000,
-  "maxReadCount" : 1000,
-  "bufferSize" : 65536,
-  "last_modified" : 0,
-  "brokers" : [ {
-    "id" : 0,
-    "host" : "sandbox.hortonworks.com",
-    "port" : 6667
-  } ]
+  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
+  "name": "kafka_test",
+  "topic": "kafka_stream_test",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox.hortonworks.com",
+          "port": 6667
+        }
+      ]
+    }
+  ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/examples/test_case_data/localmeta/streaming/test_streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table.json b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
index 537919a..92ec1d6 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
@@ -10,11 +10,15 @@
   "parserName": "org.apache.kylin.streaming.JsonStreamParser",
   "partition": 1,
   "last_modified": 0,
-  "brokers": [
+  "clusters": [
     {
-      "id": 0,
-      "host": "sandbox",
-      "port": 6667
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox",
+          "port": 6667
+        }
+      ]
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index d1be49a..8db89d3 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -5,7 +5,7 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.StreamingConfig;
 import org.apache.kylin.streaming.StreamingManager;
 
 import java.io.File;
@@ -23,7 +23,7 @@ public class KafkaDataLoader {
      */
     public static void main(String[] args) throws IOException {
         StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
-        KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(args[1]);
+        StreamingConfig streamingConfig = streamingManager.getKafkaConfig(args[1]);
 
         List<String> alldata = FileUtils.readLines(new File(args[0]));
 
@@ -37,7 +37,7 @@ public class KafkaDataLoader {
         Producer<String, String> producer = new Producer<String, String>(config);
 
         for (int i = 0; i < alldata.size(); ++i) {
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(kafkaConfig.getTopic(), String.valueOf(i), alldata.get(i));
+            KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), alldata.get(i));
             producer.send(data);
         }
         producer.close();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index dcfa774..1c96e16 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -86,8 +86,8 @@ public class StreamingBootstrap {
         return bootstrap;
     }
 
-    private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+    private static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
+        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
         if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
             return partitionMetadata.leader();
         } else {
@@ -113,71 +113,83 @@ public class StreamingBootstrap {
     }
 
     public void start(String streaming, int partitionId) throws Exception {
-        final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
-        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
+        final StreamingConfig streamingConfig = streamingManager.getKafkaConfig(streaming);
+        Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
 
-        final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
-        Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
-
-        if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
-            startIIStreaming(kafkaConfig, partitionId, partitionCount);
-        } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
-            startCubeStreaming(kafkaConfig, partitionId, partitionCount);
+        if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
+            startIIStreaming(streamingConfig, partitionId);
+        } else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
+            startCubeStreaming(streamingConfig, partitionId);
         } else {
             throw new IllegalArgumentException("no cube or ii in kafka config");
         }
     }
 
-    private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
+    private List<BlockingQueue<StreamMessage>> consume(KafkaClusterConfig kafkaClusterConfig, final int partitionCount) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
         for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
-            final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+            final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
 
-            final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
+            final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
             long streamingOffset = latestOffset;
             logger.info("submitting offset:" + streamingOffset);
 
-            KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+            KafkaConsumer consumer = new KafkaConsumer(kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, 1);
             Executors.newSingleThreadExecutor().submit(consumer);
             result.add(consumer.getStreamQueue(0));
         }
         return result;
     }
 
-    private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
-        final String cubeName = kafkaConfig.getCubeName();
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+    private void startCubeStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
+        List<KafkaClusterConfig> kafkaClusterConfigs = streamingConfig.getKafkaClusterConfigs();
+
+        final List<List<BlockingQueue<StreamMessage>>> allClustersData = Lists.newArrayList();
 
-        final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount);
-        final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>();
+        for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
+            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+            Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
+            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(kafkaClusterConfig, partitionCount);
+            logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
+            allClustersData.add(oneClusterData);
+        }
+
+        final LinkedBlockingDeque<StreamMessage> alldata = new LinkedBlockingDeque<>();
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 int totalMessage = 0;
                 while (true) {
-                    for (BlockingQueue<StreamMessage> queue : queues) {
-                        try {
-                            streamQueue.put(queue.take());
-                            if (totalMessage++ % 10000 == 1) {
-                                logger.info("Total stream message count: " + totalMessage);
+                    for (List<BlockingQueue<StreamMessage>> oneCluster : allClustersData) {
+                        for (BlockingQueue<StreamMessage> onePartition : oneCluster) {
+                            try {
+                                alldata.put(onePartition.take());
+                                if (totalMessage++ % 10000 == 1) {
+                                    logger.info("Total stream message count: " + totalMessage);
+                                }
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
                             }
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
                         }
                     }
                 }
             }
         });
-        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
-        cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
-        cubeStreamBuilder.setStreamFilter(getStreamFilter(kafkaConfig));
+
+        final String cubeName = streamingConfig.getCubeName();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(alldata, cubeName);
+        cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
+        cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         future.get();
     }
 
-    private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception {
-        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
-            Class clazz = Class.forName(kafkaConfig.getParserName());
+    private StreamParser getStreamParser(StreamingConfig streamingConfig, List<TblColRef> columns) throws Exception {
+        if (!StringUtils.isEmpty(streamingConfig.getParserName())) {
+            Class clazz = Class.forName(streamingConfig.getParserName());
             Constructor constructor = clazz.getConstructor(List.class);
             return (StreamParser) constructor.newInstance(columns);
         } else {
@@ -185,22 +197,32 @@ public class StreamingBootstrap {
         }
     }
 
-    private StreamFilter getStreamFilter(KafkaConfig kafkaConfig) throws Exception {
-        if (!StringUtils.isEmpty(kafkaConfig.getFilterName())) {
-            Class clazz = Class.forName(kafkaConfig.getFilterName());
+    private StreamFilter getStreamFilter(StreamingConfig streamingConfig) throws Exception {
+        if (!StringUtils.isEmpty(streamingConfig.getFilterName())) {
+            Class clazz = Class.forName(streamingConfig.getFilterName());
             return (StreamFilter) clazz.newInstance();
         } else {
             return DefaultStreamFilter.instance;
         }
     }
 
-    private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
-        final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
-        Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
+    private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
+
+        List<KafkaClusterConfig> allClustersConfigs = streamingConfig.getKafkaClusterConfigs();
+        if (allClustersConfigs.size() != 1) {
+            throw new RuntimeException("II streaming only support one kafka cluster");
+        }
+        KafkaClusterConfig kafkaClusterConfig = allClustersConfigs.get(0);
+
+        final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+        Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
+        final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(streamingConfig.getIiName());
+        Preconditions.checkNotNull(ii, "cannot find ii name:" + streamingConfig.getIiName());
         Preconditions.checkArgument(ii.getSegments().size() > 0);
         final IISegment iiSegment = ii.getSegments().get(0);
 
-        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+        final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
         Preconditions.checkState(leadBroker != null, "cannot find lead broker");
         final int shard = ii.getDescriptor().getSharding();
         Preconditions.checkArgument(shard % partitionCount == 0);
@@ -208,10 +230,10 @@ public class StreamingBootstrap {
         final int startShard = partitionId * parallelism;
         final int endShard = startShard + parallelism;
 
-        long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard);
+        long streamingOffset = getEarliestStreamingOffset(streamingConfig.getName(), startShard, endShard);
         streamingOffset = streamingOffset - (streamingOffset % parallelism);
         logger.info("offset from ii desc is " + streamingOffset);
-        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+        final long earliestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
         logger.info("offset from KafkaRequester is " + earliestOffset);
         streamingOffset = Math.max(streamingOffset, earliestOffset);
         logger.info("starting offset is " + streamingOffset);
@@ -221,14 +243,14 @@ public class StreamingBootstrap {
             throw new IllegalStateException("please create htable:" + iiSegment.getStorageLocationIdentifier() + " first");
         }
 
-        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
-        kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
+        KafkaConsumer consumer = new KafkaConsumer(kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, parallelism);
+        kafkaConsumers.put(getKey(streamingConfig.getName(), partitionId), consumer);
 
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
-            task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns()));
+            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+            task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
new file mode 100644
index 0000000..0661498
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 6/3/15.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class KafkaClusterConfig extends RootPersistentEntity {
+    public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(KafkaClusterConfig.class);
+
+    @JsonProperty("brokers")
+    private List<BrokerConfig> brokerConfigs;
+
+    @JsonBackReference
+    private StreamingConfig streamingConfig;
+
+    public int getBufferSize() {
+        return streamingConfig.getBufferSize();
+    }
+
+    public String getTopic() {
+        return streamingConfig.getTopic();
+    }
+
+    public int getTimeout() {
+        return streamingConfig.getTimeout();
+    }
+
+    public int getMaxReadCount() {
+        return streamingConfig.getMaxReadCount();
+    }
+
+    public List<Broker> getBrokers() {
+        return Lists.transform(brokerConfigs, new Function<BrokerConfig, Broker>() {
+            @Nullable
+            @Override
+            public Broker apply(BrokerConfig input) {
+                return new Broker(input.getId(), input.getHost(), input.getPort());
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
deleted file mode 100644
index f0f3b6f..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-
-import javax.annotation.Nullable;
-import java.io.*;
-import java.util.List;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class KafkaConfig extends RootPersistentEntity {
-
-    public static Serializer<KafkaConfig> SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("brokers")
-    private List<BrokerConfig> brokerConfigs;
-
-    @JsonProperty("topic")
-    private String topic;
-
-    @JsonProperty("timeout")
-    private int timeout;
-
-    @JsonProperty("maxReadCount")
-    private int maxReadCount;
-
-    @JsonProperty("bufferSize")
-    private int bufferSize;
-
-    @JsonProperty("iiName")
-    private String iiName;
-
-    @JsonProperty("cubeName")
-    private String cubeName;
-
-    @JsonProperty("parserName")
-    private String parserName;
-
-    @JsonProperty("filterName")
-    private String filterName;
-
-    public String getFilterName() {
-        return filterName;
-    }
-
-    public void setFilterName(String filterName) {
-        this.filterName = filterName;
-    }
-
-
-    public String getParserName() {
-        return parserName;
-    }
-
-    public void setParserName(String parserName) {
-        this.parserName = parserName;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getMaxReadCount() {
-        return maxReadCount;
-    }
-
-    public void setMaxReadCount(int maxReadCount) {
-        this.maxReadCount = maxReadCount;
-    }
-
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public void setBrokerConfigs(List<BrokerConfig> brokerConfigs) {
-        this.brokerConfigs = brokerConfigs;
-    }
-
-    public List<Broker> getBrokers() {
-        return Lists.transform(brokerConfigs, new Function<BrokerConfig, Broker>() {
-            @Nullable
-            @Override
-            public Broker apply(BrokerConfig input) {
-                return new Broker(input.getId(), input.getHost(), input.getPort());
-            }
-        });
-    }
-
-    public String getCubeName() {
-        return cubeName;
-    }
-
-    public void setCubeName(String cubeName) {
-        this.cubeName = cubeName;
-    }
-
-    public String getIiName() {
-        return iiName;
-    }
-
-    public void setIiName(String iiName) {
-        this.iiName = iiName;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public KafkaConfig clone() {
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            SERIALIZER.serialize(this, new DataOutputStream(baos));
-            return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
-        } catch (IOException e) {
-            throw new RuntimeException(e);//in mem, should not happen
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 70d37ef..cfb8d82 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -54,7 +54,7 @@ public class KafkaConsumer implements Runnable {
 
     private final String topic;
     private final int partitionId;
-    private final KafkaConfig kafkaConfig;
+    private final KafkaClusterConfig streamingConfig;
     private final transient int parallelism;
     private final LinkedBlockingQueue<StreamMessage>[] streamQueue;
 
@@ -65,22 +65,22 @@ public class KafkaConsumer implements Runnable {
 
     private volatile boolean isRunning = true;
 
-    public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
-        this(topic, partitionId, startOffset, initialBrokers, kafkaConfig, 1);
+    public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaClusterConfig kafkaClusterConfig) {
+        this(topic, partitionId, startOffset, initialBrokers, kafkaClusterConfig, 1);
     }
 
-    public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig, int parallelism) {
+    public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaClusterConfig kafkaClusterConfig, int parallelism) {
         Preconditions.checkArgument(parallelism > 0);
         this.topic = topic;
         this.partitionId = partitionId;
-        this.kafkaConfig = kafkaConfig;
+        this.streamingConfig = kafkaClusterConfig;
         this.offset = startOffset;
         this.replicaBrokers = initialBrokers;
         this.logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
         this.parallelism = parallelism;
         this.streamQueue = new LinkedBlockingQueue[parallelism];
         for (int i = 0; i < parallelism; ++i) {
-            streamQueue[i] = new LinkedBlockingQueue<StreamMessage>(kafkaConfig.getMaxReadCount());
+            streamQueue[i] = new LinkedBlockingQueue<StreamMessage>(kafkaClusterConfig.getMaxReadCount());
         }
     }
 
@@ -89,7 +89,7 @@ public class KafkaConsumer implements Runnable {
     }
 
     private Broker getLeadBroker() {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
+        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, streamingConfig);
         if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
             replicaBrokers = partitionMetadata.replicas();
             return partitionMetadata.leader();
@@ -124,7 +124,7 @@ public class KafkaConsumer implements Runnable {
 
                 logger.info("fetching topic {} partition id {} offset {} leader {}", new String[] { topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString() });
 
-                final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaConfig);
+                final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
                 if (fetchResponse.errorCode(topic, partitionId) != 0) {
                     logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
                     Thread.sleep(30000);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
index 02e2759..c21974f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
@@ -84,11 +84,11 @@ public final class KafkaRequester {
         return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
     }
 
-    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
+    public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
         SimpleConsumer consumer;
-        for (Broker broker : kafkaConfig.getBrokers()) {
-            consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+        for (Broker broker : kafkaClusterConfig.getBrokers()) {
+            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
+            List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic());
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
             TopicMetadataResponse resp = consumer.send(req);
             final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
@@ -106,16 +106,16 @@ public final class KafkaRequester {
                     return partitionMetadata.partitionId();
                 }
             });
-            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+            return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds);
         }
-        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
+        logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic());
         return null;
     }
 
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
+    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
         SimpleConsumer consumer;
         for (Broker broker : brokers) {
-            consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
             List<String> topics = Collections.singletonList(topic);
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
             TopicMetadataResponse resp = consumer.send(req);
@@ -139,25 +139,21 @@ public final class KafkaRequester {
         return null;
     }
 
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
+    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
         final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId(clientName)
-                .addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell
+        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
+        kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell
                 .build();
         return consumer.fetch(req);
     }
 
-    public static long getLastOffset(String topic, int partitionId,
-                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
+    public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
         String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
-                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
         OffsetResponse response = consumer.getOffsetsBefore(request);
 
         if (response.hasError()) {
@@ -174,5 +170,4 @@ public final class KafkaRequester {
         }
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index ce5e3d7..8f3ce36 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -115,7 +115,7 @@ public abstract class StreamBuilder implements Runnable {
                 if (getStreamFilter().filter(parsedStreamMessage)) {
 
                     if (filteredMsgCount++ % 10000 == 1) {
-                        logger.info("Total stream message count: " + filteredMsgCount);
+                        logger.info("Total filtered stream message count: " + filteredMsgCount);
                     }
 
                     if (startOffset > parsedStreamMessage.getOffset()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
new file mode 100644
index 0000000..68aaf6b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
@@ -0,0 +1,172 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ *
+ */
+
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import java.io.*;
+import java.util.List;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class StreamingConfig extends RootPersistentEntity {
+
+    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonManagedReference
+    @JsonProperty("clusters")
+    private List<KafkaClusterConfig> kafkaClusterConfigs;
+
+    @JsonProperty("topic")
+    private String topic;
+
+    @JsonProperty("timeout")
+    private int timeout;
+
+    @JsonProperty("maxReadCount")
+    private int maxReadCount;
+
+    @JsonProperty("bufferSize")
+    private int bufferSize;
+
+    @JsonProperty("iiName")
+    private String iiName;
+
+    @JsonProperty("cubeName")
+    private String cubeName;
+
+    @JsonProperty("parserName")
+    private String parserName;
+
+    @JsonProperty("filterName")
+    private String filterName;
+
+    public List<KafkaClusterConfig> getKafkaClusterConfigs() {
+        return kafkaClusterConfigs;
+    }
+
+    public String getFilterName() {
+        return filterName;
+    }
+
+    public void setFilterName(String filterName) {
+        this.filterName = filterName;
+    }
+
+    public String getParserName() {
+        return parserName;
+    }
+
+    public void setParserName(String parserName) {
+        this.parserName = parserName;
+    }
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getMaxReadCount() {
+        return maxReadCount;
+    }
+
+    public void setMaxReadCount(int maxReadCount) {
+        this.maxReadCount = maxReadCount;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getCubeName() {
+        return cubeName;
+    }
+
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
+    }
+
+    public String getIiName() {
+        return iiName;
+    }
+
+    public void setIiName(String iiName) {
+        this.iiName = iiName;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public StreamingConfig clone() {
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            SERIALIZER.serialize(this, new DataOutputStream(baos));
+            return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+        } catch (IOException e) {
+            throw new RuntimeException(e);//in mem, should not happen
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index 46c34fa..3da35f1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -96,9 +96,9 @@ public class StreamingManager {
     }
 
 
-    public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
+    public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
         try {
-            getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+            getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
             return true;
         } catch (IOException e) {
             logger.error("error save resource name:" + name, e);
@@ -106,9 +106,9 @@ public class StreamingManager {
         }
     }
 
-    public KafkaConfig getKafkaConfig(String name) {
+    public StreamingConfig getKafkaConfig(String name) {
         try {
-            return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
+            return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
         } catch (IOException e) {
             logger.error("error get resource name:" + name, e);
             throw new RuntimeException("error get resource name:" + name, e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
index 3008314..db1a27e 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -57,13 +57,15 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
 
     private static final int TOTAL_SEND_COUNT = 100;
 
-    private KafkaConfig kafkaConfig;
+    private StreamingConfig streamingConfig;
+    private KafkaClusterConfig kafkaClusterConfig;
 
     @Before
     public void before() throws IOException {
         producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
         producer.start();
-        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
     }
 
     @After
@@ -84,11 +86,11 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
     @Test
     @Ignore("since ci does not have the topic")
     public void test() throws InterruptedException {
-        final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+        final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig);
         final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
         List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
         for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig);
+            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
             queues.add(consumer.getStreamQueue(0));
             executorService.execute(consumer);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
index d8853f6..e3cd4a6 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
@@ -47,12 +47,13 @@ import static org.junit.Assert.*;
 public class ITKafkaRequesterTest extends KafkaBaseTest {
 
     private static final String NON_EXISTED_TOPIC = "non_existent_topic";
-    private KafkaConfig kafkaConfig;
-
+    private StreamingConfig streamingConfig;
+    private KafkaClusterConfig kafkaClusterConfig;
 
     @Before
     public void before() {
-        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
     }
 
     @AfterClass
@@ -62,15 +63,16 @@ public class ITKafkaRequesterTest extends KafkaBaseTest {
     @Test
     @Ignore("since ci does not enable kafka")
     public void testTopicMeta() throws Exception {
-        TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+        TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig);
         assertNotNull(kafkaTopicMeta);
         assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
-        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+        assertEquals(streamingConfig.getTopic(), kafkaTopicMeta.getName());
 
-        KafkaConfig anotherTopicConfig = kafkaConfig.clone();
+        StreamingConfig anotherTopicConfig = streamingConfig.clone();
+        KafkaClusterConfig anotherClusterConfig = anotherTopicConfig.getKafkaClusterConfigs().get(0);
         anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
 
-        kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherTopicConfig);
+        kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherClusterConfig);
         assertTrue(kafkaTopicMeta == null);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/473b1745/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index c17a3fc..6630256 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -69,10 +69,11 @@ public class OneOffStreamProducer {
     public void start() throws IOException {
         final Properties properties = new Properties();
         properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        final KafkaConfig kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        final KafkaClusterConfig kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
 
         Properties props = new Properties();
-        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaClusterConfig.getBrokers().iterator(), new Function<Broker, String>() {
             @Nullable
             @Override
             public String apply(@Nullable Broker broker) {
@@ -89,7 +90,7 @@ public class OneOffStreamProducer {
             public void run() {
                 int count = 0;
                 while (!stopped && count < sendCount) {
-                    final KeyedMessage<String, String> message = new KeyedMessage<String, String>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
+                    final KeyedMessage<String, String> message = new KeyedMessage<String, String>(streamingConfig.getTopic(), "current time is:" + System.currentTimeMillis());
                     producer.send(message);
                     count++;
                     try {


[2/3] incubator-kylin git commit: KYLIN-809 renaming

Posted by ma...@apache.org.
KYLIN-809 renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e2c73de2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e2c73de2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e2c73de2

Branch: refs/heads/0.8.0
Commit: e2c73de291ffb7d8cd272a408e00a935a46f36f4
Parents: 473b174
Author: honma <ho...@ebay.com>
Authored: Wed Jun 3 17:07:30 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 3 17:07:30 2015 +0800

----------------------------------------------------------------------
 dictionary/.settings/org.eclipse.core.resources.prefs         | 1 -
 .../java/org/apache/kylin/job/streaming/KafkaDataLoader.java  | 2 +-
 .../org/apache/kylin/job/streaming/StreamingBootstrap.java    | 2 +-
 job/src/test/java/org/apache/kylin/job/DeployUtil.java        | 7 +++----
 .../java/org/apache/kylin/streaming/StreamingManager.java     | 2 +-
 .../java/org/apache/kylin/streaming/ITKafkaConsumerTest.java  | 2 +-
 .../java/org/apache/kylin/streaming/ITKafkaRequesterTest.java | 2 +-
 .../java/org/apache/kylin/streaming/OneOffStreamProducer.java | 2 +-
 .../java/org/apache/kylin/streaming/StreamingManagerTest.java | 2 +-
 9 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/dictionary/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/dictionary/.settings/org.eclipse.core.resources.prefs b/dictionary/.settings/org.eclipse.core.resources.prefs
index 04cfa2c..8bc0e1c 100644
--- a/dictionary/.settings/org.eclipse.core.resources.prefs
+++ b/dictionary/.settings/org.eclipse.core.resources.prefs
@@ -1,6 +1,5 @@
 eclipse.preferences.version=1
 encoding//src/main/java=UTF-8
-encoding//src/main/resources=UTF-8
 encoding//src/test/java=UTF-8
 encoding//src/test/resources=UTF-8
 encoding/<project>=UTF-8

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 8db89d3..eb47d15 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -23,7 +23,7 @@ public class KafkaDataLoader {
      */
     public static void main(String[] args) throws IOException {
         StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
-        StreamingConfig streamingConfig = streamingManager.getKafkaConfig(args[1]);
+        StreamingConfig streamingConfig = streamingManager.getStreamingConfig(args[1]);
 
         List<String> alldata = FileUtils.readLines(new File(args[0]));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 1c96e16..cad8423 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -113,7 +113,7 @@ public class StreamingBootstrap {
     }
 
     public void start(String streaming, int partitionId) throws Exception {
-        final StreamingConfig streamingConfig = streamingManager.getKafkaConfig(streaming);
+        final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
         Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
 
         if (!StringUtils.isEmpty(streamingConfig.getIiName())) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9a4673f..cff78dc 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -27,8 +28,8 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HiveClient;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
 import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
@@ -37,8 +38,6 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
-import org.apache.tools.ant.filters.StringInputStream;
-import org.codehaus.plexus.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -170,7 +169,7 @@ public class DeployUtil {
         // Write to resource store
         ResourceStore store = ResourceStore.getStore(config());
 
-        InputStream in = new StringInputStream(factTableContent);
+        InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"));
         String factTablePath = "/data/" + factTableName + ".csv";
         store.deleteResource(factTablePath);
         store.putResource(factTablePath, in, System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index 3da35f1..f025216 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -106,7 +106,7 @@ public class StreamingManager {
         }
     }
 
-    public StreamingConfig getKafkaConfig(String name) {
+    public StreamingConfig getStreamingConfig(String name) {
         try {
             return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
index db1a27e..2a65206 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -64,7 +64,7 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
     public void before() throws IOException {
         producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
         producer.start();
-        streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig("kafka_test");
         kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
index e3cd4a6..4bc9e72 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
@@ -52,7 +52,7 @@ public class ITKafkaRequesterTest extends KafkaBaseTest {
 
     @Before
     public void before() {
-        streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig("kafka_test");
         kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index 6630256..93da99f 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -69,7 +69,7 @@ public class OneOffStreamProducer {
     public void start() throws IOException {
         final Properties properties = new Properties();
         properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig("kafka_test");
         final KafkaClusterConfig kafkaClusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
 
         Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e2c73de2/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
index ea76a93..772643b 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
@@ -65,7 +65,7 @@ public class StreamingManagerTest extends LocalFileMetadataTestCase {
 
     @Test
     public void test() {
-        assertNotNull(streamingManager.getKafkaConfig("kafka_test"));
+        assertNotNull(streamingManager.getStreamingConfig("kafka_test"));
     }
 
     @Test


[3/3] incubator-kylin git commit: KYLIN-809 fix logs

Posted by ma...@apache.org.
KYLIN-809 fix logs


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/30acdb90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/30acdb90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/30acdb90

Branch: refs/heads/0.8.0
Commit: 30acdb9012cc87bcae4b40cf9b87af8b2bc5aaf2
Parents: e2c73de
Author: honma <ho...@ebay.com>
Authored: Wed Jun 3 17:33:50 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 3 17:33:50 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/streaming/CubeStreamBuilder.java   | 2 +-
 .../java/org/apache/kylin/job/streaming/StreamingBootstrap.java  | 2 +-
 .../src/main/java/org/apache/kylin/streaming/StreamBuilder.java  | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/30acdb90/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 9ff0b20..6914b73 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -378,6 +378,6 @@ public class CubeStreamBuilder extends StreamBuilder {
 
     @Override
     protected int batchSize() {
-        return 10000;
+        return 1000;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/30acdb90/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index cad8423..36f7dcf 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -165,7 +165,7 @@ public class StreamingBootstrap {
                         for (BlockingQueue<StreamMessage> onePartition : oneCluster) {
                             try {
                                 alldata.put(onePartition.take());
-                                if (totalMessage++ % 10000 == 1) {
+                                if (totalMessage++ % 10000 == 0) {
                                     logger.info("Total stream message count: " + totalMessage);
                                 }
                             } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/30acdb90/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 8f3ce36..07b8616 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -106,7 +106,7 @@ public abstract class StreamBuilder implements Runnable {
                 }
                 if (streamMessage.getOffset() < 0) {
                     onStop();
-                    logger.warn("streaming encountered EOF, stop building");
+                    logger.warn("streaming encountered EOF, stop building. The remaining {} filtered messages will be discarded", filteredMsgCount);
                     break;
                 }
 
@@ -114,7 +114,7 @@ public abstract class StreamBuilder implements Runnable {
 
                 if (getStreamFilter().filter(parsedStreamMessage)) {
 
-                    if (filteredMsgCount++ % 10000 == 1) {
+                    if (filteredMsgCount++ % 10000 == 0) {
                         logger.info("Total filtered stream message count: " + filteredMsgCount);
                     }