You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:46 UTC
[09/50] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/21b8f0f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/21b8f0f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/21b8f0f6
Branch: refs/heads/streaming-localdict
Commit: 21b8f0f6f43bec5caba0d7c5bbac2f47a5aef27a
Parents: 9a1c4cb
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Mar 26 18:04:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Mar 26 18:04:14 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/streaming/KafkaConsumer.java | 5 +++--
.../org/apache/kylin/streaming/StreamingBootstrap.java | 12 +++++-------
.../kylin/streaming/invertedindex/IIStreamBuilder.java | 4 ++--
3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/21b8f0f6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 910041c..18c8403 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -59,7 +60,7 @@ public abstract class KafkaConsumer implements Runnable {
private KafkaConfig kafkaConfig;
private List<Broker> replicaBrokers;
private long offset;
- private BlockingQueue<Stream> streamQueue;
+ private LinkedBlockingQueue<Stream> streamQueue;
private Logger logger;
@@ -70,7 +71,7 @@ public abstract class KafkaConsumer implements Runnable {
offset = startOffset;
this.replicaBrokers = initialBrokers;
logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
- streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
+ streamQueue = new LinkedBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
}
public BlockingQueue<Stream> getStreamQueue() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/21b8f0f6/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
index 4528a72..4b7c6b7 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
@@ -43,9 +43,11 @@ import org.apache.kylin.invertedindex.IIDescManager;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
* Created by qianzhou on 3/26/15.
@@ -91,12 +93,8 @@ public class StreamingBootstrap {
}
};
final IIDesc desc = ii.getDescriptor();
- Executors.newSingleThreadExecutor().execute(consumer);
- while (true) {
- final Stream stream = consumer.getStreamQueue().poll();
- if (stream != null) {
- System.out.println("offset:" + stream.getOffset() + " content:" + new String(stream.getRawData()));
- }
- }
+ Executors.newSingleThreadExecutor().submit(consumer);
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(new IIStreamBuilder(consumer.getStreamQueue(), ii.getSegments().get(0).getStorageLocationIdentifier(), desc, partitionId));
+ future.get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/21b8f0f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 9724ba7..f9adefe 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -64,7 +64,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -78,7 +78,7 @@ public class IIStreamBuilder extends StreamBuilder {
private final HTableInterface hTable;
private final BatchSliceBuilder sliceBuilder;
- public IIStreamBuilder(LinkedBlockingDeque<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
+ public IIStreamBuilder(BlockingQueue<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
super(queue, desc.getSliceSize());
this.desc = desc;
try {