You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 16:26:16 UTC

[1/2] incubator-kylin git commit: bug fix

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 6d6a7c1c5 -> 2e6d27c89


bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2e6d27c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2e6d27c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2e6d27c8

Branch: refs/heads/0.8.0
Commit: 2e6d27c89b09c3253628073270c10d257789b1c3
Parents: 487c0af
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 22:25:49 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 22:25:55 2015 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/job/streaming/StreamingCLI.java  | 5 +++++
 .../main/java/org/apache/kylin/rest/service/CacheService.java   | 1 -
 .../src/main/java/org/apache/kylin/streaming/StreamBuilder.java | 2 ++
 3 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e6d27c8/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index c2dd2ea..ea2b2a5 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -36,6 +36,8 @@ package org.apache.kylin.job.streaming;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.restclient.AbstractRestCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +53,9 @@ public class StreamingCLI {
                 printArgsError(args);
                 return;
             }
+
+            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
+
             if (args[0].equals("start")) {
                 String kafkaConfName = args[1];
                 int partition = Integer.parseInt(args[2]);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e6d27c8/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index b4a5711..56f9d4b 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -70,7 +70,6 @@ public class CacheService extends BasicService {
             case CUBE:
                 CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
                 getProjectManager().clearL2Cache();
-
                 //clean query related cache first
                 super.cleanDataCache(newCube.getUuid());
                 //move this logic to other place

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e6d27c8/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 3008722..ce5e3d7 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -98,6 +98,7 @@ public abstract class StreamBuilder implements Runnable {
                 if (streamMessage == null) {
                     logger.info("The stream queue is drained, current available stream count: " + parsedStreamMessages.size());
                     if ((System.currentTimeMillis() - lastBuildTime) > batchInterval() && !parsedStreamMessages.isEmpty()) {
+                        logger.info("Building batch due to time threshold, batch size: " + parsedStreamMessages.size());
                         build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
                         parsedStreamMessages = null;
                     }
@@ -131,6 +132,7 @@ public abstract class StreamBuilder implements Runnable {
                     }
                     parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
                     if (parsedStreamMessages.size() >= batchSize()) {
+                        logger.info("Building batch due to size threshold, batch size: " + parsedStreamMessages.size());
                         build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
                         parsedStreamMessages = null;
                     }


[2/2] incubator-kylin git commit: bug fix

Posted by ma...@apache.org.
bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/487c0af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/487c0af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/487c0af0

Branch: refs/heads/0.8.0
Commit: 487c0af027ade5603fe7a951cb4ff2f80c5aa12e
Parents: 6d6a7c1
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 18:23:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 22:25:55 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/streaming/CubeStreamBuilder.java   | 4 ++--
 .../org/apache/kylin/job/streaming/StreamingBootstrap.java  | 6 +++++-
 .../main/java/org/apache/kylin/streaming/StreamBuilder.java | 9 +++++++--
 3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 2831caa..3c98464 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -370,11 +370,11 @@ public class CubeStreamBuilder extends StreamBuilder {
 
     @Override
     protected int batchInterval() {
-        return 5 * 60 * 1000;//30 min
+        return 5 * 60 * 1000;//5 min
     }
 
     @Override
     protected int batchSize() {
-        return 1000;
+        return 10000;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ee00880..dcfa774 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -130,7 +130,7 @@ public class StreamingBootstrap {
 
     private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
-        for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) {
+        for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
             final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
 
             final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
@@ -153,10 +153,14 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                int totalMessage = 0;
                 while (true) {
                     for (BlockingQueue<StreamMessage> queue : queues) {
                         try {
                             streamQueue.put(queue.take());
+                            if (totalMessage++ % 10000 == 1) {
+                                logger.info("Total stream message count: " + totalMessage);
+                            }
                         } catch (InterruptedException e) {
                             throw new RuntimeException(e);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb5dc1d..3008722 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -35,7 +35,6 @@
 package org.apache.kylin.streaming;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ public abstract class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
 
-
     private StreamParser streamParser = StringStreamParser.instance;
 
     private StreamFilter streamFilter = DefaultStreamFilter.instance;
@@ -84,6 +82,7 @@ public abstract class StreamBuilder implements Runnable {
     public void run() {
         try {
             List<List<String>> parsedStreamMessages = null;
+            int filteredMsgCount = 0;
             while (true) {
                 if (parsedStreamMessages == null) {
                     parsedStreamMessages = Lists.newLinkedList();
@@ -113,6 +112,11 @@ public abstract class StreamBuilder implements Runnable {
                 final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
 
                 if (getStreamFilter().filter(parsedStreamMessage)) {
+
+                    if (filteredMsgCount++ % 10000 == 1) {
+                        logger.info("Total stream message count: " + filteredMsgCount);
+                    }
+
                     if (startOffset > parsedStreamMessage.getOffset()) {
                         startOffset = parsedStreamMessage.getOffset();
                     }
@@ -158,5 +162,6 @@ public abstract class StreamBuilder implements Runnable {
     }
 
     protected abstract int batchInterval();
+
     protected abstract int batchSize();
 }