You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:54 UTC
[17/50] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2b5495ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2b5495ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2b5495ce
Branch: refs/heads/streaming-localdict
Commit: 2b5495ce1debe21be361e942428cfff0bd1dff36
Parents: c3ff4f4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 10:05:20 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 10:05:20 2015 +0800
----------------------------------------------------------------------
.../invertedindex/index/BatchSliceBuilder.java | 2 +-
.../kylin/job/streaming/StreamingBootstrap.java | 117 +++++++++++++++++
.../kylin/job/streaming/StreamingCLI.java | 71 ++++++++++
.../apache/kylin/job/IIStreamBuilderTest.java | 2 +-
pom.xml | 1 +
.../apache/kylin/streaming/KafkaRequester.java | 128 +++++++++++--------
.../kylin/streaming/StreamingBootstrap.java | 109 ----------------
.../apache/kylin/streaming/StreamingCLI.java | 71 ----------
8 files changed, 265 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
index 6ba328c..037dd6c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
@@ -87,7 +87,7 @@ public class BatchSliceBuilder {
private long increaseSliceTimestamp(long timestamp) {
if (timestamp <= sliceTimestamp) {
- return ++timestamp; // ensure slice timestamp increases
+ return sliceTimestamp+1; // ensure slice timestamp increases
} else {
return timestamp;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
new file mode 100644
index 0000000..ddaae29
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -0,0 +1,117 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Preconditions;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.PartitionMetadata;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.streaming.*;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingBootstrap {
+
+ private KylinConfig kylinConfig;
+ private StreamManager streamManager;
+ private IIManager iiManager;
+
+ public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
+ return new StreamingBootstrap(kylinConfig);
+ }
+
+ private StreamingBootstrap(KylinConfig kylinConfig) {
+ this.kylinConfig = kylinConfig;
+ this.streamManager = StreamManager.getInstance(kylinConfig);
+ this.iiManager = IIManager.getInstance(kylinConfig);
+ }
+
+ private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
+ final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+ if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ return partitionMetadata.leader();
+ } else {
+ return null;
+ }
+ }
+
+ public void startStreaming(String streamingConf, int partitionId) throws Exception {
+ final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
+ Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
+ final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
+ Preconditions.checkNotNull(ii);
+ Preconditions.checkArgument(ii.getSegments().size() > 0);
+ final IISegment iiSegment = ii.getSegments().get(0);
+
+ final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+ Preconditions.checkState(leadBroker != null, "cannot find lead broker");
+ final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+ long streamOffset = ii.getStreamOffsets().get(partitionId);
+ if (streamOffset < earliestOffset) {
+ streamOffset = earliestOffset;
+ }
+
+
+ KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), 0, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
+ @Override
+ protected void consume(long offset, ByteBuffer payload) throws Exception {
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ getStreamQueue().put(new Stream(offset, bytes));
+ }
+ };
+ final IIDesc desc = ii.getDescriptor();
+
+ Executors.newSingleThreadExecutor().submit(consumer);
+ final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), iiSegment.getStorageLocationIdentifier(), desc, partitionId);
+ task.setStreamParser(JsonStreamParser.instance);
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
+ future.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
new file mode 100644
index 0000000..8813cb3
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job.streaming;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+ public static void main(String[] args) {
+ try {
+ if (args.length < 2) {
+ printArgsError(args);
+ return;
+ }
+ if (args[0].equals("start")) {
+ String kafkaConfName = args[1];
+ StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
+ } else if (args.equals("stop")) {
+
+ } else {
+ printArgsError(args);
+ }
+ } catch (Exception e) {
+ }
+ }
+
+ private static void printArgsError(String[] args) {
+ logger.warn("invalid args:" + StringUtils.join(args, " "));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
index 35a0fe9..d42da33 100644
--- a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamingBootstrap;
+import org.apache.kylin.job.streaming.StreamingBootstrap;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 38d6220..064ea11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -616,6 +616,7 @@
<exclude>**/BuildCubeWithEngineTest.java</exclude>
<exclude>**/BuildIIWithEngineTest.java</exclude>
<exclude>**/BuildIIWithStreamTest.java</exclude>
+ <exclude>**/IIStreamBuilderTest.java</exclude>
<exclude>**/SampleCubeSetupTest.java</exclude>
<exclude>**/KylinQueryTest.java</exclude>
<exclude>**/Kafka*Test.java</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
index 699c0ed..ce87047 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
@@ -59,53 +59,65 @@ public final class KafkaRequester {
private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class);
public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
- SimpleConsumer consumer;
+ SimpleConsumer consumer = null;
for (Broker broker : kafkaConfig.getBrokers()) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp = consumer.send(req);
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- break;
- }
- List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
- @Nullable
- @Override
- public Integer apply(PartitionMetadata partitionMetadata) {
- return partitionMetadata.partitionId();
+ try {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ break;
}
- });
- return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+ List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+ @Nullable
+ @Override
+ public Integer apply(PartitionMetadata partitionMetadata) {
+ return partitionMetadata.partitionId();
+ }
+ });
+ return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
}
logger.debug("cannot find topic:" + kafkaConfig.getTopic());
return null;
}
public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
- SimpleConsumer consumer;
+ SimpleConsumer consumer = null;
for (Broker broker : brokers) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp = consumer.send(req);
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
- break;
- }
- for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
- if (partitionMetadata.partitionId() == partitionId) {
- return partitionMetadata;
+ try {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+ break;
+ }
+ for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+ if (partitionMetadata.partitionId() == partitionId) {
+ return partitionMetadata;
+ }
+ }
+ } finally {
+ if (consumer != null) {
+ consumer.close();
}
}
}
@@ -116,30 +128,38 @@ public final class KafkaRequester {
public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
final String clientName = "client_" + topic + "_" + partitionId;
SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId(clientName)
- .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
- .build();
- return consumer.fetch(req);
+ try {
+ kafka.api.FetchRequest req = new FetchRequestBuilder()
+ .clientId(clientName)
+ .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .build();
+ return consumer.fetch(req);
+ } finally {
+ consumer.close();
+ }
}
public static long getLastOffset(String topic, int partitionId,
long whichTime, Broker broker, KafkaConfig kafkaConfig) {
String clientName = "client_" + topic + "_" + partitionId;
SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
+ try {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
- if (response.hasError()) {
- System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
- return 0;
+ if (response.hasError()) {
+ System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partitionId);
+ return offsets[0];
+ } finally {
+ consumer.close();
}
- long[] offsets = response.offsets(topic, partitionId);
- return offsets[0];
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
deleted file mode 100644
index bd1ab42..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.google.common.base.Preconditions;
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.PartitionMetadata;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Created by qianzhou on 3/26/15.
- */
-public class StreamingBootstrap {
-
- private KylinConfig kylinConfig;
- private StreamManager streamManager;
- private IIManager iiManager;
-
- public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
- return new StreamingBootstrap(kylinConfig);
- }
-
- private StreamingBootstrap(KylinConfig kylinConfig) {
- this.kylinConfig = kylinConfig;
- this.streamManager = StreamManager.getInstance(kylinConfig);
- this.iiManager = IIManager.getInstance(kylinConfig);
- }
-
- private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
- return partitionMetadata.leader();
- } else {
- return null;
- }
- }
-
- public void startStreaming(String streamingConf, int partitionId) throws Exception {
- final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
- Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
- final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
- Preconditions.checkNotNull(ii);
-
- final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
- Preconditions.checkState(leadBroker != null, "cannot find lead broker");
- final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
- long streamOffset = ii.getStreamOffsets().get(partitionId);
- if (streamOffset < earliestOffset) {
- streamOffset = earliestOffset;
- }
-
-
- KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), 0, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
- @Override
- protected void consume(long offset, ByteBuffer payload) throws Exception {
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- getStreamQueue().put(new Stream(offset, bytes));
- }
- };
- final IIDesc desc = ii.getDescriptor();
- Executors.newSingleThreadExecutor().submit(consumer);
- final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), ii.getSegments().get(0).getStorageLocationIdentifier(), desc, partitionId);
- task.setStreamParser(JsonStreamParser.instance);
- final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
- future.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
deleted file mode 100644
index dac8ce0..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by qianzhou on 3/26/15.
- */
-public class StreamingCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
- public static void main(String[] args) {
- try {
- if (args.length < 2) {
- printArgsError(args);
- return;
- }
- if (args[0].equals("start")) {
- String kafkaConfName = args[1];
- StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
- } else if (args.equals("stop")) {
-
- } else {
- printArgsError(args);
- }
- } catch (Exception e) {
- }
- }
-
- private static void printArgsError(String[] args) {
- logger.warn("invalid args:" + StringUtils.join(args, " "));
- }
-
-}