You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2016/04/18 08:34:44 UTC

[02/50] kylin git commit: KYLIN-1417 TimedJsonStreamParser should be case insensitive

KYLIN-1417 TimedJsonStreamParser should look up JSON field names case-insensitively


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ebce31e0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ebce31e0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ebce31e0

Branch: refs/heads/1.4-rc
Commit: ebce31e01cc4de1aed66a2fccf1e3998c45b7d06
Parents: 809fc62
Author: shaofengshi <sh...@apache.org>
Authored: Mon Feb 15 11:23:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Feb 15 15:50:40 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/TimedJsonStreamParser.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ebce31e0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..65835cd 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,6 +40,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import kafka.message.MessageAndOffset;
 
@@ -102,7 +104,10 @@ public final class TimedJsonStreamParser extends StreamingParser {
     @Override
     public StreamingMessage parse(MessageAndOffset messageAndOffset) {
         try {
-            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            root.putAll(message);
+            
             String tsStr = root.get(tsColName);
             //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
             //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));