You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:46 UTC

[09/50] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/21b8f0f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/21b8f0f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/21b8f0f6

Branch: refs/heads/streaming-localdict
Commit: 21b8f0f6f43bec5caba0d7c5bbac2f47a5aef27a
Parents: 9a1c4cb
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Mar 26 18:04:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Mar 26 18:04:14 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/streaming/KafkaConsumer.java  |  5 +++--
 .../org/apache/kylin/streaming/StreamingBootstrap.java  | 12 +++++-------
 .../kylin/streaming/invertedindex/IIStreamBuilder.java  |  4 ++--
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/21b8f0f6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 910041c..18c8403 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -59,7 +60,7 @@ public abstract class KafkaConsumer implements Runnable {
     private KafkaConfig kafkaConfig;
     private List<Broker> replicaBrokers;
     private long offset;
-    private BlockingQueue<Stream> streamQueue;
+    private LinkedBlockingQueue<Stream> streamQueue;
 
     private Logger logger;
 
@@ -70,7 +71,7 @@ public abstract class KafkaConsumer implements Runnable {
         offset = startOffset;
         this.replicaBrokers = initialBrokers;
         logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
-        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
+        streamQueue = new LinkedBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
     }
 
     public BlockingQueue<Stream> getStreamQueue() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/21b8f0f6/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
index 4528a72..4b7c6b7 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
@@ -43,9 +43,11 @@ import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
  * Created by qianzhou on 3/26/15.
@@ -91,12 +93,8 @@ public class StreamingBootstrap {
             }
         };
         final IIDesc desc = ii.getDescriptor();
-        Executors.newSingleThreadExecutor().execute(consumer);
-        while (true) {
-            final Stream stream = consumer.getStreamQueue().poll();
-            if (stream != null) {
-                System.out.println("offset:" + stream.getOffset() + " content:" + new String(stream.getRawData()));
-            }
-        }
+        Executors.newSingleThreadExecutor().submit(consumer);
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(new IIStreamBuilder(consumer.getStreamQueue(), ii.getSegments().get(0).getStorageLocationIdentifier(), desc, partitionId));
+        future.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/21b8f0f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 9724ba7..f9adefe 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -64,7 +64,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -78,7 +78,7 @@ public class IIStreamBuilder extends StreamBuilder {
     private final HTableInterface hTable;
     private final BatchSliceBuilder sliceBuilder;
 
-    public IIStreamBuilder(LinkedBlockingDeque<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
+    public IIStreamBuilder(BlockingQueue<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
         super(queue, desc.getSliceSize());
         this.desc = desc;
         try {