You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 16:26:17 UTC
[2/2] incubator-kylin git commit: bug fix
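bug fix: remove the 3-partition cap when consuming Kafka partitions, raise the cube stream batch size from 1000 to 10000, correct the batch-interval comment (5 min), and log running message counts in StreamingBootstrap and StreamBuilder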
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/487c0af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/487c0af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/487c0af0
Branch: refs/heads/0.8.0
Commit: 487c0af027ade5603fe7a951cb4ff2f80c5aa12e
Parents: 6d6a7c1
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 18:23:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 22:25:55 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/job/streaming/CubeStreamBuilder.java | 4 ++--
.../org/apache/kylin/job/streaming/StreamingBootstrap.java | 6 +++++-
.../main/java/org/apache/kylin/streaming/StreamBuilder.java | 9 +++++++--
3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 2831caa..3c98464 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -370,11 +370,11 @@ public class CubeStreamBuilder extends StreamBuilder {
@Override
protected int batchInterval() {
- return 5 * 60 * 1000;//30 min
+ return 5 * 60 * 1000;//5 min
}
@Override
protected int batchSize() {
- return 1000;
+ return 10000;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ee00880..dcfa774 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -130,7 +130,7 @@ public class StreamingBootstrap {
private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
- for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) {
+ for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
@@ -153,10 +153,14 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
+ int totalMessage = 0;
while (true) {
for (BlockingQueue<StreamMessage> queue : queues) {
try {
streamQueue.put(queue.take());
+ if (totalMessage++ % 10000 == 1) {
+ logger.info("Total stream message count: " + totalMessage);
+ }
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb5dc1d..3008722 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -35,7 +35,6 @@
package org.apache.kylin.streaming;
import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ public abstract class StreamBuilder implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
-
private StreamParser streamParser = StringStreamParser.instance;
private StreamFilter streamFilter = DefaultStreamFilter.instance;
@@ -84,6 +82,7 @@ public abstract class StreamBuilder implements Runnable {
public void run() {
try {
List<List<String>> parsedStreamMessages = null;
+ int filteredMsgCount = 0;
while (true) {
if (parsedStreamMessages == null) {
parsedStreamMessages = Lists.newLinkedList();
@@ -113,6 +112,11 @@ public abstract class StreamBuilder implements Runnable {
final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
if (getStreamFilter().filter(parsedStreamMessage)) {
+
+ if (filteredMsgCount++ % 10000 == 1) {
+ logger.info("Total stream message count: " + filteredMsgCount);
+ }
+
if (startOffset > parsedStreamMessage.getOffset()) {
startOffset = parsedStreamMessage.getOffset();
}
@@ -158,5 +162,6 @@ public abstract class StreamBuilder implements Runnable {
}
protected abstract int batchInterval();
+
protected abstract int batchSize();
}