You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/04 09:47:57 UTC
incubator-kylin git commit: KYLIN-808 minor changes
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 702ea88a5 -> 3614eef35
KYLIN-808 minor changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3614eef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3614eef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3614eef3
Branch: refs/heads/0.8.0
Commit: 3614eef359368a4678eec33696683dcef4f2b9eb
Parents: 702ea88
Author: honma <ho...@ebay.com>
Authored: Thu Jun 4 15:24:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 4 15:24:28 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/dict/DictionaryGenerator.java | 1 +
.../apache/kylin/job/streaming/CubeStreamConsumer.java | 8 +-------
.../java/org/apache/kylin/streaming/StreamBuilder.java | 11 ++++++++++-
3 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3614eef3/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 7b0b877..92d1705 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -148,6 +148,7 @@ public class DictionaryGenerator {
}
//FIXME: except for date type, all other date time family types are treated as TimeStrDictionary
+ logger.info("Using TimeStrDictionary");
return new TimeStrDictionary();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3614eef3/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 6c90209..00b9d14 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -82,13 +82,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
@Override
public void consume(MicroStreamBatch microStreamBatch) throws Exception {
- if (microStreamBatch.size() == 0) {
- logger.info("nothing to build, skip to next iteration after sleeping 10s");
- Thread.sleep(10000);
- return;
- } else {
- logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(microStreamBatch.size()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getSecond()) });
- }
+
totalConsumedMessageCount += microStreamBatch.size();
totalRawMessageCount += microStreamBatch.getRawMessageCount();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3614eef3/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index f188990..451e7c5 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.streaming;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.DateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +118,15 @@ public class StreamBuilder implements Runnable {
batch.getTimestamp().setFirst(start);
batch.getTimestamp().setSecond(start + condition.getBatchInterval());
start += condition.getBatchInterval();
- consumer.consume(batch);
+
+ if (batch.size() == 0) {
+ logger.info("nothing to build, skip to next iteration after sleeping 10s");
+ Thread.sleep(10000);
+ return;
+ } else {
+ logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(batch.size()), DateFormat.formatToTimeStr(batch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(batch.getTimestamp().getSecond()) });
+ consumer.consume(batch);
+ }
}
} catch (InterruptedException e) {
throw new RuntimeException("stream fetcher thread should not be interrupted", e);