You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:43 UTC
[06/50] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9a1c4cb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9a1c4cb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9a1c4cb6
Branch: refs/heads/streaming-localdict
Commit: 9a1c4cb6b3dcb967ab017c23de76cb910a103cb9
Parents: 71324f4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Mar 26 17:49:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Mar 26 17:49:39 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/invertedindex/IIInstance.java | 12 +++
.../apache/kylin/invertedindex/IIManager.java | 17 ++--
.../invertedindex/index/BatchSliceBuilder.java | 6 +-
.../org/apache/kylin/streaming/KafkaConfig.java | 22 ++--
.../apache/kylin/streaming/KafkaConsumer.java | 17 ++--
.../kylin/streaming/StreamingBootstrap.java | 102 +++++++++++++++++++
.../apache/kylin/streaming/StreamingCLI.java | 70 +++++++++++++
.../kylin/streaming/KafkaConsumerTest.java | 2 +-
8 files changed, 210 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 7684699..fd300e0 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -79,6 +80,9 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
@JsonProperty("segments")
private List<IISegment> segments = new ArrayList<IISegment>();
+ @JsonProperty("stream_offset")
+ private List<Long> streamOffsets = Lists.newArrayList();
+
@JsonProperty("create_time_utc")
private long createTimeUTC;
@@ -357,4 +361,12 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
public void setCost(int cost) {
this.cost = cost;
}
+
+ public List<Long> getStreamOffsets() {
+ return streamOffsets;
+ }
+
+ public void setStreamOffsets(List<Long> streamOffsets) {
+ this.streamOffsets = streamOffsets;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 6ebfbf8..b086d5d 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -240,19 +240,18 @@ public class IIManager implements IRealizationProvider {
}
private String generateStorageLocation() {
- String namePrefix = IRealizationConstants.IIHbaseStorageLocationPrefix;
- String tableName = "";
- do {
- StringBuffer sb = new StringBuffer();
- sb.append(namePrefix);
+ while (true) {
+ StringBuilder sb = new StringBuilder(IRealizationConstants.IIHbaseStorageLocationPrefix);
for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
int idx = (int) (Math.random() * ALPHA_NUM.length());
sb.append(ALPHA_NUM.charAt(idx));
}
- tableName = sb.toString();
- } while (this.usedStorageLocation.contains(tableName));
-
- return tableName;
+ if (usedStorageLocation.contains(sb.toString())) {
+ continue;
+ } else {
+ return sb.toString();
+ }
+ }
}
private void loadAllIIInstance() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
index 94b70c1..6ba328c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
@@ -86,11 +86,7 @@ public class BatchSliceBuilder {
}
private long increaseSliceTimestamp(long timestamp) {
- if (timestamp < sliceTimestamp) {
- throw new IllegalStateException();
- }
-
- if (timestamp == sliceTimestamp) {
+ if (timestamp <= sliceTimestamp) {
return ++timestamp; // ensure slice timestamp increases
} else {
return timestamp;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index b22c7e0..5194e9d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -76,10 +76,8 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("bufferSize")
private int bufferSize;
- @JsonProperty("iiDesc")
- private String iiDesc;
-
- private int partitionId;
+ @JsonProperty("iiName")
+ private String iiName;
public int getTimeout() {
return timeout;
@@ -121,14 +119,6 @@ public class KafkaConfig extends RootPersistentEntity {
this.topic = topic;
}
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
public void setBrokerConfigs(List<BrokerConfig> brokerConfigs) {
this.brokerConfigs = brokerConfigs;
}
@@ -143,6 +133,14 @@ public class KafkaConfig extends RootPersistentEntity {
});
}
+ public String getIiName() {
+ return iiName;
+ }
+
+ public void setIiName(String iiName) {
+ this.iiName = iiName;
+ }
+
public String getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 42a0f1f..910041c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -58,15 +58,16 @@ public abstract class KafkaConsumer implements Runnable {
private KafkaConfig kafkaConfig;
private List<Broker> replicaBrokers;
- private AtomicLong offset = new AtomicLong();
+ private long offset;
private BlockingQueue<Stream> streamQueue;
private Logger logger;
- public KafkaConsumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
+ public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
this.topic = topic;
this.partitionId = partitionId;
this.kafkaConfig = kafkaConfig;
+ offset = startOffset;
this.replicaBrokers = initialBrokers;
logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
@@ -90,12 +91,6 @@ public abstract class KafkaConsumer implements Runnable {
public void run() {
try {
Broker leadBroker = getLeadBroker();
- if (leadBroker == null) {
- logger.warn("cannot find lead broker");
- } else {
- final long lastOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
- offset.set(lastOffset);
- }
while (true) {
if (leadBroker == null) {
leadBroker = getLeadBroker();
@@ -105,9 +100,9 @@ public abstract class KafkaConsumer implements Runnable {
continue;
}
- final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
+ final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaConfig);
if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+ logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
continue;
}
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
@@ -117,7 +112,7 @@ public abstract class KafkaConsumer implements Runnable {
logger.error("error put streamQueue", e);
break;
}
- offset.incrementAndGet();
+ offset++;
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
new file mode 100644
index 0000000..4528a72
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
@@ -0,0 +1,102 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.PartitionMetadata;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.invertedindex.IIDescManager;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingBootstrap {
+
+    private static KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+    private static StreamManager streamManager = StreamManager.getInstance(kylinConfig);
+    private static IIManager iiManager = IIManager.getInstance(kylinConfig);
+    private static IIDescManager iiDescManager = IIDescManager.getInstance(kylinConfig);
+
+    /** Returns the leader broker of the given partition, or null when its metadata is missing or carries an error code. */
+    private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
+        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            return partitionMetadata.leader();
+        } else {
+            return null;
+        }
+    }
+
+    /** Consumes partition {@code partitionId} of the topic named by kafka config {@code streamingConf} and prints each message; loops until an exception is thrown. */
+    public static void startStreaming(String streamingConf, int partitionId) throws Exception {
+        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
+        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
+        final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
+        Preconditions.checkNotNull(ii);
+
+        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+        Preconditions.checkState(leadBroker != null, "cannot find lead broker");
+        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+        // resume from the stored offset when one exists for this partition, but never before the earliest available offset
+        long streamOffset = earliestOffset;
+        if (partitionId < ii.getStreamOffsets().size()) {
+            streamOffset = Math.max(earliestOffset, ii.getStreamOffsets().get(partitionId));
+        }
+
+        // consume the requested partition; the broker and offset lookups above already use partitionId
+        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
+            @Override
+            protected void consume(long offset, ByteBuffer payload) throws Exception {
+                byte[] bytes = new byte[payload.remaining()]; // size by the unread bytes, which is what get(byte[]) copies
+                payload.get(bytes);
+                getStreamQueue().put(new Stream(offset, bytes));
+            }
+        };
+        final IIDesc desc = ii.getDescriptor();
+        Executors.newSingleThreadExecutor().execute(consumer);
+        while (true) {
+            final Stream stream = consumer.getStreamQueue().take(); // block until a message arrives instead of busy-spinning on poll()
+            System.out.println("offset:" + stream.getOffset() + " content:" + new String(stream.getRawData()));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
new file mode 100644
index 0000000..70290f1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+ public static void main(String[] args) {
+ try {
+ if (args.length < 2) {
+ printArgsError(args);
+ return;
+ }
+ if (args[0].equals("start")) {
+ String kafkaConfName = args[1];
+ StreamingBootstrap.startStreaming(kafkaConfName, 0);
+ } else if (args.equals("stop")) {
+
+ } else {
+ printArgsError(args);
+ }
+ } catch (Exception e) {
+ }
+ }
+
+ private static void printArgsError(String[] args) {
+ logger.warn("invalid args:" + StringUtils.join(args, " "));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
index c824c48..337dfc7 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -90,7 +90,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
List<BlockingQueue<Stream>> queues = Lists.newArrayList();
for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig) {
+ KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig) {
@Override
protected void consume(long offset, ByteBuffer payload) throws Exception {
//TODO use ByteBuffer maybe