You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 16:26:17 UTC

[2/2] incubator-kylin git commit: bug fix

bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/487c0af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/487c0af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/487c0af0

Branch: refs/heads/0.8.0
Commit: 487c0af027ade5603fe7a951cb4ff2f80c5aa12e
Parents: 6d6a7c1
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 18:23:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 22:25:55 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/streaming/CubeStreamBuilder.java   | 4 ++--
 .../org/apache/kylin/job/streaming/StreamingBootstrap.java  | 6 +++++-
 .../main/java/org/apache/kylin/streaming/StreamBuilder.java | 9 +++++++--
 3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 2831caa..3c98464 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -370,11 +370,11 @@ public class CubeStreamBuilder extends StreamBuilder {
 
     @Override
     protected int batchInterval() {
-        return 5 * 60 * 1000;//30 min
+        return 5 * 60 * 1000;//5 min
     }
 
     @Override
     protected int batchSize() {
-        return 1000;
+        return 10000;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ee00880..dcfa774 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -130,7 +130,7 @@ public class StreamingBootstrap {
 
     private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
-        for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) {
+        for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
             final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
 
             final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
@@ -153,10 +153,14 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                int totalMessage = 0;
                 while (true) {
                     for (BlockingQueue<StreamMessage> queue : queues) {
                         try {
                             streamQueue.put(queue.take());
+                            if (totalMessage++ % 10000 == 1) {
+                                logger.info("Total stream message count: " + totalMessage);
+                            }
                         } catch (InterruptedException e) {
                             throw new RuntimeException(e);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb5dc1d..3008722 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -35,7 +35,6 @@
 package org.apache.kylin.streaming;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ public abstract class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
 
-
     private StreamParser streamParser = StringStreamParser.instance;
 
     private StreamFilter streamFilter = DefaultStreamFilter.instance;
@@ -84,6 +82,7 @@ public abstract class StreamBuilder implements Runnable {
     public void run() {
         try {
             List<List<String>> parsedStreamMessages = null;
+            int filteredMsgCount = 0;
             while (true) {
                 if (parsedStreamMessages == null) {
                     parsedStreamMessages = Lists.newLinkedList();
@@ -113,6 +112,11 @@ public abstract class StreamBuilder implements Runnable {
                 final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
 
                 if (getStreamFilter().filter(parsedStreamMessage)) {
+
+                    if (filteredMsgCount++ % 10000 == 1) {
+                        logger.info("Total stream message count: " + filteredMsgCount);
+                    }
+
                     if (startOffset > parsedStreamMessage.getOffset()) {
                         startOffset = parsedStreamMessage.getOffset();
                     }
@@ -158,5 +162,6 @@ public abstract class StreamBuilder implements Runnable {
     }
 
     protected abstract int batchInterval();
+
     protected abstract int batchSize();
 }