You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2016/04/18 08:34:44 UTC
[02/50] kylin git commit: KYLIN-1417 TimedJsonStreamParser should be
case insensitive
KYLIN-1417 TimedJsonStreamParser should be case insensitive
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ebce31e0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ebce31e0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ebce31e0
Branch: refs/heads/1.4-rc
Commit: ebce31e01cc4de1aed66a2fccf1e3998c45b7d06
Parents: 809fc62
Author: shaofengshi <sh...@apache.org>
Authored: Mon Feb 15 11:23:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Feb 15 15:50:40 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/source/kafka/TimedJsonStreamParser.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ebce31e0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..65835cd 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,6 +40,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import kafka.message.MessageAndOffset;
@@ -102,7 +104,10 @@ public final class TimedJsonStreamParser extends StreamingParser {
@Override
public StreamingMessage parse(MessageAndOffset messageAndOffset) {
try {
- Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ root.putAll(message);
+
String tsStr = root.get(tsColName);
//Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
//" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));