You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/07/03 13:43:41 UTC

[2/3] kylin git commit: refactor some streaming classes

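refactor some streaming classes: StreamingParser.parse() now takes the message payload as a ByteBuffer instead of a Kafka MessageAndOffset, and the Kafka call sites set the offset on the returned StreamingMessage through the new setOffset() method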


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd116a6c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd116a6c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd116a6c

Branch: refs/heads/stream_m1
Commit: cd116a6c5625009fe40f4136792c382a9a355e5f
Parents: 25081d9
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jun 23 10:34:52 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jun 24 15:39:29 2016 +0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/kylin/job/DeployUtil.java    |  2 +-
 .../org/apache/kylin/job/hadoop/invertedindex/IITest.java |  8 ++------
 .../org/apache/kylin/common/util/StreamingMessage.java    |  4 ++++
 .../apache/kylin/source/kafka/KafkaStreamingInput.java    |  3 ++-
 .../org/apache/kylin/source/kafka/StreamingParser.java    |  3 ++-
 .../apache/kylin/source/kafka/StringStreamingParser.java  | 10 ++++------
 .../apache/kylin/source/kafka/TimedJsonStreamParser.java  |  9 ++++-----
 .../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java   |  3 ++-
 .../org/apache/kylin/source/kafka/util/KafkaUtils.java    |  3 ++-
 9 files changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index a0a9f88..d56dd64 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -168,7 +168,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index da25143..c34ce55 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -38,11 +38,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.*;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -114,7 +110,7 @@ public class IITest extends LocalFileMetadataTestCase {
             @Nullable
             @Override
             public StreamingMessage apply(@Nullable MessageAndOffset input) {
-                return parser.parse(input);
+                return parser.parse(input.message().payload());
             }
         });
         StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index f327db2..53ab195 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -51,6 +51,10 @@ public class StreamingMessage {
         return offset;
     }
 
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
     public final long getTimestamp() {
         return timestamp;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 564c221..3243754 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -206,7 +206,8 @@ public class KafkaStreamingInput implements IStreamingInput {
                     for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
                         offset++;
                         consumeMsgCount++;
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             final long timestamp = streamingMessage.getTimestamp();
                             if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 9075c77..3bc42ac 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -40,6 +40,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.common.util.TimeUtil;
@@ -66,7 +67,7 @@ public abstract class StreamingParser {
      * @param message
      * @return StreamingMessage must not be NULL
      */
-    abstract public StreamingMessage parse(Object message);
+    abstract public StreamingMessage parse(ByteBuffer message);
 
     abstract public boolean filter(StreamingMessage streamingMessage);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 5226899..37bcbfa 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -55,12 +55,10 @@ public final class StringStreamingParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object message) {
-        MessageAndOffset kafkaMessage = (MessageAndOffset) message;
-        final ByteBuffer payload = kafkaMessage.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap());
+    public StreamingMessage parse(ByteBuffer message) {
+        byte[] bytes = new byte[message.limit()];
+        message.get(bytes);
+        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 63f5637..4b1c579 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -37,9 +37,9 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.*;
 
-import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -95,10 +95,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object msg) {
-        MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
+    public StreamingMessage parse(ByteBuffer buffer) {
         try {
-            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
             Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
             String tsStr = root.get(tsColName);
@@ -119,7 +118,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 }
             }
 
-            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
+            return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap());
 
         } catch (IOException e) {
             logger.error("error", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index 0e29a0c..19fc87f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication {
                         offset++;
                         consumeMsgCount++;
 
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             streamQueue.add(streamingMessage);
                         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index f506999..a2984b6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -162,7 +162,8 @@ public final class KafkaUtils {
         final ByteBuffer payload = messageAndOffset.message().payload();
         byte[] bytes = new byte[payload.limit()];
         payload.get(bytes);
-        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+        streamingMessage.setOffset(messageAndOffset.offset());
         logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
         return streamingMessage.getTimestamp();