You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:03:01 UTC

[20/43] kylin git commit: KYLIN-1311 on the way

http://git-wip-us.apache.org/repos/asf/kylin/blob/1c4deab9/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
new file mode 100644
index 0000000..3182c16
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import java.io.File;
+
+/**
+ */
+public class TestBaseWithZookeeper extends LocalFileMetadataTestCase {
+    protected static final String zkAddress = "localhost:2199";
+    static ZkServer server;
+    static boolean zkStarted = false;
+
+    @BeforeClass
+    public static void setupResource() throws Exception {
+        staticCreateTestMetadata();
+
+        if (zkStarted == false) {
+            final File tmpDir = File.createTempFile("KylinTest", null);
+            FileUtil.fullyDelete(tmpDir);
+            tmpDir.mkdirs();
+            tmpDir.deleteOnExit();
+            server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+                }
+            }, 2199, 1000, 2000);
+
+            server.start();
+            zkStarted = true;
+            System.setProperty("kylin.zookeeper.address", zkAddress);
+        }
+
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        if (server == null) {
+            server.shutdown();
+            zkStarted = false;
+            System.setProperty("kylin.zookeeper.address", "");
+        }
+
+        staticCleanupTestMetadata();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1c4deab9/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..b075387 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,7 +40,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
+import com.google.common.collect.Maps;
 import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
@@ -102,7 +105,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     @Override
     public StreamingMessage parse(MessageAndOffset messageAndOffset) {
         try {
-            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            root.putAll(message);
             String tsStr = root.get(tsColName);
             //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
             //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));