You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/04 09:47:57 UTC

incubator-kylin git commit: KYLIN-808 minor changes

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 702ea88a5 -> 3614eef35


KYLIN-808 minor changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3614eef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3614eef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3614eef3

Branch: refs/heads/0.8.0
Commit: 3614eef359368a4678eec33696683dcef4f2b9eb
Parents: 702ea88
Author: honma <ho...@ebay.com>
Authored: Thu Jun 4 15:24:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 4 15:24:28 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/dict/DictionaryGenerator.java  |  1 +
 .../apache/kylin/job/streaming/CubeStreamConsumer.java   |  8 +-------
 .../java/org/apache/kylin/streaming/StreamBuilder.java   | 11 ++++++++++-
 3 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3614eef3/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 7b0b877..92d1705 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -148,6 +148,7 @@ public class DictionaryGenerator {
         }
 
         //FIXME: except for date type, all other date time family types are treated as TimeStrDictionary
+        logger.info("Using TimeStrDictionary");
         return new TimeStrDictionary();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3614eef3/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 6c90209..00b9d14 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -82,13 +82,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
 
     @Override
     public void consume(MicroStreamBatch microStreamBatch) throws Exception {
-        if (microStreamBatch.size() == 0) {
-            logger.info("nothing to build, skip to next iteration after sleeping 10s");
-            Thread.sleep(10000);
-            return;
-        } else {
-            logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(microStreamBatch.size()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getSecond()) });
-        }
+
 
         totalConsumedMessageCount += microStreamBatch.size();
         totalRawMessageCount += microStreamBatch.getRawMessageCount();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3614eef3/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index f188990..451e7c5 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.streaming;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.DateFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,7 +118,15 @@ public class StreamBuilder implements Runnable {
                 batch.getTimestamp().setFirst(start);
                 batch.getTimestamp().setSecond(start + condition.getBatchInterval());
                 start += condition.getBatchInterval();
-                consumer.consume(batch);
+
+                if (batch.size() == 0) {
+                    logger.info("nothing to build, skip to next iteration after sleeping 10s");
+                    Thread.sleep(10000);
+                    return;
+                } else {
+                    logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(batch.size()), DateFormat.formatToTimeStr(batch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(batch.getTimestamp().getSecond()) });
+                    consumer.consume(batch);
+                }
             }
         } catch (InterruptedException e) {
             throw new RuntimeException("stream fetcher thread should not be interrupted", e);