You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:07:57 UTC
[01/26] incubator-kylin git commit: measure update rule fix
Repository: incubator-kylin
Updated Branches:
refs/heads/streaming aa49da35f -> 825409504
measure update rule fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/80a50655
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/80a50655
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/80a50655
Branch: refs/heads/streaming
Commit: 80a50655b4290a845278ec5e24d7a4a3145ff766
Parents: 2b01dcc
Author: jiazhong <ji...@ebay.com>
Authored: Fri Feb 27 13:54:15 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Fri Feb 27 13:54:15 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/cubeEdit.js | 9 +++++++++
webapp/app/js/controllers/cubeSchema.js | 16 ++++++++++++++++
webapp/app/partials/cubeDesigner/measures.html | 14 ++++++--------
3 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/80a50655/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index c2f0b5a..0e05077 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -36,6 +36,15 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
return temp;
};
+ $scope.getColumnType = function (_column,table){
+ var columns = $scope.getColumnsByTable(table);
+ angular.forEach(columns,function(column){
+ if(_column===column.name){
+ return column.type;
+ }
+ });
+ };
+
var ColFamily = function () {
var index = 1;
return function () {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/80a50655/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
old mode 100644
new mode 100755
index e03808c..4bb5762
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -107,6 +107,22 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
$scope.newMeasure = null;
};
+ // !count !count distinct
+ $scope.measureParamValueUpdate = function(){
+ if(newMeasure.function.expression!=="COUNT"&&newMeasure.function.expression!=="COUNT_DISTINCT"){
+
+ var column = $scope.newMeasure.function.parameter.value;
+
+
+ switch(newMeasure.function.expression){
+ case "SUM":
+ var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table);
+ $log.log(colType);
+ break;
+ }
+ }
+ }
+
$scope.addNewRowkeyColumn = function () {
$scope.cubeMetaFrame.rowkey.rowkey_columns.push({
"column": "",
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/80a50655/webapp/app/partials/cubeDesigner/measures.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/measures.html b/webapp/app/partials/cubeDesigner/measures.html
index ef7c12f..9a932bd 100755
--- a/webapp/app/partials/cubeDesigner/measures.html
+++ b/webapp/app/partials/cubeDesigner/measures.html
@@ -110,7 +110,7 @@
<label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Expression</b></label>
<div class="col-xs-12 col-sm-6">
<select class="form-control"
- ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
+ ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
ng-options="me as me for me in cubeConfig.measureExpressions">
<option value=""></option>
</select>
@@ -146,6 +146,7 @@
<select class="form-control" chosen
ng-if="newMeasure.function.parameter.type == 'column'"
ng-model="newMeasure.function.parameter.value"
+ ng-change="measureParamValueUpdate();"
ng-options="columns.name as columns.name for columns in getColumnsByTable(metaModel.model.fact_table)" >
<option value="">-- Select a Fact Table Column --</option>
</select>
@@ -158,13 +159,6 @@
<label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Return Type</b></label>
<div class="col-xs-12 col-sm-6">
<select class="form-control"
- ng-if="newMeasure.function.expression != 'COUNT_DISTINCT' && newMeasure.function.expression != 'COUNT' "
- ng-init="newMeasure.function.returntype = (!!newMeasure.function.returntype)?newMeasure.function.returntype:cubeConfig.dftSelections.measureDataType.value"
- chosen ng-model="newMeasure.function.returntype" required
- ng-options="mdt.value as mdt.name for mdt in cubeConfig.measureDataTypes">
- <option value=""></option>
- </select>
- <select class="form-control"
ng-if="newMeasure.function.expression == 'COUNT_DISTINCT'"
ng-init="newMeasure.function.returntype = (!!newMeasure.function.returntype)?newMeasure.function.returntype:cubeConfig.dftSelections.distinctDataType.value"
chosen ng-model="newMeasure.function.returntype" required
@@ -175,6 +169,10 @@
ng-if="newMeasure.function.expression == 'COUNT'"
ng-init="newMeasure.function.returntype= 'bigint' "><b> BIGINT</b>
</span>
+ <span class="font-color-default"
+ ng-if="newMeasure.function.expression != 'COUNT_DISTINCT' && newMeasure.function.expression != 'COUNT' "
+ ><b> {{newMeasure.function.returntype}}</b>
+ </span>
</div>
</div>
</div>
[19/26] incubator-kylin git commit: refactor
Posted by li...@apache.org.
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/58a6f73f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/58a6f73f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/58a6f73f
Branch: refs/heads/streaming
Commit: 58a6f73f530a5008ee565c863b0473fef5d34200
Parents: c038cb4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 18:10:48 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 18:10:48 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/streaming/kafka/Consumer.java | 14 +-
.../kylin/streaming/kafka/ConsumerConfig.java | 72 ---------
.../kylin/streaming/kafka/KafkaConfig.java | 151 +++++++++++++++++++
.../apache/kylin/streaming/kafka/Requester.java | 26 ++--
.../kylin/streaming/kafka/TopicConfig.java | 66 --------
.../kylin/streaming/kafka/KafkaBaseTest.java | 11 +-
.../kylin/streaming/kafka/KafkaConfigTest.java | 64 ++++++++
.../streaming/kafka/KafkaConsumerTest.java | 15 +-
.../kylin/streaming/kafka/RequesterTest.java | 26 +---
.../kylin/streaming/kafka/TestConstants.java | 48 ------
.../kylin/streaming/kafka/TestProducer.java | 25 ++-
.../kafka_streaming_test/kafka.properties | 10 ++
12 files changed, 286 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index c825d4b..074ef01 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -56,20 +56,20 @@ public class Consumer implements Runnable {
private String topic;
private int partitionId;
- private ConsumerConfig consumerConfig;
+ private KafkaConfig kafkaConfig;
private List<Broker> replicaBrokers;
private AtomicLong offset = new AtomicLong();
private BlockingQueue<Stream> streamQueue;
private Logger logger;
- public Consumer(String topic, int partitionId, List<Broker> initialBrokers, ConsumerConfig consumerConfig) {
+ public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
this.topic = topic;
this.partitionId = partitionId;
- this.consumerConfig = consumerConfig;
+ this.kafkaConfig = kafkaConfig;
this.replicaBrokers = initialBrokers;
logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
- streamQueue = new ArrayBlockingQueue<Stream>(consumerConfig.getMaxReadCount());
+ streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
}
public BlockingQueue<Stream> getStreamQueue() {
@@ -77,7 +77,7 @@ public class Consumer implements Runnable {
}
private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, consumerConfig);
+ final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
replicaBrokers = partitionMetadata.replicas();
return partitionMetadata.leader();
@@ -94,9 +94,9 @@ public class Consumer implements Runnable {
logger.warn("cannot find lead broker");
continue;
}
- final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+ final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
offset.set(lastOffset);
- final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
+ final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
if (fetchResponse.errorCode(topic, partitionId) != 0) {
logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
continue;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
deleted file mode 100644
index 3bdbd13..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class ConsumerConfig {
-
- private int timeout;
-
- private int maxReadCount;
-
- private int bufferSize;
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getMaxReadCount() {
- return maxReadCount;
- }
-
- public void setMaxReadCount(int maxReadCount) {
- this.maxReadCount = maxReadCount;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
new file mode 100644
index 0000000..82513eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+ private List<Broker> brokers;
+
+ private String zookeeper;
+
+ private String topic;
+
+ private int timeout;
+
+ private int maxReadCount;
+
+ private int bufferSize;
+
+ private int partitionId;
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMaxReadCount() {
+ return maxReadCount;
+ }
+
+ public void setMaxReadCount(int maxReadCount) {
+ this.maxReadCount = maxReadCount;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public List<Broker> getBrokers() {
+ return brokers;
+ }
+
+ public void setBrokers(List<Broker> brokers) {
+ this.brokers = brokers;
+ }
+
+ public String getZookeeper() {
+ return zookeeper;
+ }
+
+ public void setZookeeper(String zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public static KafkaConfig load(KafkaConfig config) {
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(config.getBufferSize());
+ result.setMaxReadCount(config.getMaxReadCount());
+ result.setTimeout(config.getTimeout());
+ result.setTopic(config.getTopic());
+ result.setZookeeper(config.getZookeeper());
+ result.setPartitionId(config.getPartitionId());
+ result.setBrokers(config.getBrokers());
+ return result;
+ }
+
+ public static KafkaConfig load(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
+ result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
+ result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
+ result.setTopic(properties.getProperty("topic"));
+ result.setZookeeper(properties.getProperty("zookeeper"));
+ result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
+
+ int id = 0;
+ List<Broker> brokers = Lists.newArrayList();
+ for (String str: properties.getProperty("brokers").split(",")) {
+ final String[] split = str.split(":");
+ final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
+ brokers.add(broker);
+ }
+ result.setBrokers(brokers);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index f4cdd8e..8811695 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -58,11 +58,11 @@ public final class Requester {
private static final Logger logger = LoggerFactory.getLogger(Requester.class);
- public static TopicMeta getKafkaTopicMeta(TopicConfig topicConfig, ConsumerConfig consumerConfig) {
+ public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
SimpleConsumer consumer;
- for (Broker broker : topicConfig.getBrokers()) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(topicConfig.getTopic());
+ for (Broker broker : kafkaConfig.getBrokers()) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
@@ -80,16 +80,16 @@ public final class Requester {
return partitionMetadata.partitionId();
}
});
- return new TopicMeta(topicConfig.getTopic(), partitionIds);
+ return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
}
- logger.debug("cannot find topic:" + topicConfig.getTopic());
+ logger.debug("cannot find topic:" + kafkaConfig.getTopic());
return null;
}
- public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, ConsumerConfig consumerConfig) {
+ public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
SimpleConsumer consumer;
for (Broker broker : brokers) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
@@ -113,20 +113,20 @@ public final class Requester {
return null;
}
- public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, ConsumerConfig consumerConfig) {
+ public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
final String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
kafka.api.FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
- .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
.build();
return consumer.fetch(req);
}
public static long getLastOffset(String topic, int partitionId,
- long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+ long whichTime, Broker broker, KafkaConfig kafkaConfig) {
String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
deleted file mode 100644
index 4aa9671..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-import java.util.List;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TopicConfig {
-
- private String topic;
-
- private List<Broker> brokers;
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public List<Broker> getBrokers() {
- return brokers;
- }
-
- public void setBrokers(List<Broker> brokers) {
- this.brokers = brokers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
index a1f9b87..537a15d 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
@@ -41,6 +41,7 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Properties;
/**
@@ -52,9 +53,15 @@ public abstract class KafkaBaseTest {
protected static ZkClient zkClient;
+ protected static KafkaConfig kafkaConfig;
+
@BeforeClass
- public static void beforeClass() {
- zkClient = new ZkClient(TestConstants.ZOOKEEPER);
+ public static void beforeClass() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ kafkaConfig = KafkaConfig.load(properties);
+
+ zkClient = new ZkClient(kafkaConfig.getZookeeper());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
new file mode 100644
index 0000000..3c9bd87
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+ @Test
+ public void test() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ KafkaConfig config = KafkaConfig.load(properties);
+ assertEquals(1000, config.getMaxReadCount());
+ assertEquals(65536, config.getBufferSize());
+ assertEquals(60000, config.getTimeout());
+ assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+ assertEquals("kafka_stream_test", config.getTopic());
+ assertEquals(0, config.getPartitionId());
+ assertEquals(1, config.getBrokers().size());
+ assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
index d89695d..4449d37 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
@@ -36,10 +36,12 @@ package org.apache.kylin.streaming.kafka;
import com.google.common.collect.Lists;
import kafka.cluster.Broker;
+import kafka.consumer.ConsumerConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -58,7 +60,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
private static final int TOTAL_SEND_COUNT = 100;
@Before
- public void before() {
+ public void before() throws IOException {
producer = new TestProducer(TOTAL_SEND_COUNT);
producer.start();
}
@@ -80,18 +82,11 @@ public class KafkaConsumerTest extends KafkaBaseTest {
@Test
public void test() throws InterruptedException {
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setTopic(TestConstants.TOPIC);
- topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
- ConsumerConfig consumerConfig = new ConsumerConfig();
- consumerConfig.setBufferSize(64 * 1024);
- consumerConfig.setMaxReadCount(1000);
- consumerConfig.setTimeout(60 * 1000);
- final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+ final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
List<BlockingQueue<Stream>> queues = Lists.newArrayList();
for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, Lists.asList(TestConstants.BROKER, new Broker[0]), consumerConfig);
+ Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
queues.add(consumer.getStreamQueue());
executorService.execute(consumer);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
index fd8cc63..694c1fd 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
@@ -35,7 +35,6 @@
package org.apache.kylin.streaming.kafka;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
@@ -47,21 +46,9 @@ import static org.junit.Assert.*;
*/
public class RequesterTest extends KafkaBaseTest {
- private static TopicConfig topicConfig;
- private static ConsumerConfig consumerConfig;
+ private static final String NON_EXISTED_TOPIC = "non_existent_topic";
- private static final String UNEXISTED_TOPIC = "unexist_topic";
- @BeforeClass
- public static void beforeClass() {
- topicConfig = new TopicConfig();
- topicConfig.setTopic(TestConstants.TOPIC);
- topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
- consumerConfig = new ConsumerConfig();
- consumerConfig.setBufferSize(64 * 1024);
- consumerConfig.setMaxReadCount(1000);
- consumerConfig.setTimeout(60 * 1000);
- }
@AfterClass
public static void afterClass() {
@@ -69,16 +56,15 @@ public class RequesterTest extends KafkaBaseTest {
@Test
public void testTopicMeta() throws Exception {
- TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+ TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
assertNotNull(kafkaTopicMeta);
assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
- assertEquals(topicConfig.getTopic(), kafkaTopicMeta.getName());
+ assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
- TopicConfig anotherTopicConfig = new TopicConfig();
- anotherTopicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
- anotherTopicConfig.setTopic(UNEXISTED_TOPIC);
+ KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+ anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
- kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig, consumerConfig);
+ kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
assertTrue(kafkaTopicMeta == null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
deleted file mode 100644
index 85a6ba3..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestConstants {
-
- public static final String TOPIC = "kafka_stream_test";
- public static final String ZOOKEEPER = "sandbox.hortonworks.com:2181";
- public static final Broker BROKER = new Broker(0, "sandbox.hortonworks.com", 6667);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
index 54ad583..cd3b166 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
@@ -34,12 +34,19 @@
package org.apache.kylin.streaming.kafka;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Properties;
/**
@@ -49,7 +56,7 @@ public class TestProducer {
private volatile boolean stopped = false;
- private static final Logger logger = LoggerFactory.getLogger(TestConstants.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
private final int sendCount;
@@ -57,9 +64,19 @@ public class TestProducer {
this.sendCount = sendCount;
}
- public void start() {
+ public void start() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
Properties props = new Properties();
- props.put("metadata.broker.list", TestConstants.BROKER.getConnectionString());
+ props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Broker broker) {
+ return broker.getConnectionString();
+ }
+ }), ","));
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
@@ -70,7 +87,7 @@ public class TestProducer {
public void run() {
int count = 0;
while (!stopped && count < sendCount) {
- final KeyedMessage<String, String> message = new KeyedMessage<>(TestConstants.TOPIC, "current time is:" + System.currentTimeMillis());
+ final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
producer.send(message);
count++;
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/resources/kafka_streaming_test/kafka.properties
----------------------------------------------------------------------
diff --git a/streaming/src/test/resources/kafka_streaming_test/kafka.properties b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
new file mode 100644
index 0000000..ae762e3
--- /dev/null
+++ b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
@@ -0,0 +1,10 @@
+zookeeper=sandbox.hortonworks.com:2181
+
+brokers=sandbox.hortonworks.com:6667
+
+consumer.timeout=60000
+consumer.bufferSize=65536
+consumer.maxReadCount=1000
+
+topic=kafka_stream_test
+partitionId=0
[18/26] incubator-kylin git commit: Merge branch 'inverted-index'
into streaming
Posted by li...@apache.org.
Merge branch 'inverted-index' into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f42ef837
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f42ef837
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f42ef837
Branch: refs/heads/streaming
Commit: f42ef837bf0f01f4b88672654b698a4856bb6f7d
Parents: c038cb4 8e011db
Author: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Authored: Mon Mar 2 06:04:03 2015 +0000
Committer: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Committed: Mon Mar 2 06:04:03 2015 +0000
----------------------------------------------------------------------
.../apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[11/26] incubator-kylin git commit: update visulization code in
cubedesigner to show model partial done
Posted by li...@apache.org.
update visulization code in cubedesigner to show model partial done
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/504f6bf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/504f6bf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/504f6bf3
Branch: refs/heads/streaming
Commit: 504f6bf351a5f75cde4eefb70d1040e9056425da
Parents: 10b5029
Author: jiazhong <ji...@ebay.com>
Authored: Sat Feb 28 15:01:39 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Sat Feb 28 15:01:39 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/cube.js | 0
webapp/app/js/controllers/cubeSchema.js | 8 +-
webapp/app/js/services/tree.js | 159 ++++++++++++------------
webapp/app/partials/cubes/cube_detail.html | 2 +-
webapp/bower.json | 4 +-
5 files changed, 90 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/js/controllers/cube.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cube.js b/webapp/app/js/controllers/cube.js
old mode 100644
new mode 100755
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index 7bad2d8..17f7b71 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -18,7 +18,7 @@
'use strict';
-KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel) {
+KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList) {
$scope.projects = [];
$scope.newDimension = null;
@@ -65,6 +65,12 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
$scope.metaModel.model = model;
+ // add model ref for cube
+ angular.forEach(CubeList.cubes,function(cube){
+ if(cube.name===$scope.cubeMetaFrame.name||cube.descriptor===$scope.cubeMetaFrame.name){
+ cube.model = model;
+ }
+ });
//convert GMT mills ,to make sure partition date show GMT Date
//should run only one time
if($scope.metaModel.model.partition_desc&&$scope.metaModel.model.partition_desc.partition_date_start)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/js/services/tree.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tree.js b/webapp/app/js/services/tree.js
old mode 100644
new mode 100755
index bee3ac6..0eaf5e7
--- a/webapp/app/js/services/tree.js
+++ b/webapp/app/js/services/tree.js
@@ -38,92 +38,93 @@ KylinApp.service('CubeGraphService', function () {
var graphData = {
"type": "fact",
- "name": cube.detail.fact_table,
+ "name": cube.model.fact_table,
"children": []
};
- angular.forEach(cube.detail.dimensions, function (dimension, index) {
- if (dimension.join && dimension.join.primary_key.length > 0) {
-
- var dimensionNode;
-
- /* Loop through the graphData.children array to find out: If the LKP table is already existed */
- for(var j = 0; j < graphData.children.length; j++ ) {
- if(graphData.children[j].name == dimension.table){
- dimensionNode = graphData.children[j];
- break;
- }
- }
-
- /* If not existed, create dimensionNode and push it */
- if(j == graphData.children.length) {
- dimensionNode = {
- "type": "dimension",
- "name": dimension.table,
- "join": dimension.join,
- "children": [],
- "_children": []
- };
- }
-
- if (dimension.join && dimension.join.primary_key)
- {
- angular.forEach(dimension.join.primary_key, function(pk, index){
- for (var i = 0; i < dimensionNode._children.length; i++) {
- if(dimensionNode._children[i].name == pk)
- break;
- }
- if(i == dimensionNode._children.length) {
- dimensionNode._children.push({
- "type": "column",
- "name": pk
- });
- }
-
- });
- }
-
- if (dimension.derived)
- {
- angular.forEach(dimension.derived, function(derived, index){
- for (var i = 0; i < dimensionNode._children.length; i++) {
- if(dimensionNode._children[i].name == derived)
- break;
- }
- if(i == dimensionNode._children.length) {
- dimensionNode._children.push({
- "type": "column",
- "name": derived + "(DERIVED)"
- });
- }
- });
- }
+ cube.graph = (!!cube.graph) ? cube.graph : {};
- if (dimension.hierarchy)
- {
- angular.forEach(dimension.hierarchy, function(hierarchy, index){
- for (var i = 0; i < dimensionNode._children.length; i++) {
- if(dimensionNode._children[i].name == hierarchy)
- break;
- }
- if(i == dimensionNode._children.length) {
- dimensionNode._children.push({
- "type": "column",
- "name": hierarchy.column + "(HIERARCHY)"
- });
- }
- });
- }
+ //angular.forEach(cube.detail.dimensions, function (dimension, index) {
+ angular.forEach(cube.model.lookups, function (lookup, index) {
+ if (lookup.join && lookup.join.primary_key.length > 0) {
- if(j == graphData.children.length) {
- graphData.children.push(dimensionNode);
- }
+ var dimensionNode;
+ /* Loop through the graphData.children array to find out: If the LKP table is already existed */
+ for(var j = 0; j < graphData.children.length; j++ ) {
+ if(graphData.children[j].name == lookup.table){
+ dimensionNode = graphData.children[j];
+ break;
}
- });
-
- cube.graph = (!!cube.graph) ? cube.graph : {};
- cube.graph.columnsCount = 0;
+ }
+
+ /* If not existed, create dimensionNode and push it */
+ if(j == graphData.children.length) {
+ dimensionNode = {
+ "type": "dimension",
+ "name": lookup.table,
+ "join": lookup.join,
+ "children": [],
+ "_children": []
+ };
+ }
+
+ //if (dimension.join && dimension.join.primary_key)
+ //{
+ // angular.forEach(dimension.join.primary_key, function(pk, index){
+ // for (var i = 0; i < dimensionNode._children.length; i++) {
+ // if(dimensionNode._children[i].name == pk)
+ // break;
+ // }
+ // if(i == dimensionNode._children.length) {
+ // dimensionNode._children.push({
+ // "type": "column",
+ // "name": pk
+ // });
+ // }
+ //
+ // });
+ //}
+ //
+ //if (dimension.derived)
+ //{
+ // angular.forEach(dimension.derived, function(derived, index){
+ // for (var i = 0; i < dimensionNode._children.length; i++) {
+ // if(dimensionNode._children[i].name == derived)
+ // break;
+ // }
+ // if(i == dimensionNode._children.length) {
+ // dimensionNode._children.push({
+ // "type": "column",
+ // "name": derived + "(DERIVED)"
+ // });
+ // }
+ // });
+ //}
+ //
+ //if (dimension.hierarchy)
+ //{
+ // angular.forEach(dimension.hierarchy, function(hierarchy, index){
+ // for (var i = 0; i < dimensionNode._children.length; i++) {
+ // if(dimensionNode._children[i].name == hierarchy)
+ // break;
+ // }
+ // if(i == dimensionNode._children.length) {
+ // dimensionNode._children.push({
+ // "type": "column",
+ // "name": hierarchy.column + "(HIERARCHY)"
+ // });
+ // }
+ // });
+ //}
+
+ if(j == graphData.children.length) {
+ graphData.children.push(dimensionNode);
+ }
+
+ }
+ });
+ cube.graph.columnsCount = 0;
cube.graph.tree = tree;
cube.graph.root = graphData;
cube.graph.svg = svg;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/app/partials/cubes/cube_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubes/cube_detail.html b/webapp/app/partials/cubes/cube_detail.html
old mode 100644
new mode 100755
index 39a2860..6668b8d
--- a/webapp/app/partials/cubes/cube_detail.html
+++ b/webapp/app/partials/cubes/cube_detail.html
@@ -110,4 +110,4 @@
</div>
</div>
</div>
-</div>
\ No newline at end of file
+</div>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504f6bf3/webapp/bower.json
----------------------------------------------------------------------
diff --git a/webapp/bower.json b/webapp/bower.json
old mode 100644
new mode 100755
index 31ea559..be8eca3
--- a/webapp/bower.json
+++ b/webapp/bower.json
@@ -26,8 +26,8 @@
"angular-ui-sortable": "0.13.1",
"underscore": "~1.7.0",
"fuelux": "~3.5.1",
- "angular-animate": "1.2",
- "angular-cookies":"1.2"
+ "angular-animate": "1.2",
+ "angular-cookies":"1.2"
},
"devDependencies": {
[22/26] incubator-kylin git commit: revert joda dependency,
and revise hbasestreaminginput
Posted by li...@apache.org.
revert joda dependency, and revise hbasestreaminginput
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1d6505b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1d6505b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1d6505b3
Branch: refs/heads/streaming
Commit: 1d6505b3489dfdf8bd925dcf3ecfde9d8db286f6
Parents: f980b9f
Author: honma <ho...@ebay.com>
Authored: Mon Mar 2 13:34:42 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Mar 3 15:39:44 2015 +0800
----------------------------------------------------------------------
common/pom.xml | 11 ++----
.../org/apache/kylin/common/util/BasicTest.java | 35 --------------------
job/pom.xml | 5 ---
.../kylin/job/tools/HbaseStreamingInput.java | 6 +++-
pom.xml | 6 ----
5 files changed, 8 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 41d2291..97fb50a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -17,8 +17,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kylin-common</artifactId>
@@ -64,10 +63,6 @@
<artifactId>compress-lzf</artifactId>
</dependency>
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
<!-- Env & Test -->
<dependency>
<groupId>org.apache.commons</groupId>
@@ -132,11 +127,11 @@
<artifactId>commons-compress</artifactId>
<version>1.2</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive-hcatalog.version}</version>
<scope>provided</scope>
- </dependency>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 6358177..4a6edc8 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -26,10 +26,8 @@ import java.util.Calendar;
import java.util.concurrent.Semaphore;
import org.apache.commons.configuration.ConfigurationException;
-import org.joda.time.DateTime;
import org.junit.Ignore;
import org.junit.Test;
-import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
import org.slf4j.LoggerFactory;
/**
@@ -72,39 +70,6 @@ public class BasicTest {
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
- new Thread(new Runnable() {
- @Override
- public void run() {
-
- semaphore.release();
- semaphore.release();
- semaphore.release();
- try {
- System.out.println("sleeping");
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- semaphore.release();
- }
- }).start();
-
- try {
- try {
- System.out.println("sleeping");
- Thread.sleep(5000);
- int x = semaphore.drainPermits();
- System.out.println("drained " +x);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- semaphore.acquire();
- System.out.println("left " + semaphore.availablePermits());
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
}
private static String time(long t) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 128e03c..c8fbc73 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -136,11 +136,6 @@
hbase utils like Bytes, ImmutableBytesWritable etc. -->
</dependency>
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
-
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
index b1c7e30..ff313b4 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -65,6 +65,9 @@ public class HbaseStreamingInput {
}
public static void addData(String tableName) throws IOException {
+
+ createTable(tableName);
+
final Semaphore semaphore = new Semaphore(0);
new Thread(new Runnable() {
@Override
@@ -156,8 +159,9 @@ public class HbaseStreamingInput {
for (Result result : scanner) {
Cell cell = result.getColumnLatestCell(CF, QN);
byte[] value = cell.getValueArray();
- if (cell.getValueLength() != CELL_SIZE)
+ if (cell.getValueLength() != CELL_SIZE) {
logger.error("value size invalid!!!!!");
+ }
hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
rowCount++;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d6505b3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d86bcb2..74743db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,6 @@
<compress-lzf.version>1.0.3</compress-lzf.version>
<extendedset.version>1.3.4</extendedset.version>
<jetty.version>9.2.7.v20150116</jetty.version>
- <joda.version>2.7</joda.version>
<!-- REST Service -->
<spring.framework.version>3.1.2.RELEASE</spring.framework.version>
@@ -400,11 +399,6 @@
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- <version>${joda.version}</version>
- </dependency>
</dependencies>
</dependencyManagement>
[14/26] incubator-kylin git commit: Merge pull request #432 from
janzhongi/inverted-index
Posted by li...@apache.org.
Merge pull request #432 from janzhongi/inverted-index
refacotr source_table_tree & enable visulization in cubeWizard
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/82c9db24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/82c9db24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/82c9db24
Branch: refs/heads/streaming
Commit: 82c9db24bca8451a37a47cce81924df4e984d345
Parents: 10b5029 d804e60
Author: Zhong,Jian <ji...@ebay.com>
Authored: Sat Feb 28 17:42:16 2015 +0800
Committer: Zhong,Jian <ji...@ebay.com>
Committed: Sat Feb 28 17:42:16 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/cube.js | 0
webapp/app/js/controllers/cubeSchema.js | 8 +-
webapp/app/js/controllers/sourceMeta.js | 37 ++---
webapp/app/js/model/tableModel.js | 92 ++++++-----
webapp/app/js/services/tables.js | 0
webapp/app/js/services/tree.js | 161 ++++++++++---------
webapp/app/partials/cubes/cube_detail.html | 12 +-
webapp/app/partials/tables/source_metadata.html | 44 ++---
.../app/partials/tables/source_table_tree.html | 6 +-
webapp/bower.json | 4 +-
10 files changed, 191 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
[23/26] incubator-kylin git commit: testing streaming on hbase
Posted by li...@apache.org.
testing streaming on hbase
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f980b9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f980b9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f980b9f6
Branch: refs/heads/streaming
Commit: f980b9f67d0804964cd67685a8d5cae864209b30
Parents: f42ef83
Author: honma <ho...@ebay.com>
Authored: Mon Mar 2 09:51:04 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Mar 3 15:39:44 2015 +0800
----------------------------------------------------------------------
common/pom.xml | 11 +-
.../org/apache/kylin/common/util/BasicTest.java | 52 ++++-
job/pom.xml | 5 +
.../job/deployment/HbaseConfigPrinterCLI.java | 34 ++-
.../kylin/job/tools/HbaseStreamingInput.java | 220 +++++++++++++++++++
pom.xml | 6 +
6 files changed, 316 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 97fb50a..41d2291 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -17,7 +17,8 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kylin-common</artifactId>
@@ -63,6 +64,10 @@
<artifactId>compress-lzf</artifactId>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
<!-- Env & Test -->
<dependency>
<groupId>org.apache.commons</groupId>
@@ -127,11 +132,11 @@
<artifactId>commons-compress</artifactId>
<version>1.2</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive-hcatalog.version}</version>
<scope>provided</scope>
- </dependency>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1fe9dfd..6358177 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -18,13 +18,18 @@
package org.apache.kylin.common.util;
-import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.concurrent.Semaphore;
import org.apache.commons.configuration.ConfigurationException;
+import org.joda.time.DateTime;
import org.junit.Ignore;
import org.junit.Test;
+import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
import org.slf4j.LoggerFactory;
/**
@@ -61,10 +66,51 @@ public class BasicTest {
public void test1() throws Exception {
}
+ final private Semaphore semaphore = new Semaphore(0);
+
@Test
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
- int m = 1 << 15;
- System.out.println(m);
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+ semaphore.release();
+ semaphore.release();
+ semaphore.release();
+ try {
+ System.out.println("sleeping");
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ semaphore.release();
+ }
+ }).start();
+
+ try {
+ try {
+ System.out.println("sleeping");
+ Thread.sleep(5000);
+ int x = semaphore.drainPermits();
+ System.out.println("drained " +x);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ semaphore.acquire();
+ System.out.println("left " + semaphore.availablePermits());
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private static String time(long t) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(t);
+ return dateFormat.format(cal.getTime());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index c8fbc73..128e03c 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -136,6 +136,11 @@
hbase utils like Bytes, ImmutableBytesWritable etc. -->
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
index 7a73790..15ba2c4 100644
--- a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
@@ -40,12 +40,18 @@ import org.apache.kylin.job.tools.LZOSupportnessChecker;
*/
public class HbaseConfigPrinterCLI {
public static void main(String[] args) throws IOException {
- if (args.length != 1) {
- System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-0.5.7-SNAPSHOT-job.jar org.apache.kylin.job.deployment.HadoopConfigPrinter targetFile");
- System.exit(1);
- }
- printConfigs(args[0]);
+ if (args[0].equalsIgnoreCase("printconfig"))
+ printConfigs(args[1]);
+
+ if (args[0].equalsIgnoreCase("printenv"))
+ printAllEnv();
+
+ if (args[0].equalsIgnoreCase("printprop"))
+ printAllProperties();
+
+ if (args[0].equalsIgnoreCase("printhbaseconf"))
+ printHbaseConf();
}
private static void printConfigs(String targetFile) throws IOException {
@@ -68,7 +74,23 @@ public class HbaseConfigPrinterCLI {
FileUtils.writeStringToFile(output, sb.toString());
}
- @SuppressWarnings("unused")
+ private static void printHbaseConf() {
+ Configuration conf = HBaseConfiguration.create();
+ for (Map.Entry<String, String> entry : conf) {
+ System.out.println("Key: " + entry.getKey());
+ System.out.println("Value: " + entry.getValue());
+ System.out.println();
+ }
+ }
+
+ private static void printAllProperties() {
+ for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
+ System.out.println("Key: " + entry.getKey());
+ System.out.println("Value: " + entry.getValue());
+ System.out.println();
+ }
+ }
+
private static void printAllEnv() {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
System.out.println("Key: " + entry.getKey());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
new file mode 100644
index 0000000..b1c7e30
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -0,0 +1,220 @@
+package org.apache.kylin.job.tools;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 2/13/15.
+ */
+public class HbaseStreamingInput {
+ private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
+
+ private static final int CELL_SIZE = 128 * 1024; // 128 KB
+ private static final byte[] CF = "F".getBytes();
+ private static final byte[] QN = "C".getBytes();
+
+ public static void createTable(String tableName) throws IOException {
+ HBaseAdmin hadmin = new HBaseAdmin(getConnection());
+
+ try {
+ boolean tableExist = hadmin.tableExists(tableName);
+ if (tableExist) {
+ logger.info("HTable '" + tableName + "' already exists");
+ return;
+ }
+
+ logger.info("Creating HTable '" + tableName + "'");
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+ desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
+
+ HColumnDescriptor fd = new HColumnDescriptor(CF);
+ fd.setBlocksize(CELL_SIZE);
+ desc.addFamily(fd);
+ hadmin.createTable(desc);
+
+ logger.info("HTable '" + tableName + "' created");
+ } finally {
+ hadmin.close();
+ }
+ }
+
+ private static void scheduleJob(Semaphore semaphore, int interval) {
+ while (true) {
+ semaphore.release();
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void addData(String tableName) throws IOException {
+ final Semaphore semaphore = new Semaphore(0);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ scheduleJob(semaphore, 300000);//5 minutes a batch
+ }
+ }).start();
+
+ while (true) {
+ try {
+ semaphore.acquire();
+ int waiting = semaphore.availablePermits();
+ if (waiting > 0) {
+ logger.warn("There are another " + waiting + " batches waiting to be added");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ HConnection conn = getConnection();
+ HTableInterface table = conn.getTable(tableName);
+
+ byte[] key = new byte[8 + 4];//time + id
+
+ logger.info("============================================");
+ long startTime = System.currentTimeMillis();
+ logger.info("data load start time in millis: " + startTime);
+ logger.info("data load start at " + formatTime(startTime));
+ List<Put> buffer = Lists.newArrayList();
+ for (int i = 0; i < (1 << 10); ++i) {
+ long time = System.currentTimeMillis();
+ Bytes.putLong(key, 0, time);
+ Bytes.putInt(key, 8, i);
+ Put put = new Put(key);
+ byte[] cell = randomBytes(CELL_SIZE);
+ put.add(CF, QN, cell);
+ buffer.add(put);
+ }
+ table.put(buffer);
+ table.close();
+ conn.close();
+ long endTime = System.currentTimeMillis();
+ logger.info("data load end at " + formatTime(endTime));
+ logger.info("data load time consumed: " + (endTime - startTime));
+ logger.info("============================================");
+ }
+ }
+
+ public static void randomScan(String tableName) throws IOException {
+
+ final Semaphore semaphore = new Semaphore(0);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ scheduleJob(semaphore, 60000);//1 minutes a batch
+ }
+ }).start();
+
+ while (true) {
+ try {
+ semaphore.acquire();
+ int waiting = semaphore.drainPermits();
+ if (waiting > 0) {
+ logger.warn("Too many queries to handle! Blocking " + waiting + " sets of scan requests");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ Random r = new Random();
+ HConnection conn = getConnection();
+ HTableInterface table = conn.getTable(tableName);
+
+ long leftBound = getFirstKeyTime(table);
+ long rightBound = System.currentTimeMillis();
+
+ for (int t = 0; t < 10; ++t) {
+ long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
+ long end = start + 600000;//a period of 10 minutes
+ logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
+
+ Scan scan = new Scan();
+ scan.setStartRow(Bytes.toBytes(start));
+ scan.setStopRow(Bytes.toBytes(end));
+ scan.addFamily(CF);
+ ResultScanner scanner = table.getScanner(scan);
+ long hash = 0;
+ int rowCount = 0;
+ for (Result result : scanner) {
+ Cell cell = result.getColumnLatestCell(CF, QN);
+ byte[] value = cell.getValueArray();
+ if (cell.getValueLength() != CELL_SIZE)
+ logger.error("value size invalid!!!!!");
+
+ hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
+ rowCount++;
+ }
+ scanner.close();
+ logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
+ }
+ table.close();
+ conn.close();
+ }
+ }
+
+ private static long getFirstKeyTime(HTableInterface table) throws IOException {
+ long startTime = 0;
+
+ Scan scan = new Scan();
+ scan.addFamily(CF);
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ Cell cell = result.getColumnLatestCell(CF, QN);
+ byte[] key = cell.getRowArray();
+ startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
+ logger.info("Retrieved first record time: " + formatTime(startTime));
+ break;//only get first one
+ }
+ scanner.close();
+ return startTime;
+
+ }
+
+ private static HConnection getConnection() throws IOException {
+ return HConnectionManager.createConnection(HBaseConfiguration.create());
+ }
+
+ private static String formatTime(long time) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(time);
+ return dateFormat.format(cal.getTime());
+ }
+
+ private static byte[] randomBytes(int lenth) {
+ byte[] bytes = new byte[lenth];
+ Random rand = new Random();
+ rand.nextBytes(bytes);
+ return bytes;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ if (args[0].equalsIgnoreCase("createtable")) {
+ createTable(args[1]);
+ } else if (args[0].equalsIgnoreCase("adddata")) {
+ addData(args[1]);
+ } else if (args[0].equalsIgnoreCase("randomscan")) {
+ randomScan(args[1]);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f980b9f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 74743db..d86bcb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
<compress-lzf.version>1.0.3</compress-lzf.version>
<extendedset.version>1.3.4</extendedset.version>
<jetty.version>9.2.7.v20150116</jetty.version>
+ <joda.version>2.7</joda.version>
<!-- REST Service -->
<spring.framework.version>3.1.2.RELEASE</spring.framework.version>
@@ -399,6 +400,11 @@
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
[15/26] incubator-kylin git commit: Merge remote-tracking branch
'origin/inverted-index' into streaming
Posted by li...@apache.org.
Merge remote-tracking branch 'origin/inverted-index' into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9c3a6067
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9c3a6067
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9c3a6067
Branch: refs/heads/streaming
Commit: 9c3a60673fa31b9c900a18850f292ff7401410da
Parents: 04d9b7d 82c9db2
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 10:00:24 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 10:00:24 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/cube.js | 0
webapp/app/js/controllers/cubeEdit.js | 11 +-
webapp/app/js/controllers/cubeSchema.js | 8 +-
webapp/app/js/controllers/cubes.js | 4 +-
webapp/app/js/controllers/sourceMeta.js | 37 ++---
webapp/app/js/model/tableModel.js | 92 ++++++-----
webapp/app/js/services/tables.js | 0
webapp/app/js/services/tree.js | 161 ++++++++++---------
.../cubeDesigner/advanced_settings.html | 34 ++--
.../app/partials/cubeDesigner/incremental.html | 2 +-
webapp/app/partials/cubes/cube_detail.html | 12 +-
webapp/app/partials/tables/source_metadata.html | 44 ++---
.../app/partials/tables/source_table_tree.html | 6 +-
webapp/bower.json | 4 +-
14 files changed, 222 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
[20/26] incubator-kylin git commit: Merge branch 'streaming' of
https://github.com/KylinOLAP/Kylin into streaming
Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/172efa6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/172efa6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/172efa6a
Branch: refs/heads/streaming
Commit: 172efa6ad9d42c58ed866616c9b92b039a5a763f
Parents: 58a6f73 f42ef83
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 10:14:32 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 10:14:32 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[21/26] incubator-kylin git commit: add log
Posted by li...@apache.org.
add log
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1f2fb283
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1f2fb283
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1f2fb283
Branch: refs/heads/streaming
Commit: 1f2fb283e50b99b65766fc9a00c08494b9231d1e
Parents: 172efa6
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 11:09:58 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 11:09:58 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/common/persistence/HBaseConnection.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1f2fb283/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index 145cada..27d9e4c 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -77,6 +77,7 @@ public class HBaseConnection {
ConnPool.put(url, connection);
}
} catch (Throwable t) {
+ logger.error("Error when open connection " + url, t);
throw new StorageException("Error when open connection " + url, t);
}
[04/26] incubator-kylin git commit: add right 'return type' when
create measure in cube KYLIN-600
Posted by li...@apache.org.
add right 'return type' when create measure in cube KYLIN-600
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0a5830ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0a5830ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0a5830ef
Branch: refs/heads/streaming
Commit: 0a5830ef96dc6c4aced85c3b2c7bbb177a764d6e
Parents: 80a5065
Author: jiazhong <ji...@ebay.com>
Authored: Fri Feb 27 16:49:20 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Fri Feb 27 16:49:20 2015 +0800
----------------------------------------------------------------------
.../kylin/rest/controller/CubeController.java | 38 +++++++++++++++-----
webapp/app/js/controllers/cubeEdit.js | 7 ++--
webapp/app/js/controllers/cubeSchema.js | 26 ++++++++++----
webapp/app/js/controllers/cubes.js | 5 ++-
webapp/app/js/filters/filter.js | 4 +--
webapp/app/partials/cubeDesigner/measures.html | 11 +++---
6 files changed, 64 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index e624d45..be91436 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -18,10 +18,14 @@
package org.apache.kylin.rest.controller;
-import com.codahale.metrics.annotation.Metered;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
@@ -55,11 +59,17 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.*;
+import com.codahale.metrics.annotation.Metered;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
/**
* CubeController is defined as Restful API entrance for UI.
@@ -348,7 +358,17 @@ public class CubeController extends BasicController {
return cubeRequest;
}
try {
- metaManager.updateDataModelDesc(modelDesc);
+
+ DataModelDesc existingModel = metaManager.getDataModelDesc(modelDesc.getName());
+ if (existingModel == null) {
+ metaManager.createDataModelDesc(modelDesc);
+ } else {
+
+ //ignore overwriting conflict checking before splict MODEL & CUBE
+ modelDesc.setLastModified(existingModel.getLastModified());
+ metaManager.updateDataModelDesc(modelDesc);
+ }
+
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index 0e05077..b632af1 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -38,11 +38,14 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
$scope.getColumnType = function (_column,table){
var columns = $scope.getColumnsByTable(table);
+ var type;
angular.forEach(columns,function(column){
- if(_column===column.name){
- return column.type;
+ if(_column === column.name){
+ type = column.datatype;
+ return;
}
});
+ return type;
};
var ColFamily = function () {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index 4bb5762..7bad2d8 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -107,17 +107,31 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
$scope.newMeasure = null;
};
- // !count !count distinct
- $scope.measureParamValueUpdate = function(){
- if(newMeasure.function.expression!=="COUNT"&&newMeasure.function.expression!=="COUNT_DISTINCT"){
+ //map right return type for param
+ $scope.measureReturnTypeUpdate = function(){
+ if($scope.newMeasure.function.expression!=="COUNT_DISTINCT"){
var column = $scope.newMeasure.function.parameter.value;
+ var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table); // $scope.getColumnType defined in cubeEdit.js
- switch(newMeasure.function.expression){
+ switch($scope.newMeasure.function.expression){
case "SUM":
- var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table);
- $log.log(colType);
+ if(colType==="smallint"||colType==="int"||colType==="bigint"){
+ $scope.newMeasure.function.returntype= 'bigint';
+ }else{
+ $scope.newMeasure.function.returntype= 'decimal';
+ }
+ break;
+ case "MIN":
+ case "MAX":
+ $scope.newMeasure.function.returntype = colType;
+ break;
+ case "COUNT":
+ $scope.newMeasure.function.returntype = "bigint";
+ break;
+ default:
+ $scope.newMeasure.function.returntype = "";
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/controllers/cubes.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index d576955..927fda1 100755
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -77,7 +77,9 @@ KylinApp
$scope.loadDetail = function (cube) {
var defer = $q.defer();
- if (!cube.detail) {
+ if(cube.detail){
+ defer.resolve(cube.detail);
+ } else {
CubeDescService.get({cube_name: cube.name}, {}, function (detail) {
if (detail.length > 0&&detail[0].hasOwnProperty("name")) {
cube.detail = detail[0];
@@ -95,6 +97,7 @@ KylinApp
}
});
}
+
return defer.promise;
};
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/js/filters/filter.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/filters/filter.js b/webapp/app/js/filters/filter.js
old mode 100644
new mode 100755
index d0d49fd..f3364c7
--- a/webapp/app/js/filters/filter.js
+++ b/webapp/app/js/filters/filter.js
@@ -108,7 +108,8 @@ KylinApp
//convert GMT+0 time to specified Timezone
return function(item,timezone,format){
- if(angular.isUndefined(item)){
+ // undefined and 0 is not necessary to show
+ if(angular.isUndefined(item)||item===0){
return "";
}
@@ -129,7 +130,6 @@ KylinApp
gmttimezone = timezone;
}
-
var localOffset = new Date().getTimezoneOffset();
var convertedMillis = item;
if(gmttimezone.indexOf("GMT+")!=-1){
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a5830ef/webapp/app/partials/cubeDesigner/measures.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/measures.html b/webapp/app/partials/cubeDesigner/measures.html
index 9a932bd..a260e28 100755
--- a/webapp/app/partials/cubeDesigner/measures.html
+++ b/webapp/app/partials/cubeDesigner/measures.html
@@ -111,6 +111,7 @@
<div class="col-xs-12 col-sm-6">
<select class="form-control"
ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
+ ng-change="measureReturnTypeUpdate();"
ng-options="me as me for me in cubeConfig.measureExpressions">
<option value=""></option>
</select>
@@ -146,7 +147,7 @@
<select class="form-control" chosen
ng-if="newMeasure.function.parameter.type == 'column'"
ng-model="newMeasure.function.parameter.value"
- ng-change="measureParamValueUpdate();"
+ ng-change="measureReturnTypeUpdate();"
ng-options="columns.name as columns.name for columns in getColumnsByTable(metaModel.model.fact_table)" >
<option value="">-- Select a Fact Table Column --</option>
</select>
@@ -166,12 +167,8 @@
<option value=""></option>
</select>
<span class="font-color-default"
- ng-if="newMeasure.function.expression == 'COUNT'"
- ng-init="newMeasure.function.returntype= 'bigint' "><b> BIGINT</b>
- </span>
- <span class="font-color-default"
- ng-if="newMeasure.function.expression != 'COUNT_DISTINCT' && newMeasure.function.expression != 'COUNT' "
- ><b> {{newMeasure.function.returntype}}</b>
+ ng-if="newMeasure.function.expression != 'COUNT_DISTINCT'"
+ ><b> {{newMeasure.function.returntype | uppercase}}</b>
</span>
</div>
</div>
[17/26] incubator-kylin git commit: raise max region to 500
Posted by li...@apache.org.
raise max region to 500
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e011db0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e011db0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e011db0
Branch: refs/heads/streaming
Commit: 8e011db0042d924ecae765a53cf62a5d0d8c715c
Parents: 82c9db2
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Mar 2 14:03:28 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Mar 2 14:03:28 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e011db0/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index a677381..6bdacfd 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -43,7 +43,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
public static final int MEDIUM_CUT = 20; // 20 GB per region
public static final int LARGE_CUT = 100; // 100 GB per region
- public static final int MAX_REGION = 200;
+ public static final int MAX_REGION = 500;
private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
[08/26] incubator-kylin git commit: Merge pull request #431 from
janzhongi/inverted-index
Posted by li...@apache.org.
Merge pull request #431 from janzhongi/inverted-index
filter partiton column use only date type column
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/10b50297
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/10b50297
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/10b50297
Branch: refs/heads/streaming
Commit: 10b50297ea6038a6dc99554460a598c0cc4085f0
Parents: 07a02fa 8735d15
Author: Zhong,Jian <ji...@ebay.com>
Authored: Fri Feb 27 18:29:29 2015 +0800
Committer: Zhong,Jian <ji...@ebay.com>
Committed: Fri Feb 27 18:29:29 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/cubeEdit.js | 11 +++++--
webapp/app/js/controllers/cubes.js | 4 +--
.../cubeDesigner/advanced_settings.html | 34 +++++++++++---------
.../app/partials/cubeDesigner/incremental.html | 2 +-
4 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
[13/26] incubator-kylin git commit: refactor source_table_tree & add
'model Json format info' to cube wizard
Posted by li...@apache.org.
refactor source_table_tree & add 'model Json format info' to cube wizard
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d804e602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d804e602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d804e602
Branch: refs/heads/streaming
Commit: d804e602ad64b41bfa7c4843ed5fb2874c8497c5
Parents: 4b65f02
Author: jiazhong <ji...@ebay.com>
Authored: Sat Feb 28 17:40:18 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Sat Feb 28 17:40:18 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/sourceMeta.js | 37 +++-----
webapp/app/js/model/tableModel.js | 92 +++++++++++---------
webapp/app/js/services/tables.js | 0
webapp/app/partials/cubes/cube_detail.html | 10 ++-
webapp/app/partials/tables/source_metadata.html | 44 +++++-----
.../app/partials/tables/source_table_tree.html | 6 +-
6 files changed, 97 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
old mode 100644
new mode 100755
index e2f166c..6f14e83
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -21,14 +21,13 @@
KylinApp
.controller('SourceMetaCtrl', function ($scope,$cacheFactory, $q, $window, $routeParams, CubeService, $modal, TableService,$route,loadingRequest,SweetAlert,tableConfig,TableModel) {
var $httpDefaultCache = $cacheFactory.get('$http');
- $scope.selectedSrcDb = [];
- $scope.selectedSrcTable = {};
+ $scope.tableModel = TableModel;
+ $scope.tableModel.selectedSrcDb = [];
+ $scope.tableModel.selectedSrcTable = {};
$scope.window = 0.68 * $window.innerHeight;
$scope.tableConfig = tableConfig;
- $scope.hiveTbLoad={
- status:"init"
- }
+
$scope.state = { filterAttr: 'id', filterReverse:false, reverseColumn: 'id',
dimensionFilter: '', measureFilter: ''};
@@ -42,7 +41,7 @@ KylinApp
};
$scope.aceSrcTbLoaded = function (forceLoad) {
- $scope.selectedSrcDb = [];
+ $scope.tableModel.selectedSrcDb = [];
$scope.treeOptions = {
nodeChildren: "columns",
injectClasses: {
@@ -57,7 +56,7 @@ KylinApp
}
};
- $scope.selectedSrcTable = {};
+ $scope.tableModel.selectedSrcTable = {};
var defer = $q.defer();
$scope.loading = true;
@@ -93,9 +92,10 @@ KylinApp
obj.sort(innerSort);
}
+ $scope.tableModel.selectedSrcDb = [];
for (var key in tableMap) {
var tables = tableMap[key];
- $scope.selectedSrcDb.push({
+ $scope.tableModel.selectedSrcDb.push({
"name": key,
"columns": tables
});
@@ -112,25 +112,20 @@ KylinApp
$scope.aceSrcTbLoaded();
});
- $scope.$watch('hiveTbLoad.status', function (newValue, oldValue) {
- if(newValue=="success"){
- $scope.aceSrcTbLoaded(true);
- }
- });
$scope.showSelected = function (obj) {
if (obj.uuid) {
- $scope.selectedSrcTable = obj;
+ $scope.tableModel.selectedSrcTable = obj;
}
else if(obj.datatype) {
- $scope.selectedSrcTable.selectedSrcColumn = obj;
+ $scope.tableModel.selectedSrcTable.selectedSrcColumn = obj;
}
};
$scope.aceSrcTbChanged = function () {
- $scope.selectedSrcDb = [];
- $scope.selectedSrcTable = {};
+ $scope.tableModel.selectedSrcDb = [];
+ $scope.tableModel.selectedSrcTable = {};
$scope.aceSrcTbLoaded(true);
};
@@ -146,9 +141,6 @@ KylinApp
projectName:function(){
return $scope.projectModel.selectedProject;
},
- hiveTbLoad:function(){
- return $scope.hiveTbLoad;
- },
scope: function () {
return $scope;
}
@@ -156,7 +148,7 @@ KylinApp
});
};
- var ModalInstanceCtrl = function ($scope,$location, $modalInstance, tableNames, MessageService,projectName,hiveTbLoad) {
+ var ModalInstanceCtrl = function ($scope,$location, $modalInstance, tableNames, MessageService,projectName,scope) {
$scope.tableNames = "";
$scope.projectName = projectName;
$scope.cancel = function () {
@@ -195,8 +187,8 @@ KylinApp
SweetAlert.swal('Partial loaded!','The following table(s) have been successfully synchronized: ' + loadTableInfo+"\n\n Failed to synchronize following table(s):" + unloadedTableInfo, 'warning');
}
loadingRequest.hide();
+ scope.aceSrcTbLoaded(true);
- hiveTbLoad.status="success";
},function(e){
if(e.data&& e.data.exception){
var message =e.data.exception;
@@ -206,7 +198,6 @@ KylinApp
SweetAlert.swal('Oops...', "Failed to take action.", 'error');
}
loadingRequest.hide();
- hiveTbLoad.status="init";
})
}
};
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/js/model/tableModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/tableModel.js b/webapp/app/js/model/tableModel.js
old mode 100644
new mode 100755
index 3f734a1..a3bc820
--- a/webapp/app/js/model/tableModel.js
+++ b/webapp/app/js/model/tableModel.js
@@ -1,43 +1,49 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-KylinApp.service('TableModel', function() {
-
-
- this.selectProjectTables = [];
-
- this.initTables = function(){
- this.selectProjectTables = [];
- }
-
- this.addTable = function(table){
- this.selectProjectTables.push(table);
- }
-
- this.setSelectedProjectTables = function(tables) {
- this.selectProjectTables = tables;
- }
-
-
- this.selectedSrcDb = [];
-
- this.selectedSrcTable = {};
-
-
-});
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+KylinApp.service('TableModel', function() {
+
+
+ //for tables in cubeDesigner
+ this.selectProjectTables = [];
+
+ this.initTables = function(){
+ this.selectProjectTables = [];
+ }
+
+ this.addTable = function(table){
+ this.selectProjectTables.push(table);
+ }
+
+ this.setSelectedProjectTables = function(tables) {
+ this.selectProjectTables = tables;
+ }
+
+
+ // for load table page
+ this.selectedSrcDb = [];
+ this.selectedSrcTable = {};
+
+ this.init = function(){
+ this.selectedSrcDb = [];
+ this.selectedSrcTable = {};
+ }
+
+
+});
+
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/js/services/tables.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
old mode 100644
new mode 100755
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/partials/cubes/cube_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubes/cube_detail.html b/webapp/app/partials/cubes/cube_detail.html
index 6668b8d..59f2d69 100755
--- a/webapp/app/partials/cubes/cube_detail.html
+++ b/webapp/app/partials/cubes/cube_detail.html
@@ -29,8 +29,12 @@
</li>
<li class="{{cube.visiblePage=='json'? 'active':''}}"
ng-if="userService.hasRole('ROLE_ADMIN') || hasPermission(cube, 16) && !newAccess">
- <a href="" ng-click="cube.visiblePage='json';">JSON</a>
+ <a href="" ng-click="cube.visiblePage='json';">JSON(Cube)</a>
</li>
+ <li class="{{cube.visiblePage=='json_model'? 'active':''}}"
+ ng-if="userService.hasRole('ROLE_ADMIN') || hasPermission(cube, 16) && !newAccess">
+ <a href="" ng-click="cube.visiblePage='json_model';">JSON(Model)</a>
+ </li>
<li class="{{cube.visiblePage=='access'? 'active':''}}">
<a href="" ng-click="cube.visiblePage='access';listAccess(cube, 'CubeInstance');">Access</a>
</li>
@@ -64,6 +68,10 @@
style="background-color: white;border: 0px">{{angular.toJson(cleanStatus(cube.detail), true)}}</pre>
</div>
+ <div ng-show="cube.visiblePage=='json_model'" class="cube-detail">
+ <pre ng-if="!state.jsonEdit"
+ style="background-color: white;border: 0px">{{angular.toJson(cleanStatus(cube.model), true)}}</pre>
+ </div>
<div ng-show="cube.visiblePage=='graph'" id="cube_graph_{{cube.name}}" class="cube-detail cube_graph">
</div>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/partials/tables/source_metadata.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/source_metadata.html b/webapp/app/partials/tables/source_metadata.html
old mode 100644
new mode 100755
index e77185a..62b4d0f
--- a/webapp/app/partials/tables/source_metadata.html
+++ b/webapp/app/partials/tables/source_metadata.html
@@ -45,7 +45,7 @@
</div>
<!--Tab-->
<div class="col-xs-9">
- <h3 class="text-info">Table Schema:{{ selectedSrcTable.name}}</h3>
+ <h3 class="text-info">Table Schema:{{ tableModel.selectedSrcTable.name}}</h3>
<div class="tabbable nav-tabs-custom">
<ul class="nav nav-tabs">
<li class="active">
@@ -58,52 +58,52 @@
<div class="tab-content">
<!--Schema-->
<div id="schema" class="tab-pane">
- <div ng-if="selectedSrcTable.uuid" class="table-responsive">
+ <div ng-if="tableModel.selectedSrcTable.uuid" class="table-responsive">
<table class="table">
<tbody>
<tr>
<th style="width:20%">NAME</th>
- <td>{{ selectedSrcTable.name}}</td>
+ <td>{{ tableModel.selectedSrcTable.name}}</td>
</tr>
<tr>
<th>Hive DATABASE</th>
- <td>{{selectedSrcTable.database}}</td>
+ <td>{{tableModel.selectedSrcTable.database}}</td>
</tr>
<tr>
<th>SNAPSHOT TIME</th>
- <td>{{selectedSrcTable.exd.lastUpdateTime | utcToConfigTimeZone}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.lastUpdateTime | utcToConfigTimeZone}}</td>
</tr>
<tr>
<th>LOCATION</th>
- <td>{{selectedSrcTable.exd.location}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.location}}</td>
</tr>
<tr>
<th>INPUT FORMAT</th>
- <td>{{selectedSrcTable.exd.inputformat}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.inputformat}}</td>
</tr>
<tr>
<th>OUTPUT FORMAT</th>
- <td>{{selectedSrcTable.exd.outputformat}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.outputformat}}</td>
</tr>
<tr>
<th>OWNER</th>
- <td><a href="mailto:{{selectedSrcTable.exd.owner}}">{{selectedSrcTable.exd.owner}}</a></td>
+ <td><a href="mailto:{{tableModel.selectedSrcTable.exd.owner}}">{{tableModel.selectedSrcTable.exd.owner}}</a></td>
</tr>
<tr>
<th>TOTAL FILE NUMBER</th>
- <td>{{selectedSrcTable.exd.totalNumberFiles}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.totalNumberFiles}}</td>
</tr>
<tr>
<th>TOTAL FILE SIZE</th>
- <td>{{selectedSrcTable.exd.totalFileSize}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.totalFileSize}}</td>
</tr>
<tr>
<th>PARTITIONED</th>
- <td>{{selectedSrcTable.exd.partitioned}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.partitioned}}</td>
</tr>
<tr>
<th>PARTITION COLUMNS</th>
- <td>{{selectedSrcTable.exd.partitionColumns}}</td>
+ <td>{{tableModel.selectedSrcTable.exd.partitionColumns}}</td>
</tr>
</tbody>
</table>
@@ -120,7 +120,7 @@
</span>
</div>
<div class="space-6"></div>
- <div ng-if="(selectedSrcTable.columns | filter: columnName).length>0">
+ <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length>0">
<table class="table table-hover table-striped list">
<thead>
<tr style="cursor: pointer">
@@ -137,26 +137,26 @@
</tr>
</thead>
- <tr ng-repeat="column in selectedSrcTable.columns | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
- <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+ <tr ng-repeat="column in tableModel.selectedSrcTable.columns | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
+ <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
{{ column.id}}
</td>
- <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+ <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
{{ column.name}}
</td>
- <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+ <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
{{ column.datatype}}
</td>
- <td style="{{(selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
- <!--{{ selectedSrcTable.cardinality[column.name]}}-->
+ <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+ <!--{{ tableModel.selectedSrcTable.cardinality[column.name]}}-->
{{column.cardinality}}
</td>
</tr>
</table>
</div>
- <div ng-if="(selectedSrcTable.columns | filter: columnName).length == 0" no-result
+ <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length == 0" no-result
text="No Matched Table Column."></div>
- <div ng-if="!!!selectedSrcTable.uuid">
+ <div ng-if="!!!tableModel.selectedSrcTable.uuid">
<div no-result text="No Table Selected."></div>
</div>
</div>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d804e602/webapp/app/partials/tables/source_table_tree.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/source_table_tree.html b/webapp/app/partials/tables/source_table_tree.html
old mode 100644
new mode 100755
index 972e996..f2cd7ed
--- a/webapp/app/partials/tables/source_table_tree.html
+++ b/webapp/app/partials/tables/source_table_tree.html
@@ -34,12 +34,12 @@
<div class="space-4"></div>
<!--tree-->
<div style="width:100%; height:{{window}}px; overflow:auto;">
- <treecontrol ng-if="selectedSrcDb.length > 0" class="tree-light"
+ <treecontrol ng-if="tableModel.selectedSrcDb.length > 0" class="tree-light"
dirSelection="true"
- tree-model="selectedSrcDb"
+ tree-model="tableModel.selectedSrcDb"
options="treeOptions"
on-selection="showSelected(node)"
- selected-node="selectedSrcTable">
+ selected-node="tableModel.selectedSrcTable">
{{node.name}} {{!!(node.datatype)?'(' + trimType(node.datatype) + ')' : ''}}
</treecontrol>
</div>
[25/26] incubator-kylin git commit: refactor
Posted by li...@apache.org.
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0970d76f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0970d76f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0970d76f
Branch: refs/heads/streaming
Commit: 0970d76fd4e9d6b92e684cc64c5e97676f986132
Parents: 1f2fb28
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 16:06:58 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 16:06:58 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/streaming/KafkaConfig.java | 151 +++++++++++++++++++
.../apache/kylin/streaming/KafkaConsumer.java | 133 ++++++++++++++++
.../org/apache/kylin/streaming/Requester.java | 146 ++++++++++++++++++
.../java/org/apache/kylin/streaming/Stream.java | 57 +++++++
.../apache/kylin/streaming/StreamBuilder.java | 95 ++++++++++++
.../org/apache/kylin/streaming/TopicMeta.java | 63 ++++++++
.../apache/kylin/streaming/kafka/Consumer.java | 120 ---------------
.../kylin/streaming/kafka/KafkaConfig.java | 151 -------------------
.../apache/kylin/streaming/kafka/Requester.java | 146 ------------------
.../apache/kylin/streaming/kafka/Stream.java | 57 -------
.../kylin/streaming/kafka/StreamBuilder.java | 94 ------------
.../apache/kylin/streaming/kafka/TopicMeta.java | 63 --------
.../apache/kylin/streaming/KafkaBaseTest.java | 78 ++++++++++
.../apache/kylin/streaming/KafkaConfigTest.java | 65 ++++++++
.../kylin/streaming/KafkaConsumerTest.java | 98 ++++++++++++
.../apache/kylin/streaming/RequesterTest.java | 71 +++++++++
.../apache/kylin/streaming/TestProducer.java | 114 ++++++++++++++
.../kylin/streaming/kafka/KafkaBaseTest.java | 77 ----------
.../kylin/streaming/kafka/KafkaConfigTest.java | 64 --------
.../streaming/kafka/KafkaConsumerTest.java | 101 -------------
.../kylin/streaming/kafka/RequesterTest.java | 70 ---------
.../kylin/streaming/kafka/TestProducer.java | 115 --------------
22 files changed, 1071 insertions(+), 1058 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
new file mode 100644
index 0000000..7d0cd6b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+ private List<Broker> brokers;
+
+ private String zookeeper;
+
+ private String topic;
+
+ private int timeout;
+
+ private int maxReadCount;
+
+ private int bufferSize;
+
+ private int partitionId;
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMaxReadCount() {
+ return maxReadCount;
+ }
+
+ public void setMaxReadCount(int maxReadCount) {
+ this.maxReadCount = maxReadCount;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public List<Broker> getBrokers() {
+ return brokers;
+ }
+
+ public void setBrokers(List<Broker> brokers) {
+ this.brokers = brokers;
+ }
+
+ public String getZookeeper() {
+ return zookeeper;
+ }
+
+ public void setZookeeper(String zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public static KafkaConfig load(KafkaConfig config) {
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(config.getBufferSize());
+ result.setMaxReadCount(config.getMaxReadCount());
+ result.setTimeout(config.getTimeout());
+ result.setTopic(config.getTopic());
+ result.setZookeeper(config.getZookeeper());
+ result.setPartitionId(config.getPartitionId());
+ result.setBrokers(config.getBrokers());
+ return result;
+ }
+
+ public static KafkaConfig load(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
+ result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
+ result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
+ result.setTopic(properties.getProperty("topic"));
+ result.setZookeeper(properties.getProperty("zookeeper"));
+ result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
+
+ int id = 0;
+ List<Broker> brokers = Lists.newArrayList();
+ for (String str: properties.getProperty("brokers").split(",")) {
+ final String[] split = str.split(":");
+ final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
+ brokers.add(broker);
+ }
+ result.setBrokers(brokers);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
new file mode 100644
index 0000000..e45b6e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -0,0 +1,133 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public class KafkaConsumer implements Runnable {
+
+ private String topic;
+ private int partitionId;
+
+ private KafkaConfig kafkaConfig;
+ private List<Broker> replicaBrokers;
+ private AtomicLong offset = new AtomicLong();
+ private BlockingQueue<Stream> streamQueue;
+
+ private Logger logger;
+
+ public KafkaConsumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
+ this.topic = topic;
+ this.partitionId = partitionId;
+ this.kafkaConfig = kafkaConfig;
+ this.replicaBrokers = initialBrokers;
+ logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
+ streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
+ }
+
+ public BlockingQueue<Stream> getStreamQueue() {
+ return streamQueue;
+ }
+
+ private Broker getLeadBroker() {
+ final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
+ if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ replicaBrokers = partitionMetadata.replicas();
+ return partitionMetadata.leader();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ Broker leadBroker = getLeadBroker();
+ if (leadBroker == null) {
+ logger.warn("cannot find lead broker");
+ } else {
+ final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+ offset.set(lastOffset);
+ }
+ while (true) {
+ if (leadBroker == null) {
+ leadBroker = getLeadBroker();
+ }
+ if (leadBroker == null) {
+ logger.warn("cannot find lead broker");
+ continue;
+ }
+
+ final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
+ if (fetchResponse.errorCode(topic, partitionId) != 0) {
+ logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+ continue;
+ }
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+ final ByteBuffer payload = messageAndOffset.message().payload();
+ //TODO use ByteBuffer maybe
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ logger.debug("get message offset:" + messageAndOffset.offset());
+ try {
+ streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
+ } catch (InterruptedException e) {
+ logger.error("error put streamQueue", e);
+ break;
+ }
+ offset.incrementAndGet();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("consumer has encountered an error", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
new file mode 100644
index 0000000..79332af
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
@@ -0,0 +1,146 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public final class Requester {
+
+ private static final Logger logger = LoggerFactory.getLogger(Requester.class);
+
+ public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
+ SimpleConsumer consumer;
+ for (Broker broker : kafkaConfig.getBrokers()) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ break;
+ }
+ List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+ @Nullable
+ @Override
+ public Integer apply(PartitionMetadata partitionMetadata) {
+ return partitionMetadata.partitionId();
+ }
+ });
+ return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+ }
+ logger.debug("cannot find topic:" + kafkaConfig.getTopic());
+ return null;
+ }
+
+ public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
+ SimpleConsumer consumer;
+ for (Broker broker : brokers) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+ break;
+ }
+ for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+ if (partitionMetadata.partitionId() == partitionId) {
+ return partitionMetadata;
+ }
+ }
+ }
+ logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
+ return null;
+ }
+
+ public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
+ final String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+ kafka.api.FetchRequest req = new FetchRequestBuilder()
+ .clientId(clientName)
+ .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .build();
+ return consumer.fetch(req);
+ }
+
+ public static long getLastOffset(String topic, int partitionId,
+ long whichTime, Broker broker, KafkaConfig kafkaConfig) {
+ String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partitionId);
+ return offsets[0];
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
new file mode 100644
index 0000000..3cb43b6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public class Stream {
+
+ private long timestamp;
+ private byte[] rawData;
+
+ public Stream(long timestamp, byte[] rawData) {
+ this.timestamp = timestamp;
+ this.rawData = rawData;
+ }
+
+ public byte[] getRawData() {
+ return rawData;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
new file mode 100644
index 0000000..005c255
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -0,0 +1,95 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by qianzhou on 2/17/15.
+ */
+public abstract class StreamBuilder implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+
+ private static final int BATCH_BUILD_BYTES_THRESHOLD = 64 * 1024;
+ private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 5 * 60 * 1000;
+
+ private BlockingQueue<Stream> streamQueue;
+ private long lastBuildTime = System.currentTimeMillis();
+ private int bytesTotal = 0;
+
+ public StreamBuilder(BlockingQueue<Stream> streamQueue) {
+ this.streamQueue = streamQueue;
+ }
+
+ protected abstract void build(List<Stream> streamsToBuild);
+
+ private void buildStream(List<Stream> streams) {
+ build(streams);
+ clearCounter();
+ }
+
+ private void clearCounter() {
+ lastBuildTime = System.currentTimeMillis();
+ bytesTotal = 0;
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<Stream> streamToBuild = Lists.newArrayList();
+ clearCounter();
+ while (true) {
+ final Stream stream = streamQueue.take();
+ streamToBuild.add(stream);
+ bytesTotal += stream.getRawData().length;
+ if (bytesTotal >= BATCH_BUILD_BYTES_THRESHOLD) {
+ buildStream(streamToBuild);
+ } else if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
+ buildStream(streamToBuild);
+ } else {
+ continue;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.error("StreamBuilder has been interrupted", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
new file mode 100644
index 0000000..b93d589
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class TopicMeta {
+
+ private final String name;
+
+ private final List<Integer> partitionIds;
+
+ public TopicMeta(String name, List<Integer> partitionIds) {
+ this.name = name;
+ this.partitionIds = Collections.unmodifiableList(partitionIds);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<Integer> getPartitionIds() {
+ return partitionIds;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
deleted file mode 100644
index 074ef01..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Consumer implements Runnable {
-
- private String topic;
- private int partitionId;
-
- private KafkaConfig kafkaConfig;
- private List<Broker> replicaBrokers;
- private AtomicLong offset = new AtomicLong();
- private BlockingQueue<Stream> streamQueue;
-
- private Logger logger;
-
- public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
- this.topic = topic;
- this.partitionId = partitionId;
- this.kafkaConfig = kafkaConfig;
- this.replicaBrokers = initialBrokers;
- logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
- streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
- }
-
- public BlockingQueue<Stream> getStreamQueue() {
- return streamQueue;
- }
-
- private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
- replicaBrokers = partitionMetadata.replicas();
- return partitionMetadata.leader();
- } else {
- return null;
- }
- }
-
- @Override
- public void run() {
- while (true) {
- final Broker leadBroker = getLeadBroker();
- if (leadBroker == null) {
- logger.warn("cannot find lead broker");
- continue;
- }
- final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
- offset.set(lastOffset);
- final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
- if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
- continue;
- }
- for (MessageAndOffset messageAndOffset: fetchResponse.messageSet(topic, partitionId)) {
- final ByteBuffer payload = messageAndOffset.message().payload();
- //TODO use ByteBuffer maybe
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- logger.debug("get message offset:" + messageAndOffset.offset());
- try {
- streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
- } catch (InterruptedException e) {
- logger.error("error put streamQueue", e);
- }
- offset.incrementAndGet();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
deleted file mode 100644
index 82513eb..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfig {
-
- private List<Broker> brokers;
-
- private String zookeeper;
-
- private String topic;
-
- private int timeout;
-
- private int maxReadCount;
-
- private int bufferSize;
-
- private int partitionId;
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getMaxReadCount() {
- return maxReadCount;
- }
-
- public void setMaxReadCount(int maxReadCount) {
- this.maxReadCount = maxReadCount;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
- public List<Broker> getBrokers() {
- return brokers;
- }
-
- public void setBrokers(List<Broker> brokers) {
- this.brokers = brokers;
- }
-
- public String getZookeeper() {
- return zookeeper;
- }
-
- public void setZookeeper(String zookeeper) {
- this.zookeeper = zookeeper;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public static KafkaConfig load(KafkaConfig config) {
- KafkaConfig result = new KafkaConfig();
- result.setBufferSize(config.getBufferSize());
- result.setMaxReadCount(config.getMaxReadCount());
- result.setTimeout(config.getTimeout());
- result.setTopic(config.getTopic());
- result.setZookeeper(config.getZookeeper());
- result.setPartitionId(config.getPartitionId());
- result.setBrokers(config.getBrokers());
- return result;
- }
-
- public static KafkaConfig load(Properties properties) {
- Preconditions.checkNotNull(properties);
- KafkaConfig result = new KafkaConfig();
- result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
- result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
- result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
- result.setTopic(properties.getProperty("topic"));
- result.setZookeeper(properties.getProperty("zookeeper"));
- result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
-
- int id = 0;
- List<Broker> brokers = Lists.newArrayList();
- for (String str: properties.getProperty("brokers").split(",")) {
- final String[] split = str.split(":");
- final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
- brokers.add(broker);
- }
- result.setBrokers(brokers);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
deleted file mode 100644
index 8811695..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public final class Requester {
-
- private static final Logger logger = LoggerFactory.getLogger(Requester.class);
-
- public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
- SimpleConsumer consumer;
- for (Broker broker : kafkaConfig.getBrokers()) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp = consumer.send(req);
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- break;
- }
- List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
- @Nullable
- @Override
- public Integer apply(PartitionMetadata partitionMetadata) {
- return partitionMetadata.partitionId();
- }
- });
- return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
- }
- logger.debug("cannot find topic:" + kafkaConfig.getTopic());
- return null;
- }
-
- public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
- SimpleConsumer consumer;
- for (Broker broker : brokers) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp = consumer.send(req);
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
- break;
- }
- for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
- if (partitionMetadata.partitionId() == partitionId) {
- return partitionMetadata;
- }
- }
- }
- logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
- return null;
- }
-
- public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
- final String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId(clientName)
- .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
- .build();
- return consumer.fetch(req);
- }
-
- public static long getLastOffset(String topic, int partitionId,
- long whichTime, Broker broker, KafkaConfig kafkaConfig) {
- String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
- return 0;
- }
- long[] offsets = response.offsets(topic, partitionId);
- return offsets[0];
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
deleted file mode 100644
index 2a0ede4..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Stream {
-
- private long timestamp;
- private byte[] rawData;
-
- public Stream(long timestamp, byte[] rawData) {
- this.timestamp = timestamp;
- this.rawData = rawData;
- }
-
- public byte[] getRawData() {
- return rawData;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
deleted file mode 100644
index 145b5c8..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by qianzhou on 2/17/15.
- */
-public abstract class StreamBuilder implements Runnable {
-
- private List<BlockingQueue<Stream>> streamQueues;
- private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
- private final int batchBuildCount;
-
- public StreamBuilder(List<BlockingQueue<Stream>> streamQueues, int batchBuildCount) {
- this.streamQueues = streamQueues;
- this.batchBuildCount = batchBuildCount;
- }
-
-
- private int getEarliestStreamIndex(Stream[] streamHead) {
- long ts = Long.MAX_VALUE;
- int idx = 0;
- for (int i = 0; i < streamHead.length; i++) {
- if (streamHead[i].getTimestamp() < ts) {
- ts = streamHead[i].getTimestamp();
- idx = i;
- }
- }
- return idx;
- }
-
- protected abstract void build(List<Stream> streamsToBuild);
-
- @Override
- public void run() {
- try {
- Stream[] streamHead = new Stream[streamQueues.size()];
- for (int i = 0; i < streamQueues.size(); i++) {
- streamHead[i] = streamQueues.get(i).take();
- }
- List<Stream> streamToBuild = Lists.newArrayListWithCapacity(batchBuildCount);
- while (true) {
- if (streamToBuild.size() >= batchBuildCount) {
- build(streamToBuild);
- streamToBuild.clear();
- }
- int idx = getEarliestStreamIndex(streamHead);
- streamToBuild.add(streamHead[idx]);
- streamHead[idx] = streamQueues.get(idx).take();
- }
- } catch (InterruptedException e) {
- logger.error("", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
deleted file mode 100644
index 7822797..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- * Created by qianzhou on 2/15/15.
- */
-public class TopicMeta {
-
- private final String name;
-
- private final List<Integer> partitionIds;
-
- public TopicMeta(String name, List<Integer> partitionIds) {
- this.name = name;
- this.partitionIds = Collections.unmodifiableList(partitionIds);
- }
-
- public String getName() {
- return name;
- }
-
- public List<Integer> getPartitionIds() {
- return partitionIds;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
new file mode 100644
index 0000000..89277d2
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public abstract class KafkaBaseTest {
+
+ protected static final Logger logger = LoggerFactory.getLogger("kafka test");
+
+ protected static ZkClient zkClient;
+
+ protected static KafkaConfig kafkaConfig;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ kafkaConfig = KafkaConfig.load(properties);
+
+ zkClient = new ZkClient(kafkaConfig.getZookeeper());
+ }
+
+
+ public static void createTopic(String topic, int partition, int replica) {
+ try {
+ AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
+ } catch (TopicExistsException e) {
+ logger.info(e.getMessage());
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
new file mode 100644
index 0000000..5612763
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+ @Test
+ public void test() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ KafkaConfig config = KafkaConfig.load(properties);
+ assertEquals(1000, config.getMaxReadCount());
+ assertEquals(65536, config.getBufferSize());
+ assertEquals(60000, config.getTimeout());
+ assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+ assertEquals("kafka_stream_test", config.getTopic());
+ assertEquals(0, config.getPartitionId());
+ assertEquals(1, config.getBrokers().size());
+ assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
new file mode 100644
index 0000000..91e06fc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class KafkaConsumerTest extends KafkaBaseTest {
+
+ private TestProducer producer;
+
+ private static final int TOTAL_SEND_COUNT = 100;
+
+ @Before
+ public void before() throws IOException {
+ producer = new TestProducer(TOTAL_SEND_COUNT);
+ producer.start();
+ }
+
+ @After
+ public void after() {
+ producer.stop();
+ }
+
+ private void waitForProducerToStop(TestProducer producer) {
+ while (!producer.isStopped()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+ final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+ List<BlockingQueue<Stream>> queues = Lists.newArrayList();
+ for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+ KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+ queues.add(consumer.getStreamQueue());
+ executorService.execute(consumer);
+ }
+ waitForProducerToStop(producer);
+ int count = 0;
+ for (BlockingQueue<Stream> queue : queues) {
+ count += queue.size();
+ }
+ //since there will be historical data
+ assertTrue(count >= TOTAL_SEND_COUNT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
new file mode 100644
index 0000000..d54f22c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.Requester;
+import org.apache.kylin.streaming.TopicMeta;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class RequesterTest extends KafkaBaseTest {
+
+ private static final String NON_EXISTED_TOPIC = "non_existent_topic";
+
+
+
+ @AfterClass
+ public static void afterClass() {
+ }
+
+ @Test
+ public void testTopicMeta() throws Exception {
+ TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+ assertNotNull(kafkaTopicMeta);
+ assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+ assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+
+ KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+ anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
+
+ kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
+ assertTrue(kafkaTopicMeta == null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
new file mode 100644
index 0000000..368904c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import kafka.cluster.Broker;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestProducer {
+
+ private volatile boolean stopped = false;
+
+ private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
+
+ private final int sendCount;
+
+ public TestProducer(int sendCount) {
+ this.sendCount = sendCount;
+ }
+
+ public void start() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
+ Properties props = new Properties();
+ props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Broker broker) {
+ return broker.getConnectionString();
+ }
+ }), ","));
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("request.required.acks", "1");
+ ProducerConfig config = new ProducerConfig(props);
+ final Producer<String, String> producer = new Producer<String, String>(config);
+
+ final Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int count = 0;
+ while (!stopped && count < sendCount) {
+ final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
+ producer.send(message);
+ count++;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ logger.debug("totally " + count +" messages have been sent");
+ stopped = true;
+
+ }
+ });
+ thread.setDaemon(false);
+ thread.start();
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
deleted file mode 100644
index 537a15d..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public abstract class KafkaBaseTest {
-
- protected static final Logger logger = LoggerFactory.getLogger("kafka test");
-
- protected static ZkClient zkClient;
-
- protected static KafkaConfig kafkaConfig;
-
- @BeforeClass
- public static void beforeClass() throws IOException {
- final Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- kafkaConfig = KafkaConfig.load(properties);
-
- zkClient = new ZkClient(kafkaConfig.getZookeeper());
- }
-
-
- public static void createTopic(String topic, int partition, int replica) {
- try {
- AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
- } catch (TopicExistsException e) {
- logger.info(e.getMessage());
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
deleted file mode 100644
index 3c9bd87..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfigTest {
-
- @Test
- public void test() throws IOException {
- final Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- KafkaConfig config = KafkaConfig.load(properties);
- assertEquals(1000, config.getMaxReadCount());
- assertEquals(65536, config.getBufferSize());
- assertEquals(60000, config.getTimeout());
- assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
- assertEquals("kafka_stream_test", config.getTopic());
- assertEquals(0, config.getPartitionId());
- assertEquals(1, config.getBrokers().size());
- assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
deleted file mode 100644
index 4449d37..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.consumer.ConsumerConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class KafkaConsumerTest extends KafkaBaseTest {
-
- private TestProducer producer;
-
- private static final int TOTAL_SEND_COUNT = 100;
-
- @Before
- public void before() throws IOException {
- producer = new TestProducer(TOTAL_SEND_COUNT);
- producer.start();
- }
-
- @After
- public void after() {
- producer.stop();
- }
-
- private void waitForProducerToStop(TestProducer producer) {
- while (!producer.isStopped()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Test
- public void test() throws InterruptedException {
- final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
- final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
- List<BlockingQueue<Stream>> queues = Lists.newArrayList();
- for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
- queues.add(consumer.getStreamQueue());
- executorService.execute(consumer);
- }
- waitForProducerToStop(producer);
- int count = 0;
- for (BlockingQueue<Stream> queue : queues) {
- count += queue.size();
- }
- //since there will be historical data
- assertTrue(count >= TOTAL_SEND_COUNT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
deleted file mode 100644
index 694c1fd..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.*;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class RequesterTest extends KafkaBaseTest {
-
- private static final String NON_EXISTED_TOPIC = "non_existent_topic";
-
-
-
- @AfterClass
- public static void afterClass() {
- }
-
- @Test
- public void testTopicMeta() throws Exception {
- TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
- assertNotNull(kafkaTopicMeta);
- assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
- assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
-
- KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
- anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
-
- kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
- assertTrue(kafkaTopicMeta == null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
deleted file mode 100644
index cd3b166..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestProducer {
-
- private volatile boolean stopped = false;
-
- private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
-
- private final int sendCount;
-
- public TestProducer(int sendCount) {
- this.sendCount = sendCount;
- }
-
- public void start() throws IOException {
- final Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
-
- Properties props = new Properties();
- props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
- @Nullable
- @Override
- public String apply(@Nullable Broker broker) {
- return broker.getConnectionString();
- }
- }), ","));
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
- ProducerConfig config = new ProducerConfig(props);
- final Producer<String, String> producer = new Producer<String, String>(config);
-
- final Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- int count = 0;
- while (!stopped && count < sendCount) {
- final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
- producer.send(message);
- count++;
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.debug("totally " + count +" messages have been sent");
- stopped = true;
-
- }
- });
- thread.setDaemon(false);
- thread.start();
- }
-
- public boolean isStopped() {
- return stopped;
- }
-
- public void stop() {
- stopped = true;
- }
-}
[16/26] incubator-kylin git commit: fix
Posted by li...@apache.org.
fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c038cb4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c038cb4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c038cb4b
Branch: refs/heads/streaming
Commit: c038cb4bada510a17c9bb36a32ddbdb7ff947ac9
Parents: 9c3a606
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 11:41:56 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 11:41:56 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/streaming/kafka/Consumer.java | 3 +++
.../apache/kylin/streaming/kafka/Requester.java | 27 +++++++++++++++++++-
2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index eaee2a1..c825d4b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -34,6 +34,7 @@
package org.apache.kylin.streaming.kafka;
+import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
@@ -93,6 +94,8 @@ public class Consumer implements Runnable {
logger.warn("cannot find lead broker");
continue;
}
+ final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+ offset.set(lastOffset);
final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
if (fetchResponse.errorCode(topic, partitionId) != 0) {
logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index 1f14a47..f4cdd8e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -37,7 +37,9 @@ package org.apache.kylin.streaming.kafka;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Created by qianzhou on 2/15/15.
@@ -114,8 +118,29 @@ public final class Requester {
SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
kafka.api.FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
- .addFetch(topic, partitionId, offset, consumerConfig.getTimeout()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
.build();
return consumer.fetch(req);
}
+
+ public static long getLastOffset(String topic, int partitionId,
+ long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+ String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partitionId);
+ return offsets[0];
+ }
+
+
}
[10/26] incubator-kylin git commit: Merge branch 'streaming' of
https://github.com/KylinOLAP/Kylin into streaming
Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/04d9b7d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/04d9b7d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/04d9b7d6
Branch: refs/heads/streaming
Commit: 04d9b7d60f01f10681da03cc7525153fd27dacdd
Parents: 06e4b20 bd0b9f7
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Sat Feb 28 09:41:26 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Sat Feb 28 09:41:26 2015 +0800
----------------------------------------------------------------------
.../kylin/rest/controller/CubeController.java | 38 +++++++++++++++-----
webapp/app/js/controllers/cubeEdit.js | 12 +++++++
webapp/app/js/controllers/cubeSchema.js | 30 ++++++++++++++++
webapp/app/js/controllers/cubes.js | 5 ++-
webapp/app/js/filters/filter.js | 4 +--
webapp/app/partials/cubeDesigner/measures.html | 15 +++-----
6 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[05/26] incubator-kylin git commit: Merge pull request #430 from
janzhongi/inverted-index
Posted by li...@apache.org.
Merge pull request #430 from janzhongi/inverted-index
fix return type issue when create measure KYLIN-600
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/07a02fa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/07a02fa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/07a02fa0
Branch: refs/heads/streaming
Commit: 07a02fa03aa485823c17552571a868fa73f91b82
Parents: 598476f 0a5830e
Author: Zhong,Jian <ji...@ebay.com>
Authored: Fri Feb 27 16:52:25 2015 +0800
Committer: Zhong,Jian <ji...@ebay.com>
Committed: Fri Feb 27 16:52:25 2015 +0800
----------------------------------------------------------------------
.../kylin/rest/controller/CubeController.java | 38 +++++++++++++++-----
webapp/app/js/controllers/cubeEdit.js | 12 +++++++
webapp/app/js/controllers/cubeSchema.js | 30 ++++++++++++++++
webapp/app/js/controllers/cubes.js | 5 ++-
webapp/app/js/filters/filter.js | 4 +--
webapp/app/partials/cubeDesigner/measures.html | 15 +++-----
6 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[24/26] incubator-kylin git commit: change read workload for
streaming test
Posted by li...@apache.org.
change read workload for streaming test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/29b27818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/29b27818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/29b27818
Branch: refs/heads/streaming
Commit: 29b27818c866b7fd5ba7f0046dc68ca612ff505b
Parents: 1d6505b
Author: honma <ho...@ebay.com>
Authored: Tue Mar 3 15:39:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Mar 3 15:39:45 2015 +0800
----------------------------------------------------------------------
common/src/test/java/org/apache/kylin/common/util/BasicTest.java | 4 +---
.../java/org/apache/kylin/job/tools/HbaseStreamingInput.java | 3 ++-
2 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29b27818/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 4a6edc8..4d2a25c 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -48,7 +48,6 @@ public class BasicTest {
private void foo(Long a) {
System.out.printf("a");
-
}
private void foo(Integer b) {
@@ -64,12 +63,11 @@ public class BasicTest {
public void test1() throws Exception {
}
- final private Semaphore semaphore = new Semaphore(0);
-
@Test
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
+ System.out.println(512<<20);
}
private static String time(long t) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29b27818/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
index ff313b4..8479391 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -41,6 +41,7 @@ public class HbaseStreamingInput {
logger.info("Creating HTable '" + tableName + "'");
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
+ desc.setMemStoreFlushSize(512 << 20);//512M
HColumnDescriptor fd = new HColumnDescriptor(CF);
fd.setBlocksize(CELL_SIZE);
@@ -144,7 +145,7 @@ public class HbaseStreamingInput {
long leftBound = getFirstKeyTime(table);
long rightBound = System.currentTimeMillis();
- for (int t = 0; t < 10; ++t) {
+ for (int t = 0; t < 5; ++t) {
long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
long end = start + 600000;//a period of 10 minutes
logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
[09/26] incubator-kylin git commit: KYLIN-609 Add Hybrid as a
federation of Cube and Inverted-index realization
Posted by li...@apache.org.
KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/06e4b203
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/06e4b203
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/06e4b203
Branch: refs/heads/streaming
Commit: 06e4b20386621ca4bdd70513da4156c3022e8c80
Parents: 7f1abe0
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Sat Feb 28 09:41:13 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Sat Feb 28 09:41:13 2015 +0800
----------------------------------------------------------------------
.../RoutingRules/RealizationPriorityRule.java | 5 ++--
.../kylin/storage/hybrid/HybridInstance.java | 28 +++++++++++---------
.../storage/hybrid/HybridTupleIterator.java | 15 ++++++++---
3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06e4b203/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
index 6a67140..09a7dce 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
@@ -33,8 +33,9 @@ public class RealizationPriorityRule extends RoutingRule {
static Map<RealizationType, Integer> priorities = Maps.newHashMap();
static {
- priorities.put(RealizationType.CUBE, 0);
- priorities.put(RealizationType.INVERTED_INDEX, 1);
+ priorities.put(RealizationType.HYBRID, 0);
+ priorities.put(RealizationType.CUBE, 1);
+ priorities.put(RealizationType.INVERTED_INDEX, 2);
}
public static void setPriorities(Map<RealizationType, Integer> priorities) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06e4b203/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index ebba9f5..789ddc0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -53,15 +53,11 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
}
-
- if (realTimeRealizationInstance.getDateRangeEnd() < historyRealizationInstance.getDateRangeEnd()) {
- throw new IllegalStateException("The real time realization's dateRangeEnd should be greater than history realization's dateRangeEnd.");
- }
}
@Override
public boolean isCapable(SQLDigest digest) {
- return historyRealizationInstance.isCapable(digest) || realTimeRealizationInstance.isCapable(digest);
+ return getHistoryRealizationInstance().isCapable(digest) || getRealTimeRealizationInstance().isCapable(digest);
}
@Override
@@ -78,17 +74,17 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
@Override
public String getFactTable() {
- return null;
+ return getHistoryRealizationInstance().getFactTable();
}
@Override
public List<TblColRef> getAllColumns() {
- return null;
+ return getHistoryRealizationInstance().getAllColumns();
}
@Override
public List<MeasureDesc> getMeasures() {
- return null;
+ return getHistoryRealizationInstance().getMeasures();
}
@Override
@@ -138,28 +134,34 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
}
public IRealization getHistoryRealizationInstance() {
+ if (historyRealizationInstance == null) {
+ this.init();
+ }
return historyRealizationInstance;
}
public IRealization getRealTimeRealizationInstance() {
+ if (realTimeRealizationInstance == null) {
+ this.init();
+ }
return realTimeRealizationInstance;
}
@Override
public long getDateRangeStart() {
- return historyRealizationInstance.getDateRangeStart();
+ return Math.min(getHistoryRealizationInstance().getDateRangeStart(), getRealTimeRealizationInstance().getDateRangeStart());
}
@Override
public long getDateRangeEnd() {
- return realTimeRealizationInstance.getDateRangeEnd();
+ return Math.max(getHistoryRealizationInstance().getDateRangeEnd(), getRealTimeRealizationInstance().getDateRangeEnd());
}
public String getModelName() {
- if(historyRealizationInstance instanceof CubeInstance) {
- return ((CubeInstance)historyRealizationInstance).getDescriptor().getModelName();
+ if (getHistoryRealizationInstance() instanceof CubeInstance) {
+ return ((CubeInstance) historyRealizationInstance).getDescriptor().getModelName();
}
- return ((IIInstance)historyRealizationInstance).getDescriptor().getModelName();
+ return ((IIInstance) historyRealizationInstance).getDescriptor().getModelName();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06e4b203/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
index daf3057..516bc58 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
@@ -19,12 +19,19 @@ public class HybridTupleIterator implements ITupleIterator {
@Override
public boolean hasNext() {
- return iterators[currentIndex].hasNext() || (currentIndex + 1 < iterators.length && iterators[currentIndex + 1].hasNext());
+ if (iterators[currentIndex].hasNext())
+ return true;
+
+ while (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+ currentIndex++;
+ }
+
+ return iterators[currentIndex].hasNext();
}
@Override
public ITuple next() {
- if (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+ while (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
currentIndex++;
}
@@ -33,6 +40,8 @@ public class HybridTupleIterator implements ITupleIterator {
@Override
public void close() {
-
+ for (ITupleIterator i : iterators) {
+ i.close();
+ }
}
}
[06/26] incubator-kylin git commit: Merge branch 'inverted-index'
into streaming
Posted by li...@apache.org.
Merge branch 'inverted-index' into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bd0b9f70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bd0b9f70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bd0b9f70
Branch: refs/heads/streaming
Commit: bd0b9f700f82f29c52d8385d5f8158aea2a5636d
Parents: aa49da3 07a02fa
Author: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Authored: Fri Feb 27 10:00:24 2015 +0000
Committer: liyang@apache.org <ya...@D-SHC-00801746.corp.ebay.com>
Committed: Fri Feb 27 10:00:24 2015 +0000
----------------------------------------------------------------------
.../kylin/rest/controller/CubeController.java | 38 +++++++++++++++-----
webapp/app/js/controllers/cubeEdit.js | 12 +++++++
webapp/app/js/controllers/cubeSchema.js | 30 ++++++++++++++++
webapp/app/js/controllers/cubes.js | 5 ++-
webapp/app/js/filters/filter.js | 4 +--
webapp/app/partials/cubeDesigner/measures.html | 15 +++-----
6 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[02/26] incubator-kylin git commit: KYLIN-609 Add Hybrid as a
federation of Cube and Inverted-index realization
Posted by li...@apache.org.
KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1c0719e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1c0719e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1c0719e2
Branch: refs/heads/streaming
Commit: 1c0719e2310208e8cce2438f93ac6b1b595705c9
Parents: a536576
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Fri Feb 27 14:24:59 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Fri Feb 27 14:24:59 2015 +0800
----------------------------------------------------------------------
.../kylin/storage/hybrid/HybridInstance.java | 22 ++++++
.../storage/hybrid/HybridStorageEngine.java | 79 +++++++++++++++++++-
.../storage/hybrid/HybridTupleIterator.java | 38 ++++++++++
3 files changed, 135 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index b550fc0..ebba9f5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -5,6 +5,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.RealizationEntry;
@@ -43,6 +45,18 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
historyRealizationInstance = registry.getRealization(historyRealization.getType(), historyRealization.getRealization());
realTimeRealizationInstance = registry.getRealization(realTimeRealization.getType(), realTimeRealization.getRealization());
+ if (historyRealizationInstance == null) {
+ throw new IllegalArgumentException("Didn't find realization '" + historyRealization.getType() + "'" + " with name '" + historyRealization.getRealization() + "' in '" + name + "'");
+ }
+
+ if (realTimeRealizationInstance == null) {
+ throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
+ }
+
+
+ if (realTimeRealizationInstance.getDateRangeEnd() < historyRealizationInstance.getDateRangeEnd()) {
+ throw new IllegalStateException("The real time realization's dateRangeEnd should be greater than history realization's dateRangeEnd.");
+ }
}
@Override
@@ -140,4 +154,12 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
public long getDateRangeEnd() {
return realTimeRealizationInstance.getDateRangeEnd();
}
+
+ public String getModelName() {
+ if(historyRealizationInstance instanceof CubeInstance) {
+ return ((CubeInstance)historyRealizationInstance).getDescriptor().getModelName();
+ }
+
+ return ((IIInstance)historyRealizationInstance).getDescriptor().getModelName();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 566a4f5..8a50b7e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -1,13 +1,24 @@
package org.apache.kylin.storage.hybrid;
-import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageEngine;
import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+
/**
* Created by shaoshi on 2/13/15.
*/
@@ -27,10 +38,70 @@ public class HybridStorageEngine implements IStorageEngine {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
- long conditionBoundry = hybridInstance.getHistoryRealizationInstance().getDateRangeEnd();
+ long boundary = hybridInstance.getHistoryRealizationInstance().getDateRangeEnd();
+ FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd");
+ String boundaryDate = format.format(boundary);
+
+ Collection<TblColRef> filterCols = sqlDigest.filterColumns;
+
+ String modelName = hybridInstance.getModelName();
+
+ MetadataManager metaMgr = getMetadataManager();
+
+ DataModelDesc modelDesc = metaMgr.getDataModelDesc(modelName);
+
+ String partitionColFull = modelDesc.getPartitionDesc().getPartitionDateColumn();
+
+ String partitionTable = partitionColFull.substring(0, partitionColFull.lastIndexOf("."));
+ String partitionCol = partitionColFull.substring(partitionColFull.lastIndexOf(".") + 1);
+
+
+ TableDesc factTbl = metaMgr.getTableDesc(partitionTable);
+ ColumnDesc columnDesc = factTbl.findColumnByName(partitionCol);
+ TblColRef partitionColRef = new TblColRef(columnDesc);
+
+ // search the historic realization
+
+ ITupleIterator iterator1 = searchRealization(hybridInstance.getHistoryRealizationInstance(), context, sqlDigest);
+
+
+ // now search the realtime realization, need add the boundary condition
- TupleFilter filter = sqlDigest.filter;
+ CompareTupleFilter compareTupleFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+
+ ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+ ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(boundaryDate);
+ compareTupleFilter.addChild(columnTupleFilter);
+ compareTupleFilter.addChild(constantTupleFilter);
+
+ LogicalTupleFilter logicalTupleFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+
+ logicalTupleFilter.addChild(sqlDigest.filter);
+ logicalTupleFilter.addChild(compareTupleFilter);
+
+ sqlDigest.filter = logicalTupleFilter;
+
+ if (!sqlDigest.filterColumns.contains(partitionColRef)) {
+ sqlDigest.filterColumns.add(partitionColRef);
+ }
+
+ if (!sqlDigest.allColumns.contains(partitionColRef)) {
+ sqlDigest.allColumns.add(partitionColRef);
+ }
+
+ ITupleIterator iterator2 = searchRealization(hybridInstance.getRealTimeRealizationInstance(), context, sqlDigest);
+
+
+ return new HybridTupleIterator(new ITupleIterator[]{iterator1, iterator2});
+ }
+
+ private ITupleIterator searchRealization(IRealization realization, StorageContext context, SQLDigest sqlDigest) {
+
+ IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(realization);
+ return storageEngine.search(context, sqlDigest);
+ }
- return null;
+ private MetadataManager getMetadataManager() {
+ return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
new file mode 100644
index 0000000..daf3057
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
@@ -0,0 +1,38 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+
+/**
+ * Created by shaoshi on 2/27/15.
+ */
+public class HybridTupleIterator implements ITupleIterator {
+
+ private ITupleIterator[] iterators;
+
+ private int currentIndex;
+
+ public HybridTupleIterator(ITupleIterator[] iterators) {
+ this.iterators = iterators;
+ currentIndex = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterators[currentIndex].hasNext() || (currentIndex + 1 < iterators.length && iterators[currentIndex + 1].hasNext());
+ }
+
+ @Override
+ public ITuple next() {
+ if (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+ currentIndex++;
+ }
+
+ return iterators[currentIndex].next();
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
[07/26] incubator-kylin git commit: filter partiton column use only
date type column
Posted by li...@apache.org.
filter partiton column use only date type column
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8735d15e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8735d15e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8735d15e
Branch: refs/heads/streaming
Commit: 8735d15e76352f94078244b5d5e909ab697f2841
Parents: 0a5830e
Author: jiazhong <ji...@ebay.com>
Authored: Fri Feb 27 18:28:06 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Fri Feb 27 18:28:06 2015 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/cubeEdit.js | 11 +++++--
webapp/app/js/controllers/cubes.js | 4 +--
.../cubeDesigner/advanced_settings.html | 34 +++++++++++---------
.../app/partials/cubeDesigner/incremental.html | 2 +-
4 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index b632af1..21a666e 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -26,16 +26,23 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
$scope.cubeMode = absUrl.indexOf("/cubes/add")!=-1?'addNewCube':absUrl.indexOf("/cubes/edit")!=-1?'editExistCube':'default';
- $scope.getColumnsByTable = function (name) {
+ $scope.getColumnsByTable = function (tableName) {
var temp = [];
angular.forEach(TableModel.selectProjectTables, function (table) {
- if (table.name == name) {
+ if (table.name == tableName) {
temp = table.columns;
}
});
return temp;
};
+ $scope.getPartitonColumns = function(tableName){
+ var columns = _.filter($scope.getColumnsByTable(tableName),function(column){
+ return column.datatype==="date";
+ });
+ return columns;
+ };
+
$scope.getColumnType = function (_column,table){
var columns = $scope.getColumnsByTable(table);
var type;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/js/controllers/cubes.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index 927fda1..35266a4 100755
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -360,9 +360,9 @@ KylinApp
}
});
-var jobSubmitCtrl = function ($scope, $modalInstance, CubeService, MessageService, $location, cube,metaModel, buildType,SweetAlert,loadingRequest) {
+var jobSubmitCtrl = function ($scope, $modalInstance, CubeService, MessageService, $location, cube,MetaModel, buildType,SweetAlert,loadingRequest) {
$scope.cube = cube;
- $scope.metaModel = metaModel;
+ $scope.metaModel = MetaModel;
$scope.jobBuildRequest = {
buildType: buildType,
startTime: 0,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/partials/cubeDesigner/advanced_settings.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
old mode 100644
new mode 100755
index 5c65886..b0d4f4f
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -95,9 +95,9 @@
<tr>
<th>ID</th>
<th>Column</th>
- <th>Length</th>
- <th>Dictionary</th>
<th>Mandatory</th>
+ <th>Dictionary</th>
+ <th>Length</th>
<th ng-if="state.mode=='edit'"></th>
</tr>
@@ -115,13 +115,17 @@
<span ng-if="state.mode=='view'">{{rowkey_column.column}}</span>
</td>
<td>
- <!--Column Length -->
- <input type="text" class="form-control" placeholder="Column Length.." ng-if="state.mode=='edit'"
- tooltip="rowkey column length.." tooltip-trigger="focus"
- ng-model="rowkey_column.length" class="form-control">
+ <!-- Mandatory -->
+ <button type="button " ng-if="state.mode=='edit'"
+ class="btn btn-xs btn-default {{rowkey_column.mandatory? 'active':''}}"
+ ng-model="rowkey_column.mandatory"
+ btn-checkbox btn-checkbox-true="true" btn-checkbox-false="false">
+ {{rowkey_column.mandatory? 'Y':'N'}}
+ </button>
- <span ng-if="state.mode=='view'">{{rowkey_column.length}}</span>
+ <span ng-if="state.mode=='view'">{{rowkey_column.mandatory? 'Y':'N'}}</span>
</td>
+
<td>
<!--Use Dictionary-->
<div>
@@ -133,17 +137,17 @@
<span ng-if="state.mode=='view'">{{rowkey_column.dictionary}}</span>
</div>
</td>
+
<td>
- <!-- Mandatory -->
- <button type="button " ng-if="state.mode=='edit'"
- class="btn btn-xs btn-default {{rowkey_column.mandatory? 'active':''}}"
- ng-model="rowkey_column.mandatory"
- btn-checkbox btn-checkbox-true="true" btn-checkbox-false="false">
- {{rowkey_column.mandatory? 'Y':'N'}}
- </button>
+ <!--Column Length -->
+ <input type="text" class="form-control" placeholder="Column Length.." ng-if="state.mode=='edit'"
+ tooltip="rowkey column length.." tooltip-trigger="focus"
+ ng-model="rowkey_column.length" class="form-control">
- <span ng-if="state.mode=='view'">{{rowkey_column.mandatory? 'Y':'N'}}</span>
+ <span ng-if="state.mode=='view'">{{rowkey_column.length}}</span>
</td>
+
+
<td ng-if="state.mode=='edit'">
<button class="btn btn-xs btn-info"
ng-click="removeElement(cubeMetaFrame.rowkey.rowkey_columns, rowkey_column)"><i
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8735d15e/webapp/app/partials/cubeDesigner/incremental.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/incremental.html b/webapp/app/partials/cubeDesigner/incremental.html
index 496cc36..365ef43 100755
--- a/webapp/app/partials/cubeDesigner/incremental.html
+++ b/webapp/app/partials/cubeDesigner/incremental.html
@@ -46,7 +46,7 @@
ng-required="metaModel.model.partition_desc.partition_date_start"
ng-model="metaModel.model.partition_desc.partition_date_column"
ng-if="state.mode=='edit'"
- ng-options="metaModel.model.fact_table+'.'+columns.name as metaModel.model.fact_table+'.'+columns.name for columns in getColumnsByTable(metaModel.model.fact_table)" >
+ ng-options="metaModel.model.fact_table+'.'+columns.name as metaModel.model.fact_table+'.'+columns.name for columns in getPartitonColumns(metaModel.model.fact_table)" >
<option value=""></option>
</select>
<span ng-if="state.mode=='view'">
[12/26] incubator-kylin git commit: enable visulization in cubeWizard
Posted by li...@apache.org.
enable visulization in cubeWizard
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4b65f02b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4b65f02b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4b65f02b
Branch: refs/heads/streaming
Commit: 4b65f02bf0d6a7abc7829da5a376724d2ad63d9f
Parents: 504f6bf
Author: jiazhong <ji...@ebay.com>
Authored: Sat Feb 28 15:39:30 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Sat Feb 28 15:39:30 2015 +0800
----------------------------------------------------------------------
webapp/app/js/services/tree.js | 106 +++++++++++++++++++-----------------
1 file changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b65f02b/webapp/app/js/services/tree.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tree.js b/webapp/app/js/services/tree.js
index 0eaf5e7..0a565af 100755
--- a/webapp/app/js/services/tree.js
+++ b/webapp/app/js/services/tree.js
@@ -69,62 +69,68 @@ KylinApp.service('CubeGraphService', function () {
};
}
- //if (dimension.join && dimension.join.primary_key)
- //{
- // angular.forEach(dimension.join.primary_key, function(pk, index){
- // for (var i = 0; i < dimensionNode._children.length; i++) {
- // if(dimensionNode._children[i].name == pk)
- // break;
- // }
- // if(i == dimensionNode._children.length) {
- // dimensionNode._children.push({
- // "type": "column",
- // "name": pk
- // });
- // }
- //
- // });
- //}
- //
- //if (dimension.derived)
- //{
- // angular.forEach(dimension.derived, function(derived, index){
- // for (var i = 0; i < dimensionNode._children.length; i++) {
- // if(dimensionNode._children[i].name == derived)
- // break;
- // }
- // if(i == dimensionNode._children.length) {
- // dimensionNode._children.push({
- // "type": "column",
- // "name": derived + "(DERIVED)"
- // });
- // }
- // });
- //}
- //
- //if (dimension.hierarchy)
- //{
- // angular.forEach(dimension.hierarchy, function(hierarchy, index){
- // for (var i = 0; i < dimensionNode._children.length; i++) {
- // if(dimensionNode._children[i].name == hierarchy)
- // break;
- // }
- // if(i == dimensionNode._children.length) {
- // dimensionNode._children.push({
- // "type": "column",
- // "name": hierarchy.column + "(HIERARCHY)"
- // });
- // }
- // });
- //}
-
if(j == graphData.children.length) {
graphData.children.push(dimensionNode);
}
}
});
- cube.graph.columnsCount = 0;
+
+ angular.forEach(cube.detail.dimensions, function (dimension, index) {
+ // for dimension on lookup table
+ if(cube.model.fact_table!==dimension.table){
+ var lookup = _.find(graphData.children,function(item){
+ return item.name === dimension.table;
+ });
+
+ angular.forEach(lookup.join.primary_key, function(pk, index){
+ for (var i = 0; i < lookup._children.length; i++) {
+ if(lookup._children[i].name == pk)
+ break;
+ }
+ if(i == lookup._children.length) {
+ lookup._children.push({
+ "type": "column",
+ "name": pk
+ });
+ }
+ });
+
+ if (dimension.derived&&dimension.derived.length)
+ {
+ angular.forEach(dimension.derived, function(derived, index){
+ for (var i = 0; i < lookup._children.length; i++) {
+ if(lookup._children[i].name == derived)
+ break;
+ }
+ if(i == lookup._children.length) {
+ lookup._children.push({
+ "type": "column",
+ "name": derived + "(DERIVED)"
+ });
+ }
+ });
+ }
+
+ if (dimension.hierarchy)
+ {
+ angular.forEach(dimension.column, function(hierarchy, index){
+ for (var i = 0; i < lookup._children.length; i++) {
+ if(lookup._children[i].name == hierarchy)
+ break;
+ }
+ if(i == lookup._children.length) {
+ lookup._children.push({
+ "type": "column",
+ "name": hierarchy + "(HIERARCHY)"
+ });
+ }
+ });
+ }
+
+ };
+ });
+ cube.graph.columnsCount = 0;
cube.graph.tree = tree;
cube.graph.root = graphData;
cube.graph.svg = svg;
[03/26] incubator-kylin git commit: Merge branch 'streaming' of
https://github.com/KylinOLAP/Kylin into streaming
Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7f1abe08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7f1abe08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7f1abe08
Branch: refs/heads/streaming
Commit: 7f1abe081d8152fcefc8a6a7030ac26b31ebcc4b
Parents: 1c0719e aa49da3
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Fri Feb 27 14:25:40 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Fri Feb 27 14:25:40 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 10 +-
.../org/apache/kylin/common/util/ClassUtil.java | 53 ++++++++
.../apache/kylin/common/util/ClasspathUtil.java | 45 -------
.../common/persistence/ResourceToolTest.java | 4 +-
.../common/util/AbstractKylinTestCase.java | 6 +-
.../kylin/common/util/BasicHadoopTest.java | 2 +-
.../common/util/HBaseMetadataTestCase.java | 2 +-
.../common/util/LocalFileMetadataTestCase.java | 4 +-
.../kylin/dict/DictionaryInfoSerializer.java | 3 +-
.../org/apache/kylin/dict/TrieDictionary.java | 3 +-
.../main/java/org/apache/kylin/dict/Util.java | 44 -------
.../apache/kylin/job/CubeMetadataUpgrade.java | 2 +-
.../kylin/job/common/HadoopShellExecutable.java | 5 +-
.../kylin/job/common/MapReduceExecutable.java | 35 +++---
.../kylin/job/engine/JobEngineConfig.java | 2 +-
.../kylin/job/hadoop/AbstractHadoopJob.java | 2 +-
.../kylin/job/manager/ExecutableManager.java | 28 +++--
.../kylin/job/BuildCubeWithEngineTest.java | 6 +-
.../apache/kylin/job/BuildIIWithEngineTest.java | 4 +-
.../org/apache/kylin/job/ExportHBaseData.java | 2 +-
.../org/apache/kylin/job/ImportHBaseData.java | 2 +-
.../apache/kylin/job/SampleCubeSetupTest.java | 4 +-
.../kylin/job/tools/CubeMigrationTests.java | 4 +-
.../java/org/apache/kylin/rest/DebugTomcat.java | 2 +-
webapp/app/js/controllers/cubeEdit.js | 17 +--
webapp/app/js/controllers/cubeSchema.js | 2 +-
webapp/app/js/controllers/cubes.js | 122 ++++++++++---------
webapp/app/js/model/cubeListModel.js | 0
webapp/app/js/model/jobListModel.js | 2 +-
.../app/partials/cubeDesigner/incremental.html | 2 +-
webapp/app/partials/cubeDesigner/measures.html | 2 +-
webapp/grunt.json | 4 +-
32 files changed, 201 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
[26/26] incubator-kylin git commit: Merge branch 'streaming' of
https://github.com/KylinOLAP/Kylin into streaming
Posted by li...@apache.org.
Merge branch 'streaming' of https://github.com/KylinOLAP/Kylin into streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/82540950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/82540950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/82540950
Branch: refs/heads/streaming
Commit: 825409504f8b34c098d89392fa3070eb4641b0f7
Parents: 0970d76 29b2781
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 16:07:04 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 16:07:04 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 17 +-
.../job/deployment/HbaseConfigPrinterCLI.java | 34 ++-
.../kylin/job/tools/HbaseStreamingInput.java | 225 +++++++++++++++++++
3 files changed, 266 insertions(+), 10 deletions(-)
----------------------------------------------------------------------