You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:02:02 UTC
[49/50] incubator-kylin git commit: minor change to kafka test
minor change to kafka test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0b13251b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0b13251b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0b13251b
Branch: refs/heads/streaming
Commit: 0b13251b57b426bbea54f6037f9a09a0778951d0
Parents: 0b71528
Author: honma <ho...@ebay.com>
Authored: Wed Mar 11 00:21:59 2015 -0700
Committer: honma <ho...@ebay.com>
Committed: Wed Mar 11 15:30:52 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/streaming/KafkaConsumer.java | 4 +++-
.../test/java/org/apache/kylin/streaming/KafkaConsumerTest.java | 2 ++
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0b13251b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index a9f5679..60f85d8 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -112,10 +112,12 @@ public class KafkaConsumer implements Runnable {
}
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
final ByteBuffer payload = messageAndOffset.message().payload();
+
//TODO use ByteBuffer maybe
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
- logger.debug("get message offset:" + messageAndOffset.offset());
+ logger.info("get message offset:" + messageAndOffset.offset());
+ logger.info(new String(bytes));
try {
streamQueue.put(new Stream(messageAndOffset.offset(), bytes));
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0b13251b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
index 91e06fc..a6176ab 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -92,6 +92,8 @@ public class KafkaConsumerTest extends KafkaBaseTest {
for (BlockingQueue<Stream> queue : queues) {
count += queue.size();
}
+
+ logger.info("count of messages are " + count);
//since there will be historical data
assertTrue(count >= TOTAL_SEND_COUNT);
}