You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:07:57 UTC

[01/26] incubator-kylin git commit: measure update rule fix

Repository: incubator-kylin
Updated Branches:
  refs/heads/streaming aa49da35f -> 825409504


measure update rule fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/80a50655
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/80a50655
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/80a50655

Branch: refs/heads/streaming
Commit: 80a50655b4290a845278ec5e24d7a4a3145ff766
Parents: 2b01dcc
Author: jiazhong <ji...@ebay.com>
Authored: Fri Feb 27 13:54:15 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Fri Feb 27 13:54:15 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cubeEdit.js          |  9 +++++++++
 webapp/app/js/controllers/cubeSchema.js        | 16 ++++++++++++++++
 webapp/app/partials/cubeDesigner/measures.html | 14 ++++++--------
 3 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/80a50655/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index c2f0b5a..0e05077 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -36,6 +36,15 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
         return temp;
     };
 
+    $scope.getColumnType = function (_column,table){
+        var columns = $scope.getColumnsByTable(table);
+        angular.forEach(columns,function(column){
+            if(_column===column.name){
+                return column.type;
+            }
+        });
+    };
+
     var ColFamily = function () {
         var index = 1;
         return function () {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/80a50655/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
old mode 100644
new mode 100755
index e03808c..4bb5762
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -107,6 +107,22 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
         $scope.newMeasure = null;
     };
 
+    // Applies only to expressions other than COUNT and COUNT_DISTINCT.
+    $scope.measureParamValueUpdate = function(){
+        if(newMeasure.function.expression!=="COUNT"&&newMeasure.function.expression!=="COUNT_DISTINCT"){
+
+            var column = $scope.newMeasure.function.parameter.value;
+
+
+            switch(newMeasure.function.expression){
+                case "SUM":
+                    var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table);
+                    $log.log(colType);
+                    break;
+            }
+        }
+    }
+
     $scope.addNewRowkeyColumn = function () {
         $scope.cubeMetaFrame.rowkey.rowkey_columns.push({
             "column": "",

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/80a50655/webapp/app/partials/cubeDesigner/measures.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/measures.html b/webapp/app/partials/cubeDesigner/measures.html
index ef7c12f..9a932bd 100755
--- a/webapp/app/partials/cubeDesigner/measures.html
+++ b/webapp/app/partials/cubeDesigner/measures.html
@@ -110,7 +110,7 @@
                             <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Expression</b></label>
                             <div class="col-xs-12 col-sm-6">
                                 <select class="form-control"
-                                    ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression"                                    chosen ng-model="newMeasure.function.expression" required
+                                    ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
                                     ng-options="me as me for me in cubeConfig.measureExpressions">
                                     <option value=""></option>
                                 </select>
@@ -146,6 +146,7 @@
                                 <select class="form-control" chosen
                                     ng-if="newMeasure.function.parameter.type == 'column'"
                                     ng-model="newMeasure.function.parameter.value"
+                                    ng-change="measureParamValueUpdate();"
                                     ng-options="columns.name as columns.name for columns in getColumnsByTable(metaModel.model.fact_table)" >
                                     <option value="">-- Select a Fact Table Column --</option>
                                 </select>
@@ -158,13 +159,6 @@
                             <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Return Type</b></label>
                             <div class="col-xs-12 col-sm-6">
                                 <select class="form-control"
-                                    ng-if="newMeasure.function.expression != 'COUNT_DISTINCT' && newMeasure.function.expression != 'COUNT' "
-                                    ng-init="newMeasure.function.returntype = (!!newMeasure.function.returntype)?newMeasure.function.returntype:cubeConfig.dftSelections.measureDataType.value"
-                                    chosen ng-model="newMeasure.function.returntype" required
-                                    ng-options="mdt.value as mdt.name for mdt in cubeConfig.measureDataTypes">
-                                    <option value=""></option>
-                                </select>
-                                <select class="form-control"
                                     ng-if="newMeasure.function.expression == 'COUNT_DISTINCT'"
                                     ng-init="newMeasure.function.returntype = (!!newMeasure.function.returntype)?newMeasure.function.returntype:cubeConfig.dftSelections.distinctDataType.value"
                                     chosen ng-model="newMeasure.function.returntype" required
@@ -175,6 +169,10 @@
                                     ng-if="newMeasure.function.expression == 'COUNT'"
                                     ng-init="newMeasure.function.returntype= 'bigint' "><b>&nbsp;&nbsp;BIGINT</b>
                                 </span>
+                                <span class="font-color-default"
+                                      ng-if="newMeasure.function.expression != 'COUNT_DISTINCT' && newMeasure.function.expression != 'COUNT' "
+                                     ><b>&nbsp;&nbsp;{{newMeasure.function.returntype}}</b>
+                                </span>
                             </div>
                         </div>
                     </div>


[19/26] incubator-kylin git commit: refactor

Posted by li...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/58a6f73f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/58a6f73f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/58a6f73f

Branch: refs/heads/streaming
Commit: 58a6f73f530a5008ee565c863b0473fef5d34200
Parents: c038cb4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 18:10:48 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 18:10:48 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/kafka/Consumer.java  |  14 +-
 .../kylin/streaming/kafka/ConsumerConfig.java   |  72 ---------
 .../kylin/streaming/kafka/KafkaConfig.java      | 151 +++++++++++++++++++
 .../apache/kylin/streaming/kafka/Requester.java |  26 ++--
 .../kylin/streaming/kafka/TopicConfig.java      |  66 --------
 .../kylin/streaming/kafka/KafkaBaseTest.java    |  11 +-
 .../kylin/streaming/kafka/KafkaConfigTest.java  |  64 ++++++++
 .../streaming/kafka/KafkaConsumerTest.java      |  15 +-
 .../kylin/streaming/kafka/RequesterTest.java    |  26 +---
 .../kylin/streaming/kafka/TestConstants.java    |  48 ------
 .../kylin/streaming/kafka/TestProducer.java     |  25 ++-
 .../kafka_streaming_test/kafka.properties       |  10 ++
 12 files changed, 286 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index c825d4b..074ef01 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -56,20 +56,20 @@ public class Consumer implements Runnable {
     private String topic;
     private int partitionId;
 
-    private ConsumerConfig consumerConfig;
+    private KafkaConfig kafkaConfig;
     private List<Broker> replicaBrokers;
     private AtomicLong offset = new AtomicLong();
     private BlockingQueue<Stream> streamQueue;
 
     private Logger logger;
 
-    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, ConsumerConfig consumerConfig) {
+    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
         this.topic = topic;
         this.partitionId = partitionId;
-        this.consumerConfig = consumerConfig;
+        this.kafkaConfig = kafkaConfig;
         this.replicaBrokers = initialBrokers;
         logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
-        streamQueue = new ArrayBlockingQueue<Stream>(consumerConfig.getMaxReadCount());
+        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
     }
 
     public BlockingQueue<Stream> getStreamQueue() {
@@ -77,7 +77,7 @@ public class Consumer implements Runnable {
     }
 
     private Broker getLeadBroker() {
-        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, consumerConfig);
+        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
         if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
             replicaBrokers = partitionMetadata.replicas();
             return partitionMetadata.leader();
@@ -94,9 +94,9 @@ public class Consumer implements Runnable {
                 logger.warn("cannot find lead broker");
                 continue;
             }
-            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
             offset.set(lastOffset);
-            final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
+            final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
             if (fetchResponse.errorCode(topic, partitionId) != 0) {
                 logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
                 continue;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
deleted file mode 100644
index 3bdbd13..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class ConsumerConfig {
-
-    private int timeout;
-
-    private int maxReadCount;
-
-    private int bufferSize;
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getMaxReadCount() {
-        return maxReadCount;
-    }
-
-    public void setMaxReadCount(int maxReadCount) {
-        this.maxReadCount = maxReadCount;
-    }
-
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
new file mode 100644
index 0000000..82513eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+    private List<Broker> brokers;
+
+    private String zookeeper;
+
+    private String topic;
+
+    private int timeout;
+
+    private int maxReadCount;
+
+    private int bufferSize;
+
+    private int partitionId;
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getMaxReadCount() {
+        return maxReadCount;
+    }
+
+    public void setMaxReadCount(int maxReadCount) {
+        this.maxReadCount = maxReadCount;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public List<Broker> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(List<Broker> brokers) {
+        this.brokers = brokers;
+    }
+
+    public String getZookeeper() {
+        return zookeeper;
+    }
+
+    public void setZookeeper(String zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public static KafkaConfig load(KafkaConfig config) {
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(config.getBufferSize());
+        result.setMaxReadCount(config.getMaxReadCount());
+        result.setTimeout(config.getTimeout());
+        result.setTopic(config.getTopic());
+        result.setZookeeper(config.getZookeeper());
+        result.setPartitionId(config.getPartitionId());
+        result.setBrokers(config.getBrokers());
+        return result;
+    }
+
+    public static KafkaConfig load(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
+        result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
+        result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
+        result.setTopic(properties.getProperty("topic"));
+        result.setZookeeper(properties.getProperty("zookeeper"));
+        result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
+
+        int id = 0;
+        List<Broker> brokers = Lists.newArrayList();
+        for (String str: properties.getProperty("brokers").split(",")) {
+            final String[] split = str.split(":");
+            final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
+            brokers.add(broker);
+        }
+        result.setBrokers(brokers);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index f4cdd8e..8811695 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -58,11 +58,11 @@ public final class Requester {
 
     private static final Logger logger = LoggerFactory.getLogger(Requester.class);
 
-    public static TopicMeta getKafkaTopicMeta(TopicConfig topicConfig, ConsumerConfig consumerConfig) {
+    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
         SimpleConsumer consumer;
-        for (Broker broker : topicConfig.getBrokers()) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topicConfig.getTopic());
+        for (Broker broker : kafkaConfig.getBrokers()) {
+            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
             TopicMetadataResponse resp = consumer.send(req);
             final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
@@ -80,16 +80,16 @@ public final class Requester {
                     return partitionMetadata.partitionId();
                 }
             });
-            return new TopicMeta(topicConfig.getTopic(), partitionIds);
+            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
         }
-        logger.debug("cannot find topic:" + topicConfig.getTopic());
+        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
         return null;
     }
 
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, ConsumerConfig consumerConfig) {
+    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
         SimpleConsumer consumer;
         for (Broker broker : brokers) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
             List<String> topics = Collections.singletonList(topic);
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
             TopicMetadataResponse resp = consumer.send(req);
@@ -113,20 +113,20 @@ public final class Requester {
         return null;
     }
 
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, ConsumerConfig consumerConfig) {
+    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
         final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
         kafka.api.FetchRequest req = new FetchRequestBuilder()
                 .clientId(clientName)
-                .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: the fetch size passed here (kafkaConfig.getMaxReadCount()) might need to be increased if large batches are written to Kafka
                 .build();
         return consumer.fetch(req);
     }
 
     public static long getLastOffset(String topic, int partitionId,
-                                     long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
         String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
deleted file mode 100644
index 4aa9671..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-import java.util.List;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TopicConfig {
-
-    private String topic;
-
-    private List<Broker> brokers;
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public List<Broker> getBrokers() {
-        return brokers;
-    }
-
-    public void setBrokers(List<Broker> brokers) {
-        this.brokers = brokers;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
index a1f9b87..537a15d 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
@@ -41,6 +41,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -52,9 +53,15 @@ public abstract class KafkaBaseTest {
 
     protected static ZkClient zkClient;
 
+    protected static KafkaConfig kafkaConfig;
+
     @BeforeClass
-    public static void beforeClass() {
-        zkClient = new ZkClient(TestConstants.ZOOKEEPER);
+    public static void beforeClass() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        kafkaConfig = KafkaConfig.load(properties);
+
+        zkClient = new ZkClient(kafkaConfig.getZookeeper());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
new file mode 100644
index 0000000..3c9bd87
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+    @Test
+    public void test() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        KafkaConfig config = KafkaConfig.load(properties);
+        assertEquals(1000, config.getMaxReadCount());
+        assertEquals(65536, config.getBufferSize());
+        assertEquals(60000, config.getTimeout());
+        assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+        assertEquals("kafka_stream_test", config.getTopic());
+        assertEquals(0, config.getPartitionId());
+        assertEquals(1, config.getBrokers().size());
+        assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
index d89695d..4449d37 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
@@ -36,10 +36,12 @@ package org.apache.kylin.streaming.kafka;
 
 import com.google.common.collect.Lists;
 import kafka.cluster.Broker;
+import kafka.consumer.ConsumerConfig;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -58,7 +60,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
     private static final int TOTAL_SEND_COUNT = 100;
 
     @Before
-    public void before() {
+    public void before() throws IOException {
         producer = new TestProducer(TOTAL_SEND_COUNT);
         producer.start();
     }
@@ -80,18 +82,11 @@ public class KafkaConsumerTest extends KafkaBaseTest {
 
     @Test
     public void test() throws InterruptedException {
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setTopic(TestConstants.TOPIC);
-        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
-        ConsumerConfig consumerConfig = new ConsumerConfig();
-        consumerConfig.setBufferSize(64 * 1024);
-        consumerConfig.setMaxReadCount(1000);
-        consumerConfig.setTimeout(60 * 1000);
-        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
         final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
         List<BlockingQueue<Stream>> queues = Lists.newArrayList();
         for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, Lists.asList(TestConstants.BROKER, new Broker[0]), consumerConfig);
+            Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
             queues.add(consumer.getStreamQueue());
             executorService.execute(consumer);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
index fd8cc63..694c1fd 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
@@ -35,7 +35,6 @@
 package org.apache.kylin.streaming.kafka;
 
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -47,21 +46,9 @@ import static org.junit.Assert.*;
  */
 public class RequesterTest extends KafkaBaseTest {
 
-    private static TopicConfig topicConfig;
-    private static ConsumerConfig consumerConfig;
+    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
 
-    private static final String UNEXISTED_TOPIC = "unexist_topic";
 
-    @BeforeClass
-    public static void beforeClass() {
-        topicConfig = new TopicConfig();
-        topicConfig.setTopic(TestConstants.TOPIC);
-        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
-        consumerConfig = new ConsumerConfig();
-        consumerConfig.setBufferSize(64 * 1024);
-        consumerConfig.setMaxReadCount(1000);
-        consumerConfig.setTimeout(60 * 1000);
-    }
 
     @AfterClass
     public static void afterClass() {
@@ -69,16 +56,15 @@ public class RequesterTest extends KafkaBaseTest {
 
     @Test
     public void testTopicMeta() throws Exception {
-        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
         assertNotNull(kafkaTopicMeta);
         assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
-        assertEquals(topicConfig.getTopic(), kafkaTopicMeta.getName());
+        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
 
-        TopicConfig anotherTopicConfig = new TopicConfig();
-        anotherTopicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
-        anotherTopicConfig.setTopic(UNEXISTED_TOPIC);
+        KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
 
-        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig, consumerConfig);
+        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
         assertTrue(kafkaTopicMeta == null);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
deleted file mode 100644
index 85a6ba3..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestConstants {
-
-    public static final String TOPIC = "kafka_stream_test";
-    public static final String ZOOKEEPER = "sandbox.hortonworks.com:2181";
-    public static final Broker BROKER = new Broker(0, "sandbox.hortonworks.com", 6667);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
index 54ad583..cd3b166 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
@@ -34,12 +34,19 @@
 
 package org.apache.kylin.streaming.kafka;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -49,7 +56,7 @@ public class TestProducer {
 
     private volatile boolean stopped = false;
 
-    private static final Logger logger = LoggerFactory.getLogger(TestConstants.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
 
     private final int sendCount;
 
@@ -57,9 +64,19 @@ public class TestProducer {
         this.sendCount = sendCount;
     }
 
-    public void start() {
+    public void start() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
         Properties props = new Properties();
-        props.put("metadata.broker.list", TestConstants.BROKER.getConnectionString());
+        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+            @Nullable
+            @Override
+            public String apply(@Nullable Broker broker) {
+                return broker.getConnectionString();
+            }
+        }), ","));
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         props.put("request.required.acks", "1");
         ProducerConfig config = new ProducerConfig(props);
@@ -70,7 +87,7 @@ public class TestProducer {
             public void run() {
                 int count = 0;
                 while (!stopped && count < sendCount) {
-                    final KeyedMessage<String, String> message = new KeyedMessage<>(TestConstants.TOPIC, "current time is:" + System.currentTimeMillis());
+                    final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
                     producer.send(message);
                     count++;
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/resources/kafka_streaming_test/kafka.properties
----------------------------------------------------------------------
diff --git a/streaming/src/test/resources/kafka_streaming_test/kafka.properties b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
new file mode 100644
index 0000000..ae762e3
--- /dev/null
+++ b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
@@ -0,0 +1,10 @@
+zookeeper=sandbox.hortonworks.com:2181
+
+brokers=sandbox.hortonworks.com:6667
+
+consumer.timeout=60000
+consumer.bufferSize=65536
+consumer.maxReadCount=1000
+
+topic=kafka_stream_test
+partitionId=0


[18/26] incubator-kylin git commit: Merge branch 'inverted-index' into streaming

Posted by li...@apache.org.
Merge branch 'inverted-index' into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f42ef837
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f42ef837
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f42ef837

Branch: refs/heads/streaming
Commit: f42ef837bf0f01f4b88672654b698a4856bb6f7d
Parents: c038cb4 8e011db
Author: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Authored: Mon Mar 2 06:04:03 2015 +0000
Committer: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Committed: Mon Mar 2 06:04:03 2015 +0000

----------------------------------------------------------------------
 .../apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[11/26] incubator-kylin git commit: update visualization code in cubedesigner to show model partial done

Posted by li...@apache.org.
update visualization code in cubedesigner to show model partial done


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/504f6bf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/504f6bf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/504f6bf3

Branch: refs/heads/streaming
Commit: 504f6bf351a5f75cde4eefb70d1040e9056425da
Parents: 10b5029
Author: jiazhong <ji...@ebay.com>
Authored: Sat Feb 28 15:01:39 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Sat Feb 28 15:01:39 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cube.js          |   0
 webapp/app/js/controllers/cubeSchema.js    |   8 +-
 webapp/app/js/services/tree.js             | 159 ++++++++++++------------
 webapp/app/partials/cubes/cube_detail.html |   2 +-
 webapp/bower.json                          |   4 +-
 5 files changed, 90 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/js/controllers/cube.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cube.js b/webapp/app/js/controllers/cube.js
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index 7bad2d8..17f7b71 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -18,7 +18,7 @@
 
 'use strict';
 
-KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel) {
+KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList) {
 
     $scope.projects = [];
     $scope.newDimension = null;
@@ -65,6 +65,12 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
 
                     $scope.metaModel.model = model;
 
+                    // add model ref for cube
+                    angular.forEach(CubeList.cubes,function(cube){
+                        if(cube.name===$scope.cubeMetaFrame.name||cube.descriptor===$scope.cubeMetaFrame.name){
+                          cube.model = model;
+                        }
+                    });
                     //convert GMT mills ,to make sure partition date show GMT Date
                     //should run only one time
                     if($scope.metaModel.model.partition_desc&&$scope.metaModel.model.partition_desc.partition_date_start)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/js/services/tree.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tree.js b/webapp/app/js/services/tree.js
old mode 100644
new mode 100755
index bee3ac6..0eaf5e7
--- a/webapp/app/js/services/tree.js
+++ b/webapp/app/js/services/tree.js
@@ -38,92 +38,93 @@ KylinApp.service('CubeGraphService', function () {
 
         var graphData = {
             "type": "fact",
-            "name": cube.detail.fact_table,
+            "name": cube.model.fact_table,
             "children": []
         };
 
-        angular.forEach(cube.detail.dimensions, function (dimension, index) {
-            if (dimension.join && dimension.join.primary_key.length > 0) {
-
-                var dimensionNode;
-
-                /* Loop through the graphData.children array to find out: If the LKP table is already existed */
-                for(var j = 0; j < graphData.children.length; j++ ) {
-                    if(graphData.children[j].name == dimension.table){
-                        dimensionNode = graphData.children[j];
-                        break;
-                    }
-                }
-
-                /* If not existed, create dimensionNode and push it */
-                if(j == graphData.children.length) {
-                    dimensionNode = {
-                        "type": "dimension",
-                        "name": dimension.table,
-                        "join": dimension.join,
-                        "children": [],
-                        "_children": []
-                    };
-                }
-
-                if (dimension.join && dimension.join.primary_key)
-                {
-                    angular.forEach(dimension.join.primary_key, function(pk, index){
-                        for (var i = 0; i < dimensionNode._children.length; i++) {
-                            if(dimensionNode._children[i].name == pk)
-                                break;
-                        }
-                        if(i == dimensionNode._children.length) {
-                            dimensionNode._children.push({
-                                "type": "column",
-                                "name": pk
-                            });
-                        }
-
-                    });
-                }
-
-                if (dimension.derived)
-                {
-                    angular.forEach(dimension.derived, function(derived, index){
-                        for (var i = 0; i < dimensionNode._children.length; i++) {
-                            if(dimensionNode._children[i].name == derived)
-                                break;
-                        }
-                        if(i == dimensionNode._children.length) {
-                            dimensionNode._children.push({
-                                "type": "column",
-                                "name": derived + "(DERIVED)"
-                            });
-                        }
-                    });
-                }
+        cube.graph = (!!cube.graph) ? cube.graph : {};
 
-                if (dimension.hierarchy)
-                {
-                    angular.forEach(dimension.hierarchy, function(hierarchy, index){
-                        for (var i = 0; i < dimensionNode._children.length; i++) {
-                            if(dimensionNode._children[i].name == hierarchy)
-                                break;
-                        }
-                        if(i == dimensionNode._children.length) {
-                            dimensionNode._children.push({
-                                "type": "column",
-                                "name": hierarchy.column + "(HIERARCHY)"
-                            });
-                        }
-                    });
-                }
+      //angular.forEach(cube.detail.dimensions, function (dimension, index) {
+      angular.forEach(cube.model.lookups, function (lookup, index) {
+        if (lookup.join && lookup.join.primary_key.length > 0) {
 
-                if(j == graphData.children.length) {
-                    graphData.children.push(dimensionNode);
-                }
+          var dimensionNode;
 
+          /* Loop through the graphData.children array to find out: If the LKP table is already existed */
+          for(var j = 0; j < graphData.children.length; j++ ) {
+            if(graphData.children[j].name == lookup.table){
+              dimensionNode = graphData.children[j];
+              break;
             }
-        });
-
-        cube.graph = (!!cube.graph) ? cube.graph : {};
-        cube.graph.columnsCount = 0;
+          }
+
+          /* If not existed, create dimensionNode and push it */
+          if(j == graphData.children.length) {
+            dimensionNode = {
+              "type": "dimension",
+              "name": lookup.table,
+              "join": lookup.join,
+              "children": [],
+              "_children": []
+            };
+          }
+
+          //if (dimension.join && dimension.join.primary_key)
+          //{
+          //    angular.forEach(dimension.join.primary_key, function(pk, index){
+          //        for (var i = 0; i < dimensionNode._children.length; i++) {
+          //            if(dimensionNode._children[i].name == pk)
+          //                break;
+          //        }
+          //        if(i == dimensionNode._children.length) {
+          //            dimensionNode._children.push({
+          //                "type": "column",
+          //                "name": pk
+          //            });
+          //        }
+          //
+          //    });
+          //}
+          //
+          //if (dimension.derived)
+          //{
+          //    angular.forEach(dimension.derived, function(derived, index){
+          //        for (var i = 0; i < dimensionNode._children.length; i++) {
+          //            if(dimensionNode._children[i].name == derived)
+          //                break;
+          //        }
+          //        if(i == dimensionNode._children.length) {
+          //            dimensionNode._children.push({
+          //                "type": "column",
+          //                "name": derived + "(DERIVED)"
+          //            });
+          //        }
+          //    });
+          //}
+          //
+          //if (dimension.hierarchy)
+          //{
+          //    angular.forEach(dimension.hierarchy, function(hierarchy, index){
+          //        for (var i = 0; i < dimensionNode._children.length; i++) {
+          //            if(dimensionNode._children[i].name == hierarchy)
+          //                break;
+          //        }
+          //        if(i == dimensionNode._children.length) {
+          //            dimensionNode._children.push({
+          //                "type": "column",
+          //                "name": hierarchy.column + "(HIERARCHY)"
+          //            });
+          //        }
+          //    });
+          //}
+
+          if(j == graphData.children.length) {
+            graphData.children.push(dimensionNode);
+          }
+
+        }
+      });
+      cube.graph.columnsCount = 0;
         cube.graph.tree = tree;
         cube.graph.root = graphData;
         cube.graph.svg = svg;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/partials/cubes/cube_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubes/cube_detail.html b/webapp/app/partials/cubes/cube_detail.html
old mode 100644
new mode 100755
index 39a2860..6668b8d
--- a/webapp/app/partials/cubes/cube_detail.html
+++ b/webapp/app/partials/cubes/cube_detail.html
@@ -110,4 +110,4 @@
             </div>
         </div>
     </div>
-</div>
\ No newline at end of file
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/bower.json
----------------------------------------------------------------------
diff --git a/webapp/bower.json b/webapp/bower.json
old mode 100644
new mode 100755
index 31ea559..be8eca3
--- a/webapp/bower.json
+++ b/webapp/bower.json
@@ -26,8 +26,8 @@
     "angular-ui-sortable": "0.13.1",
     "underscore": "~1.7.0",
     "fuelux": "~3.5.1",
-     "angular-animate": "1.2",
-     "angular-cookies":"1.2"
+    "angular-animate": "1.2",
+    "angular-cookies":"1.2"
 
   },
   "devDependencies": {


[22/26] incubator-kylin git commit: revert joda dependency, and revise hbasestreaminginput

Posted by li...@apache.org.
revert joda dependency, and revise hbasestreaminginput


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1d6505b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1d6505b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1d6505b3

Branch: refs/heads/streaming
Commit: 1d6505b3489dfdf8bd925dcf3ecfde9d8db286f6
Parents: f980b9f
Author: honma <ho...@ebay.com>
Authored: Mon Mar 2 13:34:42 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Mar 3 15:39:44 2015 +0800

----------------------------------------------------------------------
 common/pom.xml                                  | 11 ++----
 .../org/apache/kylin/common/util/BasicTest.java | 35 --------------------
 job/pom.xml                                     |  5 ---
 .../kylin/job/tools/HbaseStreamingInput.java    |  6 +++-
 pom.xml                                         |  6 ----
 5 files changed, 8 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 41d2291..97fb50a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -17,8 +17,7 @@
  limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>kylin-common</artifactId>
@@ -64,10 +63,6 @@
             <artifactId>compress-lzf</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -132,11 +127,11 @@
             <artifactId>commons-compress</artifactId>
             <version>1.2</version>
         </dependency>
-        <dependency>
+         <dependency>
             <groupId>org.apache.hive.hcatalog</groupId>
             <artifactId>hive-hcatalog-core</artifactId>
             <version>${hive-hcatalog.version}</version>
             <scope>provided</scope>
-        </dependency>
+          </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 6358177..4a6edc8 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -26,10 +26,8 @@ import java.util.Calendar;
 import java.util.concurrent.Semaphore;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.joda.time.DateTime;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -72,39 +70,6 @@ public class BasicTest {
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {
 
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-
-                semaphore.release();
-                semaphore.release();
-                semaphore.release();
-                try {
-                    System.out.println("sleeping");
-                    Thread.sleep(10000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                semaphore.release();
-            }
-        }).start();
-
-        try {
-            try {
-                System.out.println("sleeping");
-                Thread.sleep(5000);
-                int x = semaphore.drainPermits();
-                System.out.println("drained " +x);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            semaphore.acquire();
-            System.out.println("left " + semaphore.availablePermits());
-
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
     }
 
     private static String time(long t) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 128e03c..c8fbc73 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -136,11 +136,6 @@
                 hbase utils like Bytes, ImmutableBytesWritable etc. -->
         </dependency>
 
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-
         <!-- Env & Test -->
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
index b1c7e30..ff313b4 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -65,6 +65,9 @@ public class HbaseStreamingInput {
     }
 
     public static void addData(String tableName) throws IOException {
+
+        createTable(tableName);
+
         final Semaphore semaphore = new Semaphore(0);
         new Thread(new Runnable() {
             @Override
@@ -156,8 +159,9 @@ public class HbaseStreamingInput {
                 for (Result result : scanner) {
                     Cell cell = result.getColumnLatestCell(CF, QN);
                     byte[] value = cell.getValueArray();
-                    if (cell.getValueLength() != CELL_SIZE)
+                    if (cell.getValueLength() != CELL_SIZE) {
                         logger.error("value size invalid!!!!!");
+                    }
 
                     hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
                     rowCount++;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d86bcb2..74743db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,6 @@
         <compress-lzf.version>1.0.3</compress-lzf.version>
         <extendedset.version>1.3.4</extendedset.version>
         <jetty.version>9.2.7.v20150116</jetty.version>
-        <joda.version>2.7</joda.version>
 
         <!-- REST Service -->
         <spring.framework.version>3.1.2.RELEASE</spring.framework.version>
@@ -400,11 +399,6 @@
                 <artifactId>curator-recipes</artifactId>
                 <version>${curator.version}</version>
             </dependency>
-            <dependency>
-                <groupId>joda-time</groupId>
-                <artifactId>joda-time</artifactId>
-                <version>${joda.version}</version>
-            </dependency>
 
         </dependencies>
     </dependencyManagement>


[14/26] incubator-kylin git commit: Merge pull request #432 from janzhongi/inverted-index

Posted by li...@apache.org.
Merge pull request #432 from janzhongi/inverted-index

refactor source_table_tree & enable visualization in cubeWizard

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/82c9db24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/82c9db24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/82c9db24

Branch: refs/heads/streaming
Commit: 82c9db24bca8451a37a47cce81924df4e984d345
Parents: 10b5029 d804e60
Author: Zhong,Jian <ji...@ebay.com>
Authored: Sat Feb 28 17:42:16 2015 +0800
Committer: Zhong,Jian <ji...@ebay.com>
Committed: Sat Feb 28 17:42:16 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cube.js               |   0
 webapp/app/js/controllers/cubeSchema.js         |   8 +-
 webapp/app/js/controllers/sourceMeta.js         |  37 ++---
 webapp/app/js/model/tableModel.js               |  92 ++++++-----
 webapp/app/js/services/tables.js                |   0
 webapp/app/js/services/tree.js                  | 161 ++++++++++---------
 webapp/app/partials/cubes/cube_detail.html      |  12 +-
 webapp/app/partials/tables/source_metadata.html |  44 ++---
 .../app/partials/tables/source_table_tree.html  |   6 +-
 webapp/bower.json                               |   4 +-
 10 files changed, 191 insertions(+), 173 deletions(-)
----------------------------------------------------------------------



[23/26] incubator-kylin git commit: testing streaming on hbase

Posted by li...@apache.org.
testing streaming on hbase


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f980b9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f980b9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f980b9f6

Branch: refs/heads/streaming
Commit: f980b9f67d0804964cd67685a8d5cae864209b30
Parents: f42ef83
Author: honma <ho...@ebay.com>
Authored: Mon Mar 2 09:51:04 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Mar 3 15:39:44 2015 +0800

----------------------------------------------------------------------
 common/pom.xml                                  |  11 +-
 .../org/apache/kylin/common/util/BasicTest.java |  52 ++++-
 job/pom.xml                                     |   5 +
 .../job/deployment/HbaseConfigPrinterCLI.java   |  34 ++-
 .../kylin/job/tools/HbaseStreamingInput.java    | 220 +++++++++++++++++++
 pom.xml                                         |   6 +
 6 files changed, 316 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 97fb50a..41d2291 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -17,7 +17,8 @@
  limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>kylin-common</artifactId>
@@ -63,6 +64,10 @@
             <artifactId>compress-lzf</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -127,11 +132,11 @@
             <artifactId>commons-compress</artifactId>
             <version>1.2</version>
         </dependency>
-         <dependency>
+        <dependency>
             <groupId>org.apache.hive.hcatalog</groupId>
             <artifactId>hive-hcatalog-core</artifactId>
             <version>${hive-hcatalog.version}</version>
             <scope>provided</scope>
-          </dependency>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1fe9dfd..6358177 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -18,13 +18,18 @@
 
 package org.apache.kylin.common.util;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.concurrent.Semaphore;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.joda.time.DateTime;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -61,10 +66,51 @@ public class BasicTest {
     public void test1() throws Exception {
     }
 
+    final private Semaphore semaphore = new Semaphore(0);
+
     @Test
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {
-        int m = 1 << 15;
-        System.out.println(m);
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+
+                semaphore.release();
+                semaphore.release();
+                semaphore.release();
+                try {
+                    System.out.println("sleeping");
+                    Thread.sleep(10000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                semaphore.release();
+            }
+        }).start();
+
+        try {
+            try {
+                System.out.println("sleeping");
+                Thread.sleep(5000);
+                int x = semaphore.drainPermits();
+                System.out.println("drained " +x);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            semaphore.acquire();
+            System.out.println("left " + semaphore.availablePermits());
+
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    private static String time(long t) {
+        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(t);
+        return dateFormat.format(cal.getTime());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index c8fbc73..128e03c 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -136,6 +136,11 @@
                 hbase utils like Bytes, ImmutableBytesWritable etc. -->
         </dependency>
 
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
index 7a73790..15ba2c4 100644
--- a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
@@ -40,12 +40,18 @@ import org.apache.kylin.job.tools.LZOSupportnessChecker;
  */
 public class HbaseConfigPrinterCLI {
     public static void main(String[] args) throws IOException {
-        if (args.length != 1) {
-            System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-0.5.7-SNAPSHOT-job.jar org.apache.kylin.job.deployment.HadoopConfigPrinter targetFile");
-            System.exit(1);
-        }
 
-        printConfigs(args[0]);
+        if (args[0].equalsIgnoreCase("printconfig"))
+            printConfigs(args[1]);
+
+        if (args[0].equalsIgnoreCase("printenv"))
+            printAllEnv();
+
+        if (args[0].equalsIgnoreCase("printprop"))
+            printAllProperties();
+
+        if (args[0].equalsIgnoreCase("printhbaseconf"))
+            printHbaseConf();
     }
 
     private static void printConfigs(String targetFile) throws IOException {
@@ -68,7 +74,23 @@ public class HbaseConfigPrinterCLI {
         FileUtils.writeStringToFile(output, sb.toString());
     }
 
-    @SuppressWarnings("unused")
+    private static void printHbaseConf() {
+        Configuration conf = HBaseConfiguration.create();
+        for (Map.Entry<String, String> entry : conf) {
+            System.out.println("Key: " + entry.getKey());
+            System.out.println("Value: " + entry.getValue());
+            System.out.println();
+        }
+    }
+
+    private static void printAllProperties() {
+        for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
+            System.out.println("Key: " + entry.getKey());
+            System.out.println("Value: " + entry.getValue());
+            System.out.println();
+        }
+    }
+
     private static void printAllEnv() {
         for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
             System.out.println("Key: " + entry.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
new file mode 100644
index 0000000..b1c7e30
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -0,0 +1,220 @@
+package org.apache.kylin.job.tools;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 2/13/15.
+ */
+public class HbaseStreamingInput {
+    private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
+
+    private static final int CELL_SIZE = 128 * 1024; // 128 KB
+    private static final byte[] CF = "F".getBytes();
+    private static final byte[] QN = "C".getBytes();
+
+    public static void createTable(String tableName) throws IOException {
+        HBaseAdmin hadmin = new HBaseAdmin(getConnection());
+
+        try {
+            boolean tableExist = hadmin.tableExists(tableName);
+            if (tableExist) {
+                logger.info("HTable '" + tableName + "' already exists");
+                return;
+            }
+
+            logger.info("Creating HTable '" + tableName + "'");
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+            desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
+
+            HColumnDescriptor fd = new HColumnDescriptor(CF);
+            fd.setBlocksize(CELL_SIZE);
+            desc.addFamily(fd);
+            hadmin.createTable(desc);
+
+            logger.info("HTable '" + tableName + "' created");
+        } finally {
+            hadmin.close();
+        }
+    }
+
+    private static void scheduleJob(Semaphore semaphore, int interval) {
+        while (true) {
+            semaphore.release();
+            try {
+                Thread.sleep(interval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public static void addData(String tableName) throws IOException {
+        final Semaphore semaphore = new Semaphore(0);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                scheduleJob(semaphore, 300000);//5 minutes a batch
+            }
+        }).start();
+
+        while (true) {
+            try {
+                semaphore.acquire();
+                int waiting = semaphore.availablePermits();
+                if (waiting > 0) {
+                    logger.warn("There are another " + waiting + " batches waiting to be added");
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            HConnection conn = getConnection();
+            HTableInterface table = conn.getTable(tableName);
+
+            byte[] key = new byte[8 + 4];//time + id
+
+            logger.info("============================================");
+            long startTime = System.currentTimeMillis();
+            logger.info("data load start time in millis: " + startTime);
+            logger.info("data load start at " + formatTime(startTime));
+            List<Put> buffer = Lists.newArrayList();
+            for (int i = 0; i < (1 << 10); ++i) {
+                long time = System.currentTimeMillis();
+                Bytes.putLong(key, 0, time);
+                Bytes.putInt(key, 8, i);
+                Put put = new Put(key);
+                byte[] cell = randomBytes(CELL_SIZE);
+                put.add(CF, QN, cell);
+                buffer.add(put);
+            }
+            table.put(buffer);
+            table.close();
+            conn.close();
+            long endTime = System.currentTimeMillis();
+            logger.info("data load end at " + formatTime(endTime));
+            logger.info("data load time consumed: " + (endTime - startTime));
+            logger.info("============================================");
+        }
+    }
+
+    public static void randomScan(String tableName) throws IOException {
+
+        final Semaphore semaphore = new Semaphore(0);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                scheduleJob(semaphore, 60000);//1 minutes a batch
+            }
+        }).start();
+
+        while (true) {
+            try {
+                semaphore.acquire();
+                int waiting = semaphore.drainPermits();
+                if (waiting > 0) {
+                    logger.warn("Too many queries to handle! Blocking " + waiting + " sets of scan requests");
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            Random r = new Random();
+            HConnection conn = getConnection();
+            HTableInterface table = conn.getTable(tableName);
+
+            long leftBound = getFirstKeyTime(table);
+            long rightBound = System.currentTimeMillis();
+
+            for (int t = 0; t < 10; ++t) {
+                long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
+                long end = start + 600000;//a period of 10 minutes
+                logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
+
+                Scan scan = new Scan();
+                scan.setStartRow(Bytes.toBytes(start));
+                scan.setStopRow(Bytes.toBytes(end));
+                scan.addFamily(CF);
+                ResultScanner scanner = table.getScanner(scan);
+                long hash = 0;
+                int rowCount = 0;
+                for (Result result : scanner) {
+                    Cell cell = result.getColumnLatestCell(CF, QN);
+                    byte[] value = cell.getValueArray();
+                    if (cell.getValueLength() != CELL_SIZE)
+                        logger.error("value size invalid!!!!!");
+
+                    hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
+                    rowCount++;
+                }
+                scanner.close();
+                logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
+            }
+            table.close();
+            conn.close();
+        }
+    }
+
+    private static long getFirstKeyTime(HTableInterface table) throws IOException {
+        long startTime = 0;
+
+        Scan scan = new Scan();
+        scan.addFamily(CF);
+        ResultScanner scanner = table.getScanner(scan);
+        for (Result result : scanner) {
+            Cell cell = result.getColumnLatestCell(CF, QN);
+            byte[] key = cell.getRowArray();
+            startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
+            logger.info("Retrieved first record time: " + formatTime(startTime));
+            break;//only get first one
+        }
+        scanner.close();
+        return startTime;
+
+    }
+
+    private static HConnection getConnection() throws IOException {
+        return HConnectionManager.createConnection(HBaseConfiguration.create());
+    }
+
+    private static String formatTime(long time) {
+        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(time);
+        return dateFormat.format(cal.getTime());
+    }
+
+    private static byte[] randomBytes(int lenth) {
+        byte[] bytes = new byte[lenth];
+        Random rand = new Random();
+        rand.nextBytes(bytes);
+        return bytes;
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        if (args[0].equalsIgnoreCase("createtable")) {
+            createTable(args[1]);
+        } else if (args[0].equalsIgnoreCase("adddata")) {
+            addData(args[1]);
+        } else if (args[0].equalsIgnoreCase("randomscan")) {
+            randomScan(args[1]);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 74743db..d86bcb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
         <compress-lzf.version>1.0.3</compress-lzf.version>
         <extendedset.version>1.3.4</extendedset.version>
         <jetty.version>9.2.7.v20150116</jetty.version>
+        <joda.version>2.7</joda.version>
 
         <!-- REST Service -->
         <spring.framework.version>3.1.2.RELEASE</spring.framework.version>
@@ -399,6 +400,11 @@
                 <artifactId>curator-recipes</artifactId>
                 <version>${curator.version}</version>
             </dependency>
+            <dependency>
+                <groupId>joda-time</groupId>
+                <artifactId>joda-time</artifactId>
+                <version>${joda.version}</version>
+            </dependency>
 
         </dependencies>
     </dependencyManagement>


[15/26] incubator-kylin git commit: Merge remote-tracking branch 'origin/inverted-index' into streaming

Posted by li...@apache.org.
Merge remote-tracking branch 'origin/inverted-index' into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9c3a6067
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9c3a6067
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9c3a6067

Branch: refs/heads/streaming
Commit: 9c3a60673fa31b9c900a18850f292ff7401410da
Parents: 04d9b7d 82c9db2
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 10:00:24 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 10:00:24 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cube.js               |   0
 webapp/app/js/controllers/cubeEdit.js           |  11 +-
 webapp/app/js/controllers/cubeSchema.js         |   8 +-
 webapp/app/js/controllers/cubes.js              |   4 +-
 webapp/app/js/controllers/sourceMeta.js         |  37 ++---
 webapp/app/js/model/tableModel.js               |  92 ++++++-----
 webapp/app/js/services/tables.js                |   0
 webapp/app/js/services/tree.js                  | 161 ++++++++++---------
 .../cubeDesigner/advanced_settings.html         |  34 ++--
 .../app/partials/cubeDesigner/incremental.html  |   2 +-
 webapp/app/partials/cubes/cube_detail.html      |  12 +-
 webapp/app/partials/tables/source_metadata.html |  44 ++---
 .../app/partials/tables/source_table_tree.html  |   6 +-
 webapp/bower.json                               |   4 +-
 14 files changed, 222 insertions(+), 193 deletions(-)
----------------------------------------------------------------------



[20/26] incubator-kylin git commit: Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming

Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/172efa6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/172efa6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/172efa6a

Branch: refs/heads/streaming
Commit: 172efa6ad9d42c58ed866616c9b92b039a5a763f
Parents: 58a6f73 f42ef83
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 10:14:32 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 10:14:32 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[21/26] incubator-kylin git commit: add log

Posted by li...@apache.org.
add log


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1f2fb283
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1f2fb283
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1f2fb283

Branch: refs/heads/streaming
Commit: 1f2fb283e50b99b65766fc9a00c08494b9231d1e
Parents: 172efa6
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 11:09:58 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 11:09:58 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/common/persistence/HBaseConnection.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1f2fb283/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index 145cada..27d9e4c 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -77,6 +77,7 @@ public class HBaseConnection {
                 ConnPool.put(url, connection);
             }
         } catch (Throwable t) {
+            logger.error("Error when open connection " + url, t);
             throw new StorageException("Error when open connection " + url, t);
         }
 


[04/26] incubator-kylin git commit: add right 'return type' when create measure in cube KYLIN-600

Posted by li...@apache.org.
add right 'return type' when create measure in cube KYLIN-600


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0a5830ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0a5830ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0a5830ef

Branch: refs/heads/streaming
Commit: 0a5830ef96dc6c4aced85c3b2c7bbb177a764d6e
Parents: 80a5065
Author: jiazhong <ji...@ebay.com>
Authored: Fri Feb 27 16:49:20 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Fri Feb 27 16:49:20 2015 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/CubeController.java   | 38 +++++++++++++++-----
 webapp/app/js/controllers/cubeEdit.js           |  7 ++--
 webapp/app/js/controllers/cubeSchema.js         | 26 ++++++++++----
 webapp/app/js/controllers/cubes.js              |  5 ++-
 webapp/app/js/filters/filter.js                 |  4 +--
 webapp/app/partials/cubeDesigner/measures.html  | 11 +++---
 6 files changed, 64 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index e624d45..be91436 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -18,10 +18,14 @@
 
 package org.apache.kylin.rest.controller;
 
-import com.codahale.metrics.annotation.Metered;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
@@ -55,11 +59,17 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.AccessDeniedException;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
 
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.*;
+import com.codahale.metrics.annotation.Metered;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
 
 /**
  * CubeController is defined as Restful API entrance for UI.
@@ -348,7 +358,17 @@ public class CubeController extends BasicController {
             return cubeRequest;
         }
         try {
-            metaManager.updateDataModelDesc(modelDesc);
+
+            DataModelDesc existingModel = metaManager.getDataModelDesc(modelDesc.getName());
+            if (existingModel == null) {
+                metaManager.createDataModelDesc(modelDesc);
+            } else {
+
+                //ignore overwriting conflict checking before splitting MODEL & CUBE
+                modelDesc.setLastModified(existingModel.getLastModified());
+                metaManager.updateDataModelDesc(modelDesc);
+            }
+
         } catch (IOException e) {
             // TODO Auto-generated catch block
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index 0e05077..b632af1 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -38,11 +38,14 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
 
     $scope.getColumnType = function (_column,table){
         var columns = $scope.getColumnsByTable(table);
+        var type;
         angular.forEach(columns,function(column){
-            if(_column===column.name){
-                return column.type;
+            if(_column === column.name){
+                type = column.datatype;
+                return;
             }
         });
+        return type;
     };
 
     var ColFamily = function () {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index 4bb5762..7bad2d8 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -107,17 +107,31 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
         $scope.newMeasure = null;
     };
 
-    // !count !count distinct
-    $scope.measureParamValueUpdate = function(){
-        if(newMeasure.function.expression!=="COUNT"&&newMeasure.function.expression!=="COUNT_DISTINCT"){
+    //map right return type for param
+    $scope.measureReturnTypeUpdate = function(){
+        if($scope.newMeasure.function.expression!=="COUNT_DISTINCT"){
 
             var column = $scope.newMeasure.function.parameter.value;
+            var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table); // $scope.getColumnType defined in cubeEdit.js
 
 
-            switch(newMeasure.function.expression){
+            switch($scope.newMeasure.function.expression){
                 case "SUM":
-                    var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table);
-                    $log.log(colType);
+                    if(colType==="smallint"||colType==="int"||colType==="bigint"){
+                        $scope.newMeasure.function.returntype= 'bigint';
+                    }else{
+                        $scope.newMeasure.function.returntype= 'decimal';
+                    }
+                    break;
+                case "MIN":
+                case "MAX":
+                    $scope.newMeasure.function.returntype = colType;
+                    break;
+                case "COUNT":
+                    $scope.newMeasure.function.returntype = "bigint";
+                    break;
+                default:
+                    $scope.newMeasure.function.returntype = "";
                     break;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/controllers/cubes.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index d576955..927fda1 100755
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -77,7 +77,9 @@ KylinApp
 
         $scope.loadDetail = function (cube) {
             var defer = $q.defer();
-            if (!cube.detail) {
+            if(cube.detail){
+                defer.resolve(cube.detail);
+            } else {
                 CubeDescService.get({cube_name: cube.name}, {}, function (detail) {
                     if (detail.length > 0&&detail[0].hasOwnProperty("name")) {
                         cube.detail = detail[0];
@@ -95,6 +97,7 @@ KylinApp
                     }
                 });
             }
+
             return defer.promise;
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/filters/filter.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/filters/filter.js b/webapp/app/js/filters/filter.js
old mode 100644
new mode 100755
index d0d49fd..f3364c7
--- a/webapp/app/js/filters/filter.js
+++ b/webapp/app/js/filters/filter.js
@@ -108,7 +108,8 @@ KylinApp
         //convert GMT+0 time to specified Timezone
         return function(item,timezone,format){
 
-            if(angular.isUndefined(item)){
+            // undefined and 0 do not need to be shown
+            if(angular.isUndefined(item)||item===0){
                 return "";
             }
 
@@ -129,7 +130,6 @@ KylinApp
                     gmttimezone = timezone;
             }
 
-
             var localOffset = new Date().getTimezoneOffset();
             var convertedMillis = item;
             if(gmttimezone.indexOf("GMT+")!=-1){

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/partials/cubeDesigner/measures.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/measures.html b/webapp/app/partials/cubeDesigner/measures.html
index 9a932bd..a260e28 100755
--- a/webapp/app/partials/cubeDesigner/measures.html
+++ b/webapp/app/partials/cubeDesigner/measures.html
@@ -111,6 +111,7 @@
                             <div class="col-xs-12 col-sm-6">
                                 <select class="form-control"
                                     ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
+                                    ng-change="measureReturnTypeUpdate();"
                                     ng-options="me as me for me in cubeConfig.measureExpressions">
                                     <option value=""></option>
                                 </select>
@@ -146,7 +147,7 @@
                                 <select class="form-control" chosen
                                     ng-if="newMeasure.function.parameter.type == 'column'"
                                     ng-model="newMeasure.function.parameter.value"
-                                    ng-change="measureParamValueUpdate();"
+                                    ng-change="measureReturnTypeUpdate();"
                                     ng-options="columns.name as columns.name for columns in getColumnsByTable(metaModel.model.fact_table)" >
                                     <option value="">-- Select a Fact Table Column --</option>
                                 </select>
@@ -166,12 +167,8 @@
                                     <option value=""></option>
                                 </select>
                                 <span class="font-color-default"
-                                    ng-if="newMeasure.function.expression == 'COUNT'"
-                                    ng-init="newMeasure.function.returntype= 'bigint' "><b>&nbsp;&nbsp;BIGINT</b>
-                                </span>
-                                <span class="font-color-default"
-                                      ng-if="newMeasure.function.expression != 'COUNT_DISTINCT' && newMeasure.function.expression != 'COUNT' "
-                                     ><b>&nbsp;&nbsp;{{newMeasure.function.returntype}}</b>
+                                      ng-if="newMeasure.function.expression != 'COUNT_DISTINCT'"
+                                     ><b>&nbsp;&nbsp;{{newMeasure.function.returntype | uppercase}}</b>
                                 </span>
                             </div>
                         </div>


[17/26] incubator-kylin git commit: raise max region to 500

Posted by li...@apache.org.
raise max region to 500


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e011db0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e011db0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e011db0

Branch: refs/heads/streaming
Commit: 8e011db0042d924ecae765a53cf62a5d0d8c715c
Parents: 82c9db2
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Mar 2 14:03:28 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Mar 2 14:03:28 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e011db0/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index a677381..6bdacfd 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -43,7 +43,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     public static final int MEDIUM_CUT = 20; //  20 GB per region
     public static final int LARGE_CUT = 100; // 100 GB per region
     
-    public static final int MAX_REGION = 200;
+    public static final int MAX_REGION = 500;
 
     private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
 


[08/26] incubator-kylin git commit: Merge pull request #431 from janzhongi/inverted-index

Posted by li...@apache.org.
Merge pull request #431 from janzhongi/inverted-index

filter partition column: use only date type columns

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/10b50297
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/10b50297
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/10b50297

Branch: refs/heads/streaming
Commit: 10b50297ea6038a6dc99554460a598c0cc4085f0
Parents: 07a02fa 8735d15
Author: Zhong,Jian <ji...@ebay.com>
Authored: Fri Feb 27 18:29:29 2015 +0800
Committer: Zhong,Jian <ji...@ebay.com>
Committed: Fri Feb 27 18:29:29 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cubeEdit.js           | 11 +++++--
 webapp/app/js/controllers/cubes.js              |  4 +--
 .../cubeDesigner/advanced_settings.html         | 34 +++++++++++---------
 .../app/partials/cubeDesigner/incremental.html  |  2 +-
 4 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------



[13/26] incubator-kylin git commit: refactor source_table_tree & add 'model Json format info' to cube wizard

Posted by li...@apache.org.
refactor source_table_tree & add 'model Json format info' to cube wizard


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d804e602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d804e602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d804e602

Branch: refs/heads/streaming
Commit: d804e602ad64b41bfa7c4843ed5fb2874c8497c5
Parents: 4b65f02
Author: jiazhong <ji...@ebay.com>
Authored: Sat Feb 28 17:40:18 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Sat Feb 28 17:40:18 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/sourceMeta.js         | 37 +++-----
 webapp/app/js/model/tableModel.js               | 92 +++++++++++---------
 webapp/app/js/services/tables.js                |  0
 webapp/app/partials/cubes/cube_detail.html      | 10 ++-
 webapp/app/partials/tables/source_metadata.html | 44 +++++-----
 .../app/partials/tables/source_table_tree.html  |  6 +-
 6 files changed, 97 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
old mode 100644
new mode 100755
index e2f166c..6f14e83
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -21,14 +21,13 @@
 KylinApp
     .controller('SourceMetaCtrl', function ($scope,$cacheFactory, $q, $window, $routeParams, CubeService, $modal, TableService,$route,loadingRequest,SweetAlert,tableConfig,TableModel) {
         var $httpDefaultCache = $cacheFactory.get('$http');
-        $scope.selectedSrcDb = [];
-        $scope.selectedSrcTable = {};
+       $scope.tableModel = TableModel;
+        $scope.tableModel.selectedSrcDb = [];
+        $scope.tableModel.selectedSrcTable = {};
         $scope.window = 0.68 * $window.innerHeight;
         $scope.tableConfig = tableConfig;
 
-        $scope.hiveTbLoad={
-            status:"init"
-        }
+
        $scope.state = { filterAttr: 'id', filterReverse:false, reverseColumn: 'id',
             dimensionFilter: '', measureFilter: ''};
 
@@ -42,7 +41,7 @@ KylinApp
        };
 
         $scope.aceSrcTbLoaded = function (forceLoad) {
-            $scope.selectedSrcDb = [];
+            $scope.tableModel.selectedSrcDb = [];
             $scope.treeOptions = {
                 nodeChildren: "columns",
                 injectClasses: {
@@ -57,7 +56,7 @@ KylinApp
                 }
             };
 
-            $scope.selectedSrcTable = {};
+            $scope.tableModel.selectedSrcTable = {};
             var defer = $q.defer();
 
             $scope.loading = true;
@@ -93,9 +92,10 @@ KylinApp
                     obj.sort(innerSort);
                 }
 
+              $scope.tableModel.selectedSrcDb = [];
                 for (var key in  tableMap) {
                     var tables = tableMap[key];
-                    $scope.selectedSrcDb.push({
+                    $scope.tableModel.selectedSrcDb.push({
                         "name": key,
                         "columns": tables
                     });
@@ -112,25 +112,20 @@ KylinApp
             $scope.aceSrcTbLoaded();
 
         });
-        $scope.$watch('hiveTbLoad.status', function (newValue, oldValue) {
-            if(newValue=="success"){
-                $scope.aceSrcTbLoaded(true);
-            }
 
-        });
 
         $scope.showSelected = function (obj) {
             if (obj.uuid) {
-                $scope.selectedSrcTable = obj;
+                $scope.tableModel.selectedSrcTable = obj;
             }
             else if(obj.datatype) {
-                $scope.selectedSrcTable.selectedSrcColumn = obj;
+                $scope.tableModel.selectedSrcTable.selectedSrcColumn = obj;
             }
         };
 
         $scope.aceSrcTbChanged = function () {
-            $scope.selectedSrcDb = [];
-            $scope.selectedSrcTable = {};
+            $scope.tableModel.selectedSrcDb = [];
+            $scope.tableModel.selectedSrcTable = {};
             $scope.aceSrcTbLoaded(true);
         };
 
@@ -146,9 +141,6 @@ KylinApp
                     projectName:function(){
                       return  $scope.projectModel.selectedProject;
                     },
-                    hiveTbLoad:function(){
-                      return $scope.hiveTbLoad;
-                    },
                     scope: function () {
                         return $scope;
                     }
@@ -156,7 +148,7 @@ KylinApp
             });
         };
 
-        var ModalInstanceCtrl = function ($scope,$location, $modalInstance, tableNames, MessageService,projectName,hiveTbLoad) {
+        var ModalInstanceCtrl = function ($scope,$location, $modalInstance, tableNames, MessageService,projectName,scope) {
             $scope.tableNames = "";
             $scope.projectName = projectName;
             $scope.cancel = function () {
@@ -195,8 +187,8 @@ KylinApp
                         SweetAlert.swal('Partial loaded!','The following table(s) have been successfully synchronized: ' + loadTableInfo+"\n\n Failed to synchronize following table(s):"  + unloadedTableInfo, 'warning');
                     }
                     loadingRequest.hide();
+                    scope.aceSrcTbLoaded(true);
 
-                    hiveTbLoad.status="success";
                 },function(e){
                     if(e.data&& e.data.exception){
                         var message =e.data.exception;
@@ -206,7 +198,6 @@ KylinApp
                         SweetAlert.swal('Oops...', "Failed to take action.", 'error');
                     }
                     loadingRequest.hide();
-                    hiveTbLoad.status="init";
                 })
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/js/model/tableModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/tableModel.js b/webapp/app/js/model/tableModel.js
old mode 100644
new mode 100755
index 3f734a1..a3bc820
--- a/webapp/app/js/model/tableModel.js
+++ b/webapp/app/js/model/tableModel.js
@@ -1,43 +1,49 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-KylinApp.service('TableModel', function() {
-
-
-    this.selectProjectTables = [];
-
-    this.initTables = function(){
-        this.selectProjectTables = [];
-    }
-
-    this.addTable = function(table){
-        this.selectProjectTables.push(table);
-    }
-
-    this.setSelectedProjectTables = function(tables) {
-        this.selectProjectTables = tables;
-    }
-
-
-    this.selectedSrcDb = [];
-
-    this.selectedSrcTable = {};
-
-
-});
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+KylinApp.service('TableModel', function() {
+
+
+   //for tables in cubeDesigner
+    this.selectProjectTables = [];
+
+    this.initTables = function(){
+        this.selectProjectTables = [];
+    }
+
+    this.addTable = function(table){
+        this.selectProjectTables.push(table);
+    }
+
+    this.setSelectedProjectTables = function(tables) {
+        this.selectProjectTables = tables;
+    }
+
+
+  // for load table page
+    this.selectedSrcDb = [];
+    this.selectedSrcTable = {};
+
+    this.init = function(){
+      this.selectedSrcDb = [];
+      this.selectedSrcTable = {};
+    }
+
+
+});
+

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/js/services/tables.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/partials/cubes/cube_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubes/cube_detail.html b/webapp/app/partials/cubes/cube_detail.html
index 6668b8d..59f2d69 100755
--- a/webapp/app/partials/cubes/cube_detail.html
+++ b/webapp/app/partials/cubes/cube_detail.html
@@ -29,8 +29,12 @@
         </li>
         <li class="{{cube.visiblePage=='json'? 'active':''}}"
             ng-if="userService.hasRole('ROLE_ADMIN') || hasPermission(cube, 16) && !newAccess">
-            <a href="" ng-click="cube.visiblePage='json';">JSON</a>
+            <a href="" ng-click="cube.visiblePage='json';">JSON(Cube)</a>
         </li>
+      <li class="{{cube.visiblePage=='json_model'? 'active':''}}"
+          ng-if="userService.hasRole('ROLE_ADMIN') || hasPermission(cube, 16) && !newAccess">
+        <a href="" ng-click="cube.visiblePage='json_model';">JSON(Model)</a>
+      </li>
         <li class="{{cube.visiblePage=='access'? 'active':''}}">
             <a href="" ng-click="cube.visiblePage='access';listAccess(cube, 'CubeInstance');">Access</a>
         </li>
@@ -64,6 +68,10 @@
              style="background-color: white;border: 0px">{{angular.toJson(cleanStatus(cube.detail), true)}}</pre>
     </div>
 
+    <div ng-show="cube.visiblePage=='json_model'" class="cube-detail">
+          <pre ng-if="!state.jsonEdit"
+               style="background-color: white;border: 0px">{{angular.toJson(cleanStatus(cube.model), true)}}</pre>
+    </div>
     <div ng-show="cube.visiblePage=='graph'" id="cube_graph_{{cube.name}}" class="cube-detail cube_graph">
     </div>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/partials/tables/source_metadata.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/source_metadata.html b/webapp/app/partials/tables/source_metadata.html
old mode 100644
new mode 100755
index e77185a..62b4d0f
--- a/webapp/app/partials/tables/source_metadata.html
+++ b/webapp/app/partials/tables/source_metadata.html
@@ -45,7 +45,7 @@
     </div>
     <!--Tab-->
     <div class="col-xs-9">
-        <h3 class="text-info">Table Schema:{{ selectedSrcTable.name}}</h3>
+        <h3 class="text-info">Table Schema:{{ tableModel.selectedSrcTable.name}}</h3>
         <div class="tabbable nav-tabs-custom">
             <ul class="nav nav-tabs">
                 <li class="active">
@@ -58,52 +58,52 @@
             <div class="tab-content">
                 <!--Schema-->
                 <div id="schema" class="tab-pane">
-                    <div ng-if="selectedSrcTable.uuid" class="table-responsive">
+                    <div ng-if="tableModel.selectedSrcTable.uuid" class="table-responsive">
                         <table class="table">
                             <tbody>
                                 <tr>
                                     <th style="width:20%">NAME</th>
-                                    <td>{{ selectedSrcTable.name}}</td>
+                                    <td>{{ tableModel.selectedSrcTable.name}}</td>
                                 </tr>
                                 <tr>
                                     <th>Hive DATABASE</th>
-                                    <td>{{selectedSrcTable.database}}</td>
+                                    <td>{{tableModel.selectedSrcTable.database}}</td>
                                 </tr>
                                 <tr>
                                     <th>SNAPSHOT TIME</th>
-                                    <td>{{selectedSrcTable.exd.lastUpdateTime | utcToConfigTimeZone}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.lastUpdateTime | utcToConfigTimeZone}}</td>
                                 </tr>
                                 <tr>
                                     <th>LOCATION</th>
-                                    <td>{{selectedSrcTable.exd.location}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.location}}</td>
                                 </tr>
                                 <tr>
                                     <th>INPUT FORMAT</th>
-                                    <td>{{selectedSrcTable.exd.inputformat}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.inputformat}}</td>
                                 </tr>
                                 <tr>
                                     <th>OUTPUT FORMAT</th>
-                                    <td>{{selectedSrcTable.exd.outputformat}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.outputformat}}</td>
                                 </tr>
                                 <tr>
                                     <th>OWNER</th>
-                                    <td><a href="mailto:{{selectedSrcTable.exd.owner}}">{{selectedSrcTable.exd.owner}}</a></td>
+                                    <td><a href="mailto:{{tableModel.selectedSrcTable.exd.owner}}">{{tableModel.selectedSrcTable.exd.owner}}</a></td>
                                 </tr>
                                 <tr>
                                     <th>TOTAL FILE NUMBER</th>
-                                    <td>{{selectedSrcTable.exd.totalNumberFiles}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.totalNumberFiles}}</td>
                                 </tr>
                                 <tr>
                                     <th>TOTAL FILE SIZE</th>
-                                    <td>{{selectedSrcTable.exd.totalFileSize}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.totalFileSize}}</td>
                                 </tr>
                                 <tr>
                                     <th>PARTITIONED</th>
-                                    <td>{{selectedSrcTable.exd.partitioned}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.partitioned}}</td>
                                 </tr>
                                 <tr>
                                     <th>PARTITION COLUMNS</th>
-                                    <td>{{selectedSrcTable.exd.partitionColumns}}</td>
+                                    <td>{{tableModel.selectedSrcTable.exd.partitionColumns}}</td>
                                 </tr>
                             </tbody>
                         </table>
@@ -120,7 +120,7 @@
                             </span>
                         </div>
                         <div class="space-6"></div>
-                        <div ng-if="(selectedSrcTable.columns | filter: columnName).length>0">
+                        <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length>0">
                             <table class="table table-hover table-striped list">
                                 <thead>
                                 <tr style="cursor: pointer">
@@ -137,26 +137,26 @@
                                 </tr>
                                 </thead>
 
-                                <tr ng-repeat="column in selectedSrcTable.columns | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
-                                    <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                                <tr ng-repeat="column in tableModel.selectedSrcTable.columns | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
+                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
                                         {{ column.id}}
                                     </td>
-                                    <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
                                         {{ column.name}}
                                     </td>
-                                    <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
                                         {{ column.datatype}}
                                     </td>
-                                    <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
-                                        <!--{{ selectedSrcTable.cardinality[column.name]}}-->
+                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                                        <!--{{ tableModel.selectedSrcTable.cardinality[column.name]}}-->
                                         {{column.cardinality}}
                                     </td>
                                 </tr>
                             </table>
                         </div>
-                        <div ng-if="(selectedSrcTable.columns | filter: columnName).length == 0" no-result
+                        <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length == 0" no-result
                              text="No Matched Table Column."></div>
-                        <div ng-if="!!!selectedSrcTable.uuid">
+                        <div ng-if="!!!tableModel.selectedSrcTable.uuid">
                             <div no-result text="No Table Selected."></div>
                         </div>
                     </div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/partials/tables/source_table_tree.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/source_table_tree.html b/webapp/app/partials/tables/source_table_tree.html
old mode 100644
new mode 100755
index 972e996..f2cd7ed
--- a/webapp/app/partials/tables/source_table_tree.html
+++ b/webapp/app/partials/tables/source_table_tree.html
@@ -34,12 +34,12 @@
     <div class="space-4"></div>
     <!--tree-->
     <div style="width:100%; height:{{window}}px; overflow:auto;">
-        <treecontrol ng-if="selectedSrcDb.length > 0" class="tree-light"
+        <treecontrol ng-if="tableModel.selectedSrcDb.length > 0" class="tree-light"
                      dirSelection="true"
-                     tree-model="selectedSrcDb"
+                     tree-model="tableModel.selectedSrcDb"
                      options="treeOptions"
                      on-selection="showSelected(node)"
-                     selected-node="selectedSrcTable">
+                     selected-node="tableModel.selectedSrcTable">
             {{node.name}} {{!!(node.datatype)?'(' + trimType(node.datatype) + ')' : ''}}
         </treecontrol>
     </div>


[25/26] incubator-kylin git commit: refactor

Posted by li...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0970d76f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0970d76f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0970d76f

Branch: refs/heads/streaming
Commit: 0970d76fd4e9d6b92e684cc64c5e97676f986132
Parents: 1f2fb28
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 16:06:58 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 16:06:58 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/streaming/KafkaConfig.java | 151 +++++++++++++++++++
 .../apache/kylin/streaming/KafkaConsumer.java   | 133 ++++++++++++++++
 .../org/apache/kylin/streaming/Requester.java   | 146 ++++++++++++++++++
 .../java/org/apache/kylin/streaming/Stream.java |  57 +++++++
 .../apache/kylin/streaming/StreamBuilder.java   |  95 ++++++++++++
 .../org/apache/kylin/streaming/TopicMeta.java   |  63 ++++++++
 .../apache/kylin/streaming/kafka/Consumer.java  | 120 ---------------
 .../kylin/streaming/kafka/KafkaConfig.java      | 151 -------------------
 .../apache/kylin/streaming/kafka/Requester.java | 146 ------------------
 .../apache/kylin/streaming/kafka/Stream.java    |  57 -------
 .../kylin/streaming/kafka/StreamBuilder.java    |  94 ------------
 .../apache/kylin/streaming/kafka/TopicMeta.java |  63 --------
 .../apache/kylin/streaming/KafkaBaseTest.java   |  78 ++++++++++
 .../apache/kylin/streaming/KafkaConfigTest.java |  65 ++++++++
 .../kylin/streaming/KafkaConsumerTest.java      |  98 ++++++++++++
 .../apache/kylin/streaming/RequesterTest.java   |  71 +++++++++
 .../apache/kylin/streaming/TestProducer.java    | 114 ++++++++++++++
 .../kylin/streaming/kafka/KafkaBaseTest.java    |  77 ----------
 .../kylin/streaming/kafka/KafkaConfigTest.java  |  64 --------
 .../streaming/kafka/KafkaConsumerTest.java      | 101 -------------
 .../kylin/streaming/kafka/RequesterTest.java    |  70 ---------
 .../kylin/streaming/kafka/TestProducer.java     | 115 --------------
 22 files changed, 1071 insertions(+), 1058 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
new file mode 100644
index 0000000..7d0cd6b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+    private List<Broker> brokers;
+
+    private String zookeeper;
+
+    private String topic;
+
+    private int timeout;
+
+    private int maxReadCount;
+
+    private int bufferSize;
+
+    private int partitionId;
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getMaxReadCount() {
+        return maxReadCount;
+    }
+
+    public void setMaxReadCount(int maxReadCount) {
+        this.maxReadCount = maxReadCount;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public List<Broker> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(List<Broker> brokers) {
+        this.brokers = brokers;
+    }
+
+    public String getZookeeper() {
+        return zookeeper;
+    }
+
+    public void setZookeeper(String zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public static KafkaConfig load(KafkaConfig config) {
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(config.getBufferSize());
+        result.setMaxReadCount(config.getMaxReadCount());
+        result.setTimeout(config.getTimeout());
+        result.setTopic(config.getTopic());
+        result.setZookeeper(config.getZookeeper());
+        result.setPartitionId(config.getPartitionId());
+        result.setBrokers(config.getBrokers());
+        return result;
+    }
+
+    public static KafkaConfig load(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
+        result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
+        result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
+        result.setTopic(properties.getProperty("topic"));
+        result.setZookeeper(properties.getProperty("zookeeper"));
+        result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
+
+        int id = 0;
+        List<Broker> brokers = Lists.newArrayList();
+        for (String str: properties.getProperty("brokers").split(",")) {
+            final String[] split = str.split(":");
+            final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
+            brokers.add(broker);
+        }
+        result.setBrokers(brokers);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
new file mode 100644
index 0000000..e45b6e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -0,0 +1,133 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+/**
+ * Pulls the messages of a single topic partition from its lead broker and publishes
+ * them, wrapped as {@link Stream}s, on a bounded queue obtained via {@link #getStreamQueue()}.
+ * <p>
+ * The queue capacity is {@code kafkaConfig.getMaxReadCount()}; when it is full the
+ * consumer blocks until the reader of the queue catches up. Interrupting the thread
+ * running this consumer stops it.
+ */
+public class KafkaConsumer implements Runnable {
+
+    /** Pause before retrying after a missing leader, a fetch error or an empty fetch, to avoid a hot loop. */
+    private static final long RETRY_BACKOFF_MS = 1000L;
+
+    private String topic;
+    private int partitionId;
+
+    private KafkaConfig kafkaConfig;
+    private List<Broker> replicaBrokers;
+    private AtomicLong offset = new AtomicLong();
+    private BlockingQueue<Stream> streamQueue;
+
+    private Logger logger;
+
+    /**
+     * @param topic          topic to read
+     * @param partitionId    partition of the topic to read
+     * @param initialBrokers brokers used for the first leader lookup; replaced by the partition's replicas afterwards
+     * @param kafkaConfig    connection settings, also supplies the queue capacity
+     */
+    public KafkaConsumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
+        this.topic = topic;
+        this.partitionId = partitionId;
+        this.kafkaConfig = kafkaConfig;
+        this.replicaBrokers = initialBrokers;
+        logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
+        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
+    }
+
+    /** @return the queue on which consumed messages are published */
+    public BlockingQueue<Stream> getStreamQueue() {
+        return streamQueue;
+    }
+
+    /**
+     * Looks up the current leader of the partition and refreshes the replica list.
+     *
+     * @return the lead broker, or null when the metadata is unavailable or carries an error
+     */
+    private Broker getLeadBroker() {
+        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            replicaBrokers = partitionMetadata.replicas();
+            return partitionMetadata.leader();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Consume loop: resolve the leader, start from the earliest offset, then fetch and
+     * enqueue messages until the thread is interrupted or an unexpected error occurs.
+     */
+    @Override
+    public void run() {
+        try {
+            Broker leadBroker = null;
+            boolean offsetInitialized = false;
+            while (!Thread.currentThread().isInterrupted()) {
+                if (leadBroker == null) {
+                    leadBroker = getLeadBroker();
+                }
+                if (leadBroker == null) {
+                    logger.warn("cannot find lead broker");
+                    Thread.sleep(RETRY_BACKOFF_MS);
+                    continue;
+                }
+                if (!offsetInitialized) {
+                    // done lazily so the start offset is also resolved when the first leader lookup failed
+                    final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+                    offset.set(lastOffset);
+                    offsetInitialized = true;
+                }
+
+                final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
+                if (fetchResponse.errorCode(topic, partitionId) != 0) {
+                    logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+                    // the leader may have moved; force a fresh lookup instead of hammering the same broker
+                    leadBroker = null;
+                    Thread.sleep(RETRY_BACKOFF_MS);
+                    continue;
+                }
+                int consumed = 0;
+                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+                    if (messageAndOffset.offset() < offset.get()) {
+                        // a fetch that lands inside a compressed message set may return earlier messages again
+                        continue;
+                    }
+                    final ByteBuffer payload = messageAndOffset.message().payload();
+                    //TODO use ByteBuffer maybe
+                    byte[] bytes = new byte[payload.remaining()];
+                    payload.get(bytes);
+                    logger.debug("get message offset:" + messageAndOffset.offset());
+                    streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
+                    // offsets are not guaranteed to be contiguous, so follow the broker's notion of "next"
+                    offset.set(messageAndOffset.nextOffset());
+                    consumed++;
+                }
+                if (consumed == 0) {
+                    // nothing new on the partition yet
+                    Thread.sleep(RETRY_BACKOFF_MS);
+                }
+            }
+        } catch (InterruptedException e) {
+            logger.warn("consumer has been interrupted, stopping", e);
+            // keep the interrupt visible to whoever owns this thread
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            logger.error("consumer has encountered an error", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
new file mode 100644
index 0000000..79332af
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
@@ -0,0 +1,146 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+/**
+ * Stateless helpers that issue single Kafka requests (metadata, fetch, offset lookup).
+ * Each call opens its own {@link SimpleConsumer} and closes it before returning.
+ */
+public final class Requester {
+
+    private static final Logger logger = LoggerFactory.getLogger(Requester.class);
+
+    /**
+     * Fetches the partition ids of the configured topic.
+     *
+     * @param kafkaConfig supplies the brokers to ask, the topic name and connection settings
+     * @return the topic metadata, or null when the topic cannot be resolved
+     */
+    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
+        for (Broker broker : kafkaConfig.getBrokers()) {
+            final SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            try {
+                List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                TopicMetadataResponse resp = consumer.send(req);
+                final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+                if (topicMetadatas.size() != 1) {
+                    break;
+                }
+                final TopicMetadata topicMetadata = topicMetadatas.get(0);
+                if (topicMetadata.errorCode() != 0) {
+                    break;
+                }
+                // copy the ids eagerly so the result does not keep a lazy view onto the response
+                final List<Integer> partitionIds = Lists.newArrayList();
+                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+                    partitionIds.add(partitionMetadata.partitionId());
+                }
+                return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+            } finally {
+                consumer.close();
+            }
+        }
+        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
+        return null;
+    }
+
+    /**
+     * Fetches the metadata of one partition of a topic.
+     *
+     * @param topic       topic name
+     * @param partitionId partition to look for
+     * @param brokers     brokers to ask
+     * @param kafkaConfig connection settings
+     * @return the partition metadata, or null when it cannot be found
+     */
+    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
+        for (Broker broker : brokers) {
+            final SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            try {
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                TopicMetadataResponse resp = consumer.send(req);
+                final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+                if (topicMetadatas.size() != 1) {
+                    logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+                    break;
+                }
+                final TopicMetadata topicMetadata = topicMetadatas.get(0);
+                if (topicMetadata.errorCode() != 0) {
+                    logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+                    break;
+                }
+                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+                    if (partitionMetadata.partitionId() == partitionId) {
+                        return partitionMetadata;
+                    }
+                }
+            } finally {
+                consumer.close();
+            }
+        }
+        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
+        return null;
+    }
+
+    /**
+     * Fetches messages of a partition starting at the given offset.
+     *
+     * @param topic       topic name
+     * @param partitionId partition to read
+     * @param offset      offset of the first message wanted
+     * @param broker      broker to fetch from
+     * @param kafkaConfig connection settings
+     * @return the raw fetch response; callers must check its error code
+     */
+    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
+        final String clientName = "client_" + topic + "_" + partitionId;
+        final SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+        try {
+            // NOTE(review): the last addFetch argument is the fetch size; maxReadCount is also used
+            // as a queue capacity elsewhere - confirm that sharing one setting for both is intended
+            kafka.api.FetchRequest req = new FetchRequestBuilder()
+                    .clientId(clientName)
+                    .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount())
+                    .build();
+            return consumer.fetch(req);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    /**
+     * Asks a broker for one offset of a partition relative to {@code whichTime}.
+     *
+     * @param topic       topic name
+     * @param partitionId partition to query
+     * @param whichTime   time marker passed to the offset request (e.g. the earliest-time constant)
+     * @param broker      broker to ask
+     * @param kafkaConfig connection settings
+     * @return the offset reported by the broker, or 0 when the request failed or returned nothing
+     */
+    public static long getLastOffset(String topic, int partitionId,
+                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
+        String clientName = "client_" + topic + "_" + partitionId;
+        final SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+        try {
+            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+            OffsetResponse response = consumer.getOffsetsBefore(request);
+
+            if (response.hasError()) {
+                logger.warn("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+                return 0;
+            }
+            long[] offsets = response.offsets(topic, partitionId);
+            if (offsets.length == 0) {
+                logger.warn("broker returned no offset, topic:" + topic + " partitionId:" + partitionId);
+                return 0;
+            }
+            return offsets[0];
+        } finally {
+            consumer.close();
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
new file mode 100644
index 0000000..3cb43b6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+/**
+ * One consumed message: the raw payload bytes together with a timestamp
+ * supplied by whoever created the instance.
+ */
+public class Stream {
+
+    private final byte[] rawData;
+    private final long timestamp;
+
+    /**
+     * @param timestamp time value to associate with the payload
+     * @param rawData   payload bytes; the array is stored as is, not copied
+     */
+    public Stream(long timestamp, byte[] rawData) {
+        this.rawData = rawData;
+        this.timestamp = timestamp;
+    }
+
+    /** @return the timestamp given at construction */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @return the payload array given at construction (the same instance, not a copy) */
+    public byte[] getRawData() {
+        return this.rawData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
new file mode 100644
index 0000000..005c255
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -0,0 +1,95 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by qianzhou on 2/17/15.
+ */
+/**
+ * Drains a queue of {@link Stream}s and hands them to {@link #build(List)} in batches.
+ * A batch is flushed once its payloads reach {@code BATCH_BUILD_BYTES_THRESHOLD} bytes or
+ * once more than {@code BATCH_BUILD_INTERVAL_THRESHOLD} milliseconds have passed since the
+ * previous flush. Both conditions are evaluated only when a new stream arrives.
+ */
+public abstract class StreamBuilder implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+
+    private static final int BATCH_BUILD_BYTES_THRESHOLD = 64 * 1024;
+    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 5 * 60 * 1000;
+
+    private BlockingQueue<Stream> streamQueue;
+    private long lastBuildTime = System.currentTimeMillis();
+    private int bytesTotal = 0;
+
+    /**
+     * @param streamQueue queue to take streams from
+     */
+    public StreamBuilder(BlockingQueue<Stream> streamQueue) {
+        this.streamQueue = streamQueue;
+    }
+
+    /**
+     * Processes one batch. Each stream is passed to this method exactly once.
+     *
+     * @param streamsToBuild the streams collected since the previous batch
+     */
+    protected abstract void build(List<Stream> streamsToBuild);
+
+    /** Builds the batch and restarts the size and time counters. */
+    private void buildStream(List<Stream> streams) {
+        build(streams);
+        clearCounter();
+    }
+
+    private void clearCounter() {
+        lastBuildTime = System.currentTimeMillis();
+        bytesTotal = 0;
+    }
+
+    /**
+     * Collects streams until a threshold is hit, flushes the batch and starts a new one.
+     * Returns when the thread is interrupted; streams collected but not yet flushed are dropped.
+     */
+    @Override
+    public void run() {
+        try {
+            List<Stream> streamToBuild = Lists.newArrayList();
+            clearCounter();
+            while (true) {
+                final Stream stream = streamQueue.take();
+                streamToBuild.add(stream);
+                bytesTotal += stream.getRawData().length;
+                final boolean flush = bytesTotal >= BATCH_BUILD_BYTES_THRESHOLD
+                        || (System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD;
+                if (flush) {
+                    buildStream(streamToBuild);
+                    // start a fresh batch: reusing the list would re-submit every earlier stream and
+                    // grow without bound; a new list also leaves the one handed to build() untouched
+                    streamToBuild = Lists.newArrayList();
+                }
+            }
+        } catch (InterruptedException e) {
+            logger.error("StreamBuilder has been interrupted", e);
+            // keep the interrupt visible to whoever owns this thread
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
new file mode 100644
index 0000000..b93d589
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class TopicMeta {
+
+    private final String name;
+
+    private final List<Integer> partitionIds;
+
+    /**
+     * @param name         topic name
+     * @param partitionIds ids of the topic's partitions; copied, so later changes to the
+     *                     argument (or to a view it is backed by) do not affect this instance
+     * @throws NullPointerException if {@code partitionIds} is null
+     */
+    public TopicMeta(String name, List<Integer> partitionIds) {
+        this.name = name;
+        // wrapping alone is not enough: an unmodifiable view still reflects changes of its backing list
+        this.partitionIds = Collections.unmodifiableList(new java.util.ArrayList<Integer>(partitionIds));
+    }
+
+    /** @return the topic name */
+    public String getName() {
+        return name;
+    }
+
+    /** @return an unmodifiable snapshot of the partition ids taken at construction */
+    public List<Integer> getPartitionIds() {
+        return partitionIds;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
deleted file mode 100644
index 074ef01..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Consumer implements Runnable {
-
-    private String topic;
-    private int partitionId;
-
-    private KafkaConfig kafkaConfig;
-    private List<Broker> replicaBrokers;
-    private AtomicLong offset = new AtomicLong();
-    private BlockingQueue<Stream> streamQueue;
-
-    private Logger logger;
-
-    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
-        this.topic = topic;
-        this.partitionId = partitionId;
-        this.kafkaConfig = kafkaConfig;
-        this.replicaBrokers = initialBrokers;
-        logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
-        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
-    }
-
-    public BlockingQueue<Stream> getStreamQueue() {
-        return streamQueue;
-    }
-
-    private Broker getLeadBroker() {
-        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
-            replicaBrokers = partitionMetadata.replicas();
-            return partitionMetadata.leader();
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            final Broker leadBroker = getLeadBroker();
-            if (leadBroker == null) {
-                logger.warn("cannot find lead broker");
-                continue;
-            }
-            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
-            offset.set(lastOffset);
-            final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
-            if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
-                continue;
-            }
-            for (MessageAndOffset messageAndOffset: fetchResponse.messageSet(topic, partitionId)) {
-                final ByteBuffer payload = messageAndOffset.message().payload();
-                //TODO use ByteBuffer maybe
-                byte[] bytes = new byte[payload.limit()];
-                payload.get(bytes);
-                logger.debug("get message offset:" + messageAndOffset.offset());
-                try {
-                    streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
-                } catch (InterruptedException e) {
-                    logger.error("error put streamQueue", e);
-                }
-                offset.incrementAndGet();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
deleted file mode 100644
index 82513eb..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfig {
-
-    private List<Broker> brokers;
-
-    private String zookeeper;
-
-    private String topic;
-
-    private int timeout;
-
-    private int maxReadCount;
-
-    private int bufferSize;
-
-    private int partitionId;
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getMaxReadCount() {
-        return maxReadCount;
-    }
-
-    public void setMaxReadCount(int maxReadCount) {
-        this.maxReadCount = maxReadCount;
-    }
-
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
-    public List<Broker> getBrokers() {
-        return brokers;
-    }
-
-    public void setBrokers(List<Broker> brokers) {
-        this.brokers = brokers;
-    }
-
-    public String getZookeeper() {
-        return zookeeper;
-    }
-
-    public void setZookeeper(String zookeeper) {
-        this.zookeeper = zookeeper;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public static KafkaConfig load(KafkaConfig config) {
-        KafkaConfig result = new KafkaConfig();
-        result.setBufferSize(config.getBufferSize());
-        result.setMaxReadCount(config.getMaxReadCount());
-        result.setTimeout(config.getTimeout());
-        result.setTopic(config.getTopic());
-        result.setZookeeper(config.getZookeeper());
-        result.setPartitionId(config.getPartitionId());
-        result.setBrokers(config.getBrokers());
-        return result;
-    }
-
-    public static KafkaConfig load(Properties properties) {
-        Preconditions.checkNotNull(properties);
-        KafkaConfig result = new KafkaConfig();
-        result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
-        result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
-        result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
-        result.setTopic(properties.getProperty("topic"));
-        result.setZookeeper(properties.getProperty("zookeeper"));
-        result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
-
-        int id = 0;
-        List<Broker> brokers = Lists.newArrayList();
-        for (String str: properties.getProperty("brokers").split(",")) {
-            final String[] split = str.split(":");
-            final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
-            brokers.add(broker);
-        }
-        result.setBrokers(brokers);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
deleted file mode 100644
index 8811695..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public final class Requester {
-
-    private static final Logger logger = LoggerFactory.getLogger(Requester.class);
-
-    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : kafkaConfig.getBrokers()) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                break;
-            }
-            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
-                @Nullable
-                @Override
-                public Integer apply(PartitionMetadata partitionMetadata) {
-                    return partitionMetadata.partitionId();
-                }
-            });
-            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
-        }
-        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
-        return null;
-    }
-
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : brokers) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topic);
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
-                break;
-            }
-            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
-                if (partitionMetadata.partitionId() == partitionId) {
-                    return partitionMetadata;
-                }
-            }
-        }
-        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
-        return null;
-    }
-
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
-        final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId(clientName)
-                .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
-                .build();
-        return consumer.fetch(req);
-    }
-
-    public static long getLastOffset(String topic, int partitionId,
-                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
-        String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
-                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-
-        if (response.hasError()) {
-            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
-            return 0;
-        }
-        long[] offsets = response.offsets(topic, partitionId);
-        return offsets[0];
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
deleted file mode 100644
index 2a0ede4..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Stream {
-
-    private long timestamp;
-    private byte[] rawData;
-
-    public Stream(long timestamp, byte[] rawData) {
-        this.timestamp = timestamp;
-        this.rawData = rawData;
-    }
-
-    public byte[] getRawData() {
-        return rawData;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
deleted file mode 100644
index 145b5c8..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by qianzhou on 2/17/15.
- */
-public abstract class StreamBuilder implements Runnable {
-
-    private List<BlockingQueue<Stream>> streamQueues;
-    private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
-    private final int batchBuildCount;
-
-    public StreamBuilder(List<BlockingQueue<Stream>> streamQueues, int batchBuildCount) {
-        this.streamQueues = streamQueues;
-        this.batchBuildCount = batchBuildCount;
-    }
-
-
-    private int getEarliestStreamIndex(Stream[] streamHead) {
-        long ts = Long.MAX_VALUE;
-        int idx = 0;
-        for (int i = 0; i < streamHead.length; i++) {
-            if (streamHead[i].getTimestamp() < ts) {
-                ts = streamHead[i].getTimestamp();
-                idx = i;
-            }
-        }
-        return idx;
-    }
-
-    protected abstract void build(List<Stream> streamsToBuild);
-
-    @Override
-    public void run() {
-        try {
-            Stream[] streamHead = new Stream[streamQueues.size()];
-            for (int i = 0; i < streamQueues.size(); i++) {
-                streamHead[i] = streamQueues.get(i).take();
-            }
-            List<Stream> streamToBuild = Lists.newArrayListWithCapacity(batchBuildCount);
-            while (true) {
-                if (streamToBuild.size() >= batchBuildCount) {
-                    build(streamToBuild);
-                    streamToBuild.clear();
-                }
-                int idx = getEarliestStreamIndex(streamHead);
-                streamToBuild.add(streamHead[idx]);
-                streamHead[idx] = streamQueues.get(idx).take();
-            }
-        } catch (InterruptedException e) {
-            logger.error("", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
deleted file mode 100644
index 7822797..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- * Created by qianzhou on 2/15/15.
- */
-public class TopicMeta {
-
-    private final String name;
-
-    private final List<Integer> partitionIds;
-
-    public TopicMeta(String name, List<Integer> partitionIds) {
-        this.name = name;
-        this.partitionIds = Collections.unmodifiableList(partitionIds);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public List<Integer> getPartitionIds() {
-        return partitionIds;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
new file mode 100644
index 0000000..89277d2
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public abstract class KafkaBaseTest {
+
+    protected static final Logger logger = LoggerFactory.getLogger("kafka test");
+
+    protected static ZkClient zkClient;
+
+    protected static KafkaConfig kafkaConfig;
+
+    @BeforeClass
+    public static void beforeClass() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        kafkaConfig = KafkaConfig.load(properties);
+
+        zkClient = new ZkClient(kafkaConfig.getZookeeper());
+    }
+
+
+    public static void createTopic(String topic, int partition, int replica) {
+        try {
+            AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
+        } catch (TopicExistsException e) {
+            logger.info(e.getMessage());
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
new file mode 100644
index 0000000..5612763
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+    @Test
+    public void test() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        KafkaConfig config = KafkaConfig.load(properties);
+        assertEquals(1000, config.getMaxReadCount());
+        assertEquals(65536, config.getBufferSize());
+        assertEquals(60000, config.getTimeout());
+        assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+        assertEquals("kafka_stream_test", config.getTopic());
+        assertEquals(0, config.getPartitionId());
+        assertEquals(1, config.getBrokers().size());
+        assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
new file mode 100644
index 0000000..91e06fc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class KafkaConsumerTest extends KafkaBaseTest {
+
+    private TestProducer producer;
+
+    private static final int TOTAL_SEND_COUNT = 100;
+
+    @Before
+    public void before() throws IOException {
+        producer = new TestProducer(TOTAL_SEND_COUNT);
+        producer.start();
+    }
+
+    @After
+    public void after() {
+        producer.stop();
+    }
+
+    private void waitForProducerToStop(TestProducer producer) {
+        while (!producer.isStopped()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+        List<BlockingQueue<Stream>> queues = Lists.newArrayList();
+        for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+            queues.add(consumer.getStreamQueue());
+            executorService.execute(consumer);
+        }
+        waitForProducerToStop(producer);
+        int count = 0;
+        for (BlockingQueue<Stream> queue : queues) {
+            count += queue.size();
+        }
+        //since there will be historical data
+        assertTrue(count >= TOTAL_SEND_COUNT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
new file mode 100644
index 0000000..d54f22c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.Requester;
+import org.apache.kylin.streaming.TopicMeta;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class RequesterTest extends KafkaBaseTest {
+
+    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
+
+
+
+    @AfterClass
+    public static void afterClass() {
+    }
+
+    @Test
+    public void testTopicMeta() throws Exception {
+        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+        assertNotNull(kafkaTopicMeta);
+        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+
+        KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
+
+        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
+        assertTrue(kafkaTopicMeta == null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
new file mode 100644
index 0000000..368904c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import kafka.cluster.Broker;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestProducer {
+
+    private volatile boolean stopped = false;
+
+    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
+
+    private final int sendCount;
+
+    public TestProducer(int sendCount) {
+        this.sendCount = sendCount;
+    }
+
+    public void start() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
+        Properties props = new Properties();
+        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+            @Nullable
+            @Override
+            public String apply(@Nullable Broker broker) {
+                return broker.getConnectionString();
+            }
+        }), ","));
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+        ProducerConfig config = new ProducerConfig(props);
+        final Producer<String, String> producer = new Producer<String, String>(config);
+
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int count = 0;
+                while (!stopped && count < sendCount) {
+                    final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
+                    producer.send(message);
+                    count++;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                logger.debug("totally " + count +" messages have been sent");
+                stopped = true;
+
+            }
+        });
+        thread.setDaemon(false);
+        thread.start();
+    }
+
+    public boolean isStopped() {
+        return stopped;
+    }
+
+    public void stop() {
+        stopped = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
deleted file mode 100644
index 537a15d..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public abstract class KafkaBaseTest {
-
-    protected static final Logger logger = LoggerFactory.getLogger("kafka test");
-
-    protected static ZkClient zkClient;
-
-    protected static KafkaConfig kafkaConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws IOException {
-        final Properties properties = new Properties();
-        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        kafkaConfig = KafkaConfig.load(properties);
-
-        zkClient = new ZkClient(kafkaConfig.getZookeeper());
-    }
-
-
-    public static void createTopic(String topic, int partition, int replica) {
-        try {
-            AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
-        } catch (TopicExistsException e) {
-            logger.info(e.getMessage());
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
deleted file mode 100644
index 3c9bd87..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfigTest {
-
-    @Test
-    public void test() throws IOException {
-        final Properties properties = new Properties();
-        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        KafkaConfig config = KafkaConfig.load(properties);
-        assertEquals(1000, config.getMaxReadCount());
-        assertEquals(65536, config.getBufferSize());
-        assertEquals(60000, config.getTimeout());
-        assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
-        assertEquals("kafka_stream_test", config.getTopic());
-        assertEquals(0, config.getPartitionId());
-        assertEquals(1, config.getBrokers().size());
-        assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
deleted file mode 100644
index 4449d37..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.consumer.ConsumerConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class KafkaConsumerTest extends KafkaBaseTest {
-
-    private TestProducer producer;
-
-    private static final int TOTAL_SEND_COUNT = 100;
-
-    @Before
-    public void before() throws IOException {
-        producer = new TestProducer(TOTAL_SEND_COUNT);
-        producer.start();
-    }
-
-    @After
-    public void after() {
-        producer.stop();
-    }
-
-    private void waitForProducerToStop(TestProducer producer) {
-        while (!producer.isStopped()) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Test
-    public void test() throws InterruptedException {
-        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
-        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
-        List<BlockingQueue<Stream>> queues = Lists.newArrayList();
-        for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
-            queues.add(consumer.getStreamQueue());
-            executorService.execute(consumer);
-        }
-        waitForProducerToStop(producer);
-        int count = 0;
-        for (BlockingQueue<Stream> queue : queues) {
-            count += queue.size();
-        }
-        //since there will be historical data
-        assertTrue(count >= TOTAL_SEND_COUNT);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
deleted file mode 100644
index 694c1fd..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.*;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class RequesterTest extends KafkaBaseTest {
-
-    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
-
-
-
-    @AfterClass
-    public static void afterClass() {
-    }
-
-    @Test
-    public void testTopicMeta() throws Exception {
-        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
-        assertNotNull(kafkaTopicMeta);
-        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
-        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
-
-        KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
-        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
-
-        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
-        assertTrue(kafkaTopicMeta == null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
deleted file mode 100644
index cd3b166..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestProducer {
-
-    private volatile boolean stopped = false;
-
-    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
-
-    private final int sendCount;
-
-    public TestProducer(int sendCount) {
-        this.sendCount = sendCount;
-    }
-
-    public void start() throws IOException {
-        final Properties properties = new Properties();
-        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
-
-        Properties props = new Properties();
-        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
-            @Nullable
-            @Override
-            public String apply(@Nullable Broker broker) {
-                return broker.getConnectionString();
-            }
-        }), ","));
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-        ProducerConfig config = new ProducerConfig(props);
-        final Producer<String, String> producer = new Producer<String, String>(config);
-
-        final Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                int count = 0;
-                while (!stopped && count < sendCount) {
-                    final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
-                    producer.send(message);
-                    count++;
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-                logger.debug("totally " + count +" messages have been sent");
-                stopped = true;
-
-            }
-        });
-        thread.setDaemon(false);
-        thread.start();
-    }
-
-    public boolean isStopped() {
-        return stopped;
-    }
-
-    public void stop() {
-        stopped = true;
-    }
-}


[16/26] incubator-kylin git commit: fix

Posted by li...@apache.org.
fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c038cb4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c038cb4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c038cb4b

Branch: refs/heads/streaming
Commit: c038cb4bada510a17c9bb36a32ddbdb7ff947ac9
Parents: 9c3a606
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 11:41:56 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 11:41:56 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/kafka/Consumer.java  |  3 +++
 .../apache/kylin/streaming/kafka/Requester.java | 27 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index eaee2a1..c825d4b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.streaming.kafka;
 
+import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
@@ -93,6 +94,8 @@ public class Consumer implements Runnable {
                 logger.warn("cannot find lead broker");
                 continue;
             }
+            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+            offset.set(lastOffset);
             final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
             if (fetchResponse.errorCode(topic, partitionId) != 0) {
                 logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index 1f14a47..f4cdd8e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -37,7 +37,9 @@ package org.apache.kylin.streaming.kafka;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
 import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
 import kafka.javaapi.*;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created by qianzhou on 2/15/15.
@@ -114,8 +118,29 @@ public final class Requester {
         SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
         kafka.api.FetchRequest req = new FetchRequestBuilder()
                 .clientId(clientName)
-                .addFetch(topic, partitionId, offset, consumerConfig.getTimeout()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                 .build();
         return consumer.fetch(req);
     }
+
+    public static long getLastOffset(String topic, int partitionId,
+                                     long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+        String clientName = "client_" + topic + "_" + partitionId;
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+
+        if (response.hasError()) {
+            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+            return 0;
+        }
+        long[] offsets = response.offsets(topic, partitionId);
+        return offsets[0];
+    }
+
+
 }


[10/26] incubator-kylin git commit: Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming

Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/04d9b7d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/04d9b7d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/04d9b7d6

Branch: refs/heads/streaming
Commit: 04d9b7d60f01f10681da03cc7525153fd27dacdd
Parents: 06e4b20 bd0b9f7
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Sat Feb 28 09:41:26 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Sat Feb 28 09:41:26 2015 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/CubeController.java   | 38 +++++++++++++++-----
 webapp/app/js/controllers/cubeEdit.js           | 12 +++++++
 webapp/app/js/controllers/cubeSchema.js         | 30 ++++++++++++++++
 webapp/app/js/controllers/cubes.js              |  5 ++-
 webapp/app/js/filters/filter.js                 |  4 +--
 webapp/app/partials/cubeDesigner/measures.html  | 15 +++-----
 6 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[05/26] incubator-kylin git commit: Merge pull request #430 from janzhongi/inverted-index

Posted by li...@apache.org.
Merge pull request #430 from janzhongi/inverted-index

fix return type issue when create measure KYLIN-600

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/07a02fa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/07a02fa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/07a02fa0

Branch: refs/heads/streaming
Commit: 07a02fa03aa485823c17552571a868fa73f91b82
Parents: 598476f 0a5830e
Author: Zhong,Jian <ji...@ebay.com>
Authored: Fri Feb 27 16:52:25 2015 +0800
Committer: Zhong,Jian <ji...@ebay.com>
Committed: Fri Feb 27 16:52:25 2015 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/CubeController.java   | 38 +++++++++++++++-----
 webapp/app/js/controllers/cubeEdit.js           | 12 +++++++
 webapp/app/js/controllers/cubeSchema.js         | 30 ++++++++++++++++
 webapp/app/js/controllers/cubes.js              |  5 ++-
 webapp/app/js/filters/filter.js                 |  4 +--
 webapp/app/partials/cubeDesigner/measures.html  | 15 +++-----
 6 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[24/26] incubator-kylin git commit: change read workload for streaming test

Posted by li...@apache.org.
change read workload for streaming test


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/29b27818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/29b27818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/29b27818

Branch: refs/heads/streaming
Commit: 29b27818c866b7fd5ba7f0046dc68ca612ff505b
Parents: 1d6505b
Author: honma <ho...@ebay.com>
Authored: Tue Mar 3 15:39:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Mar 3 15:39:45 2015 +0800

----------------------------------------------------------------------
 common/src/test/java/org/apache/kylin/common/util/BasicTest.java | 4 +---
 .../java/org/apache/kylin/job/tools/HbaseStreamingInput.java     | 3 ++-
 2 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29b27818/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 4a6edc8..4d2a25c 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -48,7 +48,6 @@ public class BasicTest {
 
     private void foo(Long a) {
         System.out.printf("a");
-
     }
 
     private void foo(Integer b) {
@@ -64,12 +63,11 @@ public class BasicTest {
     public void test1() throws Exception {
     }
 
-    final private Semaphore semaphore = new Semaphore(0);
-
     @Test
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {
 
+        System.out.println(512<<20);
     }
 
     private static String time(long t) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29b27818/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
index ff313b4..8479391 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -41,6 +41,7 @@ public class HbaseStreamingInput {
             logger.info("Creating HTable '" + tableName + "'");
             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
             desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
+            desc.setMemStoreFlushSize(512 << 20);//512M
 
             HColumnDescriptor fd = new HColumnDescriptor(CF);
             fd.setBlocksize(CELL_SIZE);
@@ -144,7 +145,7 @@ public class HbaseStreamingInput {
             long leftBound = getFirstKeyTime(table);
             long rightBound = System.currentTimeMillis();
 
-            for (int t = 0; t < 10; ++t) {
+            for (int t = 0; t < 5; ++t) {
                 long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
                 long end = start + 600000;//a period of 10 minutes
                 logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));


[09/26] incubator-kylin git commit: KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization

Posted by li...@apache.org.
KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/06e4b203
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/06e4b203
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/06e4b203

Branch: refs/heads/streaming
Commit: 06e4b20386621ca4bdd70513da4156c3022e8c80
Parents: 7f1abe0
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Sat Feb 28 09:41:13 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Sat Feb 28 09:41:13 2015 +0800

----------------------------------------------------------------------
 .../RoutingRules/RealizationPriorityRule.java   |  5 ++--
 .../kylin/storage/hybrid/HybridInstance.java    | 28 +++++++++++---------
 .../storage/hybrid/HybridTupleIterator.java     | 15 ++++++++---
 3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06e4b203/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
index 6a67140..09a7dce 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
@@ -33,8 +33,9 @@ public class RealizationPriorityRule extends RoutingRule {
     static Map<RealizationType, Integer> priorities = Maps.newHashMap();
 
     static {
-        priorities.put(RealizationType.CUBE, 0);
-        priorities.put(RealizationType.INVERTED_INDEX, 1);
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 1);
+        priorities.put(RealizationType.INVERTED_INDEX, 2);
     }
 
     public static void setPriorities(Map<RealizationType, Integer> priorities) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06e4b203/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index ebba9f5..789ddc0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -53,15 +53,11 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
             throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
         }
 
-
-        if (realTimeRealizationInstance.getDateRangeEnd() < historyRealizationInstance.getDateRangeEnd()) {
-            throw new IllegalStateException("The real time realization's dateRangeEnd should be greater than history realization's dateRangeEnd.");
-        }
     }
 
     @Override
     public boolean isCapable(SQLDigest digest) {
-        return historyRealizationInstance.isCapable(digest) || realTimeRealizationInstance.isCapable(digest);
+        return getHistoryRealizationInstance().isCapable(digest) || getRealTimeRealizationInstance().isCapable(digest);
     }
 
     @Override
@@ -78,17 +74,17 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
 
     @Override
     public String getFactTable() {
-        return null;
+        return getHistoryRealizationInstance().getFactTable();
     }
 
     @Override
     public List<TblColRef> getAllColumns() {
-        return null;
+        return getHistoryRealizationInstance().getAllColumns();
     }
 
     @Override
     public List<MeasureDesc> getMeasures() {
-        return null;
+        return getHistoryRealizationInstance().getMeasures();
     }
 
     @Override
@@ -138,28 +134,34 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
     }
 
     public IRealization getHistoryRealizationInstance() {
+        if (historyRealizationInstance == null) {
+            this.init();
+        }
         return historyRealizationInstance;
     }
 
     public IRealization getRealTimeRealizationInstance() {
+        if (realTimeRealizationInstance == null) {
+            this.init();
+        }
         return realTimeRealizationInstance;
     }
 
     @Override
     public long getDateRangeStart() {
-        return historyRealizationInstance.getDateRangeStart();
+        return Math.min(getHistoryRealizationInstance().getDateRangeStart(), getRealTimeRealizationInstance().getDateRangeStart());
     }
 
     @Override
     public long getDateRangeEnd() {
-        return realTimeRealizationInstance.getDateRangeEnd();
+        return Math.max(getHistoryRealizationInstance().getDateRangeEnd(), getRealTimeRealizationInstance().getDateRangeEnd());
     }
 
     public String getModelName() {
-        if(historyRealizationInstance instanceof CubeInstance) {
-            return ((CubeInstance)historyRealizationInstance).getDescriptor().getModelName();
+        if (getHistoryRealizationInstance() instanceof CubeInstance) {
+            return ((CubeInstance) historyRealizationInstance).getDescriptor().getModelName();
         }
 
-        return ((IIInstance)historyRealizationInstance).getDescriptor().getModelName();
+        return ((IIInstance) historyRealizationInstance).getDescriptor().getModelName();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06e4b203/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
index daf3057..516bc58 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
@@ -19,12 +19,19 @@ public class HybridTupleIterator implements ITupleIterator {
 
     @Override
     public boolean hasNext() {
-        return iterators[currentIndex].hasNext() || (currentIndex + 1 < iterators.length && iterators[currentIndex + 1].hasNext());
+        if (iterators[currentIndex].hasNext())
+            return true;
+
+        while (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+            currentIndex++;
+        }
+
+        return iterators[currentIndex].hasNext();
     }
 
     @Override
     public ITuple next() {
-        if (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+        while (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
             currentIndex++;
         }
 
@@ -33,6 +40,8 @@ public class HybridTupleIterator implements ITupleIterator {
 
     @Override
     public void close() {
-
+        for (ITupleIterator i : iterators) {
+            i.close();
+        }
     }
 }


[06/26] incubator-kylin git commit: Merge branch 'inverted-index' into streaming

Posted by li...@apache.org.
Merge branch 'inverted-index' into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bd0b9f70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bd0b9f70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bd0b9f70

Branch: refs/heads/streaming
Commit: bd0b9f700f82f29c52d8385d5f8158aea2a5636d
Parents: aa49da3 07a02fa
Author: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Authored: Fri Feb 27 10:00:24 2015 +0000
Committer: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Committed: Fri Feb 27 10:00:24 2015 +0000

----------------------------------------------------------------------
 .../kylin/rest/controller/CubeController.java   | 38 +++++++++++++++-----
 webapp/app/js/controllers/cubeEdit.js           | 12 +++++++
 webapp/app/js/controllers/cubeSchema.js         | 30 ++++++++++++++++
 webapp/app/js/controllers/cubes.js              |  5 ++-
 webapp/app/js/filters/filter.js                 |  4 +--
 webapp/app/partials/cubeDesigner/measures.html  | 15 +++-----
 6 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[02/26] incubator-kylin git commit: KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization

Posted by li...@apache.org.
KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1c0719e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1c0719e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1c0719e2

Branch: refs/heads/streaming
Commit: 1c0719e2310208e8cce2438f93ac6b1b595705c9
Parents: a536576
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Fri Feb 27 14:24:59 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Fri Feb 27 14:24:59 2015 +0800

----------------------------------------------------------------------
 .../kylin/storage/hybrid/HybridInstance.java    | 22 ++++++
 .../storage/hybrid/HybridStorageEngine.java     | 79 +++++++++++++++++++-
 .../storage/hybrid/HybridTupleIterator.java     | 38 ++++++++++
 3 files changed, 135 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index b550fc0..ebba9f5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -5,6 +5,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -43,6 +45,18 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
         historyRealizationInstance = registry.getRealization(historyRealization.getType(), historyRealization.getRealization());
         realTimeRealizationInstance = registry.getRealization(realTimeRealization.getType(), realTimeRealization.getRealization());
 
+        if (historyRealizationInstance == null) {
+            throw new IllegalArgumentException("Didn't find realization '" + historyRealization.getType() + "'" + " with name '" + historyRealization.getRealization() + "' in '" + name + "'");
+        }
+
+        if (realTimeRealizationInstance == null) {
+            throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
+        }
+
+
+        if (realTimeRealizationInstance.getDateRangeEnd() < historyRealizationInstance.getDateRangeEnd()) {
+            throw new IllegalStateException("The real time realization's dateRangeEnd should be greater than history realization's dateRangeEnd.");
+        }
     }
 
     @Override
@@ -140,4 +154,12 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
     public long getDateRangeEnd() {
         return realTimeRealizationInstance.getDateRangeEnd();
     }
+
+    public String getModelName() {
+        if(historyRealizationInstance instanceof CubeInstance) {
+            return ((CubeInstance)historyRealizationInstance).getDescriptor().getModelName();
+        }
+
+        return ((IIInstance)historyRealizationInstance).getDescriptor().getModelName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 566a4f5..8a50b7e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -1,13 +1,24 @@
 package org.apache.kylin.storage.hybrid;
 
-import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageEngine;
 import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+
 /**
  * Created by shaoshi on 2/13/15.
  */
@@ -27,10 +38,70 @@ public class HybridStorageEngine implements IStorageEngine {
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
 
-        long conditionBoundry = hybridInstance.getHistoryRealizationInstance().getDateRangeEnd();
+        long boundary = hybridInstance.getHistoryRealizationInstance().getDateRangeEnd();
+        FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd");
+        String boundaryDate = format.format(boundary);
+
+        Collection<TblColRef> filterCols = sqlDigest.filterColumns;
+
+        String modelName = hybridInstance.getModelName();
+
+        MetadataManager metaMgr = getMetadataManager();
+
+        DataModelDesc modelDesc = metaMgr.getDataModelDesc(modelName);
+
+        String partitionColFull = modelDesc.getPartitionDesc().getPartitionDateColumn();
+
+        String partitionTable = partitionColFull.substring(0, partitionColFull.lastIndexOf("."));
+        String partitionCol = partitionColFull.substring(partitionColFull.lastIndexOf(".") + 1);
+
+
+        TableDesc factTbl = metaMgr.getTableDesc(partitionTable);
+        ColumnDesc columnDesc = factTbl.findColumnByName(partitionCol);
+        TblColRef partitionColRef = new TblColRef(columnDesc);
+
+        // search the historic realization
+
+        ITupleIterator iterator1 = searchRealization(hybridInstance.getHistoryRealizationInstance(), context, sqlDigest);
+
+
+        // now search the realtime realization, need add the boundary condition
 
-        TupleFilter filter = sqlDigest.filter;
+        CompareTupleFilter compareTupleFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+
+        ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+        ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(boundaryDate);
+        compareTupleFilter.addChild(columnTupleFilter);
+        compareTupleFilter.addChild(constantTupleFilter);
+
+        LogicalTupleFilter logicalTupleFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+
+        logicalTupleFilter.addChild(sqlDigest.filter);
+        logicalTupleFilter.addChild(compareTupleFilter);
+
+        sqlDigest.filter = logicalTupleFilter;
+
+        if (!sqlDigest.filterColumns.contains(partitionColRef)) {
+            sqlDigest.filterColumns.add(partitionColRef);
+        }
+
+        if (!sqlDigest.allColumns.contains(partitionColRef)) {
+            sqlDigest.allColumns.add(partitionColRef);
+        }
+
+        ITupleIterator iterator2 = searchRealization(hybridInstance.getRealTimeRealizationInstance(), context, sqlDigest);
+
+
+        return new HybridTupleIterator(new ITupleIterator[]{iterator1, iterator2});
+    }
+
+    private ITupleIterator searchRealization(IRealization realization, StorageContext context, SQLDigest sqlDigest) {
+
+        IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(realization);
+        return storageEngine.search(context, sqlDigest);
+    }
 
-        return null;
+    private MetadataManager getMetadataManager() {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
new file mode 100644
index 0000000..daf3057
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
@@ -0,0 +1,38 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+
+/**
+ * Created by shaoshi on 2/27/15.
+ */
+public class HybridTupleIterator implements ITupleIterator {
+
+    private ITupleIterator[] iterators;
+
+    private int currentIndex;
+
+    public HybridTupleIterator(ITupleIterator[] iterators) {
+        this.iterators = iterators;
+        currentIndex = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterators[currentIndex].hasNext() || (currentIndex + 1 < iterators.length && iterators[currentIndex + 1].hasNext());
+    }
+
+    @Override
+    public ITuple next() {
+        if (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+            currentIndex++;
+        }
+
+        return iterators[currentIndex].next();
+    }
+
+    @Override
+    public void close() {
+
+    }
+}


[07/26] incubator-kylin git commit: filter partition column use only date type column

Posted by li...@apache.org.
filter partition column use only date type column


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8735d15e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8735d15e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8735d15e

Branch: refs/heads/streaming
Commit: 8735d15e76352f94078244b5d5e909ab697f2841
Parents: 0a5830e
Author: jiazhong <ji...@ebay.com>
Authored: Fri Feb 27 18:28:06 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Fri Feb 27 18:28:06 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cubeEdit.js           | 11 +++++--
 webapp/app/js/controllers/cubes.js              |  4 +--
 .../cubeDesigner/advanced_settings.html         | 34 +++++++++++---------
 .../app/partials/cubeDesigner/incremental.html  |  2 +-
 4 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index b632af1..21a666e 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -26,16 +26,23 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
     $scope.cubeMode = absUrl.indexOf("/cubes/add")!=-1?'addNewCube':absUrl.indexOf("/cubes/edit")!=-1?'editExistCube':'default';
 
 
-    $scope.getColumnsByTable = function (name) {
+    $scope.getColumnsByTable = function (tableName) {
         var temp = [];
         angular.forEach(TableModel.selectProjectTables, function (table) {
-            if (table.name == name) {
+            if (table.name == tableName) {
                 temp = table.columns;
             }
         });
         return temp;
     };
 
+    $scope.getPartitonColumns = function(tableName){
+        var columns = _.filter($scope.getColumnsByTable(tableName),function(column){
+            return column.datatype==="date";
+        });
+        return columns;
+    };
+
     $scope.getColumnType = function (_column,table){
         var columns = $scope.getColumnsByTable(table);
         var type;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/js/controllers/cubes.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index 927fda1..35266a4 100755
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -360,9 +360,9 @@ KylinApp
         }
     });
 
-var jobSubmitCtrl = function ($scope, $modalInstance, CubeService, MessageService, $location, cube,metaModel, buildType,SweetAlert,loadingRequest) {
+var jobSubmitCtrl = function ($scope, $modalInstance, CubeService, MessageService, $location, cube,MetaModel, buildType,SweetAlert,loadingRequest) {
     $scope.cube = cube;
-    $scope.metaModel = metaModel;
+    $scope.metaModel = MetaModel;
     $scope.jobBuildRequest = {
         buildType: buildType,
         startTime: 0,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/partials/cubeDesigner/advanced_settings.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
old mode 100644
new mode 100755
index 5c65886..b0d4f4f
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -95,9 +95,9 @@
             <tr>
                 <th>ID</th>
                 <th>Column</th>
-                <th>Length</th>
-                <th>Dictionary</th>
                 <th>Mandatory</th>
+                <th>Dictionary</th>
+                <th>Length</th>
                 <th ng-if="state.mode=='edit'"></th>
             </tr>
 
@@ -115,13 +115,17 @@
                     <span ng-if="state.mode=='view'">{{rowkey_column.column}}</span>
                 </td>
                 <td>
-                    <!--Column Length -->
-                    <input type="text" class="form-control" placeholder="Column Length.." ng-if="state.mode=='edit'"
-                           tooltip="rowkey column length.." tooltip-trigger="focus"
-                           ng-model="rowkey_column.length" class="form-control">
+                    <!-- Mandatory -->
+                    <button type="button " ng-if="state.mode=='edit'"
+                            class="btn btn-xs btn-default {{rowkey_column.mandatory? 'active':''}}"
+                            ng-model="rowkey_column.mandatory"
+                            btn-checkbox btn-checkbox-true="true" btn-checkbox-false="false">
+                        {{rowkey_column.mandatory? 'Y':'N'}}
+                    </button>
 
-                    <span ng-if="state.mode=='view'">{{rowkey_column.length}}</span>
+                    <span ng-if="state.mode=='view'">{{rowkey_column.mandatory? 'Y':'N'}}</span>
                 </td>
+
                 <td>
                     <!--Use Dictionary-->
                     <div>
@@ -133,17 +137,17 @@
                         <span ng-if="state.mode=='view'">{{rowkey_column.dictionary}}</span>
                     </div>
                 </td>
+
                 <td>
-                    <!-- Mandatory -->
-                    <button type="button " ng-if="state.mode=='edit'"
-                            class="btn btn-xs btn-default {{rowkey_column.mandatory? 'active':''}}"
-                            ng-model="rowkey_column.mandatory"
-                            btn-checkbox btn-checkbox-true="true" btn-checkbox-false="false">
-                        {{rowkey_column.mandatory? 'Y':'N'}}
-                    </button>
+                    <!--Column Length -->
+                    <input type="text" class="form-control" placeholder="Column Length.." ng-if="state.mode=='edit'"
+                           tooltip="rowkey column length.." tooltip-trigger="focus"
+                           ng-model="rowkey_column.length" class="form-control">
 
-                    <span ng-if="state.mode=='view'">{{rowkey_column.mandatory? 'Y':'N'}}</span>
+                    <span ng-if="state.mode=='view'">{{rowkey_column.length}}</span>
                 </td>
+
+
                 <td ng-if="state.mode=='edit'">
                     <button class="btn btn-xs btn-info"
                             ng-click="removeElement(cubeMetaFrame.rowkey.rowkey_columns, rowkey_column)"><i

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/partials/cubeDesigner/incremental.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/incremental.html b/webapp/app/partials/cubeDesigner/incremental.html
index 496cc36..365ef43 100755
--- a/webapp/app/partials/cubeDesigner/incremental.html
+++ b/webapp/app/partials/cubeDesigner/incremental.html
@@ -46,7 +46,7 @@
                             ng-required="metaModel.model.partition_desc.partition_date_start"
                             ng-model="metaModel.model.partition_desc.partition_date_column"
                             ng-if="state.mode=='edit'"
-                            ng-options="metaModel.model.fact_table+'.'+columns.name as metaModel.model.fact_table+'.'+columns.name for columns in getColumnsByTable(metaModel.model.fact_table)" >
+                            ng-options="metaModel.model.fact_table+'.'+columns.name as metaModel.model.fact_table+'.'+columns.name for columns in getPartitonColumns(metaModel.model.fact_table)" >
                         <option value=""></option>
                     </select>
                     <span ng-if="state.mode=='view'">


[12/26] incubator-kylin git commit: enable visualization in cubeWizard

Posted by li...@apache.org.
enable visualization in cubeWizard


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4b65f02b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4b65f02b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4b65f02b

Branch: refs/heads/streaming
Commit: 4b65f02bf0d6a7abc7829da5a376724d2ad63d9f
Parents: 504f6bf
Author: jiazhong <ji...@ebay.com>
Authored: Sat Feb 28 15:39:30 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Sat Feb 28 15:39:30 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/services/tree.js | 106 +++++++++++++++++++-----------------
 1 file changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b65f02b/webapp/app/js/services/tree.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tree.js b/webapp/app/js/services/tree.js
index 0eaf5e7..0a565af 100755
--- a/webapp/app/js/services/tree.js
+++ b/webapp/app/js/services/tree.js
@@ -69,62 +69,68 @@ KylinApp.service('CubeGraphService', function () {
             };
           }
 
-          //if (dimension.join && dimension.join.primary_key)
-          //{
-          //    angular.forEach(dimension.join.primary_key, function(pk, index){
-          //        for (var i = 0; i < dimensionNode._children.length; i++) {
-          //            if(dimensionNode._children[i].name == pk)
-          //                break;
-          //        }
-          //        if(i == dimensionNode._children.length) {
-          //            dimensionNode._children.push({
-          //                "type": "column",
-          //                "name": pk
-          //            });
-          //        }
-          //
-          //    });
-          //}
-          //
-          //if (dimension.derived)
-          //{
-          //    angular.forEach(dimension.derived, function(derived, index){
-          //        for (var i = 0; i < dimensionNode._children.length; i++) {
-          //            if(dimensionNode._children[i].name == derived)
-          //                break;
-          //        }
-          //        if(i == dimensionNode._children.length) {
-          //            dimensionNode._children.push({
-          //                "type": "column",
-          //                "name": derived + "(DERIVED)"
-          //            });
-          //        }
-          //    });
-          //}
-          //
-          //if (dimension.hierarchy)
-          //{
-          //    angular.forEach(dimension.hierarchy, function(hierarchy, index){
-          //        for (var i = 0; i < dimensionNode._children.length; i++) {
-          //            if(dimensionNode._children[i].name == hierarchy)
-          //                break;
-          //        }
-          //        if(i == dimensionNode._children.length) {
-          //            dimensionNode._children.push({
-          //                "type": "column",
-          //                "name": hierarchy.column + "(HIERARCHY)"
-          //            });
-          //        }
-          //    });
-          //}
-
           if(j == graphData.children.length) {
             graphData.children.push(dimensionNode);
           }
 
         }
       });
-      cube.graph.columnsCount = 0;
+
+      angular.forEach(cube.detail.dimensions, function (dimension, index) {
+        // for dimension on lookup table
+        if(cube.model.fact_table!==dimension.table){
+            var lookup = _.find(graphData.children,function(item){
+              return item.name === dimension.table;
+            });
+
+          angular.forEach(lookup.join.primary_key, function(pk, index){
+                  for (var i = 0; i < lookup._children.length; i++) {
+                      if(lookup._children[i].name == pk)
+                          break;
+                  }
+                  if(i == lookup._children.length) {
+                    lookup._children.push({
+                          "type": "column",
+                          "name": pk
+                      });
+                  }
+          });
+
+          if (dimension.derived&&dimension.derived.length)
+          {
+              angular.forEach(dimension.derived, function(derived, index){
+                  for (var i = 0; i < lookup._children.length; i++) {
+                      if(lookup._children[i].name == derived)
+                          break;
+                  }
+                  if(i == lookup._children.length) {
+                    lookup._children.push({
+                          "type": "column",
+                          "name": derived + "(DERIVED)"
+                      });
+                  }
+              });
+          }
+
+          if (dimension.hierarchy)
+          {
+              angular.forEach(dimension.column, function(hierarchy, index){
+                  for (var i = 0; i < lookup._children.length; i++) {
+                      if(lookup._children[i].name == hierarchy)
+                          break;
+                  }
+                  if(i == lookup._children.length) {
+                    lookup._children.push({
+                          "type": "column",
+                          "name": hierarchy + "(HIERARCHY)"
+                      });
+                  }
+              });
+          }
+
+        };
+      });
+        cube.graph.columnsCount = 0;
         cube.graph.tree = tree;
         cube.graph.root = graphData;
         cube.graph.svg = svg;


[03/26] incubator-kylin git commit: Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming

Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7f1abe08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7f1abe08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7f1abe08

Branch: refs/heads/streaming
Commit: 7f1abe081d8152fcefc8a6a7030ac26b31ebcc4b
Parents: 1c0719e aa49da3
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Fri Feb 27 14:25:40 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Fri Feb 27 14:25:40 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  10 +-
 .../org/apache/kylin/common/util/ClassUtil.java |  53 ++++++++
 .../apache/kylin/common/util/ClasspathUtil.java |  45 -------
 .../common/persistence/ResourceToolTest.java    |   4 +-
 .../common/util/AbstractKylinTestCase.java      |   6 +-
 .../kylin/common/util/BasicHadoopTest.java      |   2 +-
 .../common/util/HBaseMetadataTestCase.java      |   2 +-
 .../common/util/LocalFileMetadataTestCase.java  |   4 +-
 .../kylin/dict/DictionaryInfoSerializer.java    |   3 +-
 .../org/apache/kylin/dict/TrieDictionary.java   |   3 +-
 .../main/java/org/apache/kylin/dict/Util.java   |  44 -------
 .../apache/kylin/job/CubeMetadataUpgrade.java   |   2 +-
 .../kylin/job/common/HadoopShellExecutable.java |   5 +-
 .../kylin/job/common/MapReduceExecutable.java   |  35 +++---
 .../kylin/job/engine/JobEngineConfig.java       |   2 +-
 .../kylin/job/hadoop/AbstractHadoopJob.java     |   2 +-
 .../kylin/job/manager/ExecutableManager.java    |  28 +++--
 .../kylin/job/BuildCubeWithEngineTest.java      |   6 +-
 .../apache/kylin/job/BuildIIWithEngineTest.java |   4 +-
 .../org/apache/kylin/job/ExportHBaseData.java   |   2 +-
 .../org/apache/kylin/job/ImportHBaseData.java   |   2 +-
 .../apache/kylin/job/SampleCubeSetupTest.java   |   4 +-
 .../kylin/job/tools/CubeMigrationTests.java     |   4 +-
 .../java/org/apache/kylin/rest/DebugTomcat.java |   2 +-
 webapp/app/js/controllers/cubeEdit.js           |  17 +--
 webapp/app/js/controllers/cubeSchema.js         |   2 +-
 webapp/app/js/controllers/cubes.js              | 122 ++++++++++---------
 webapp/app/js/model/cubeListModel.js            |   0
 webapp/app/js/model/jobListModel.js             |   2 +-
 .../app/partials/cubeDesigner/incremental.html  |   2 +-
 webapp/app/partials/cubeDesigner/measures.html  |   2 +-
 webapp/grunt.json                               |   4 +-
 32 files changed, 201 insertions(+), 224 deletions(-)
----------------------------------------------------------------------



[26/26] incubator-kylin git commit: Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming

Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/82540950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/82540950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/82540950

Branch: refs/heads/streaming
Commit: 825409504f8b34c098d89392fa3070eb4641b0f7
Parents: 0970d76 29b2781
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 16:07:04 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 16:07:04 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |  17 +-
 .../job/deployment/HbaseConfigPrinterCLI.java   |  34 ++-
 .../kylin/job/tools/HbaseStreamingInput.java    | 225 +++++++++++++++++++
 3 files changed, 266 insertions(+), 10 deletions(-)
----------------------------------------------------------------------