You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:02:02 UTC

[49/50] incubator-kylin git commit: minor change to kafka test

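Minor change to Kafka test: log each consumed message's offset at INFO instead of DEBUG, also log the message payload, and log the total message count in KafkaConsumerTest.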


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0b13251b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0b13251b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0b13251b

Branch: refs/heads/streaming
Commit: 0b13251b57b426bbea54f6037f9a09a0778951d0
Parents: 0b71528
Author: honma <ho...@ebay.com>
Authored: Wed Mar 11 00:21:59 2015 -0700
Committer: honma <ho...@ebay.com>
Committed: Wed Mar 11 15:30:52 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/streaming/KafkaConsumer.java  | 4 +++-
 .../test/java/org/apache/kylin/streaming/KafkaConsumerTest.java  | 2 ++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0b13251b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index a9f5679..60f85d8 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -112,10 +112,12 @@ public class KafkaConsumer implements Runnable {
                 }
                 for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
                     final ByteBuffer payload = messageAndOffset.message().payload();
+
                     //TODO use ByteBuffer maybe
                     byte[] bytes = new byte[payload.limit()];
                     payload.get(bytes);
-                    logger.debug("get message offset:" + messageAndOffset.offset());
+                    logger.info("get message offset:" + messageAndOffset.offset());
+                    logger.info(new String(bytes));
                     try {
                         streamQueue.put(new Stream(messageAndOffset.offset(), bytes));
                     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0b13251b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
index 91e06fc..a6176ab 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -92,6 +92,8 @@ public class KafkaConsumerTest extends KafkaBaseTest {
         for (BlockingQueue<Stream> queue : queues) {
             count += queue.size();
         }
+
+        logger.info("count of messages are " + count);
         //since there will be historical data
         assertTrue(count >= TOTAL_SEND_COUNT);
     }