You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/03 04:30:53 UTC
[06/25] kylin git commit: KYLIN-1311 on the way
http://git-wip-us.apache.org/repos/asf/kylin/blob/76132a53/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
new file mode 100644
index 0000000..3182c16
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import java.io.File;
+
+/**
+ * Test base class that starts a local ZkServer on port 2199 and publishes its address for tests.
+ */
+public class TestBaseWithZookeeper extends LocalFileMetadataTestCase {
+    protected static final String zkAddress = "localhost:2199";
+    static ZkServer server;
+    static boolean zkStarted = false;
+
+    /** Creates the test metadata and, unless one is already running, starts the ZkServer. */
+    @BeforeClass
+    public static void setupResource() throws Exception {
+        staticCreateTestMetadata();
+        if (!zkStarted) {
+            // createTempFile only reserves a unique name; replace that file with a directory of the same name.
+            final File tmpDir = File.createTempFile("KylinTest", null);
+            FileUtil.fullyDelete(tmpDir);
+            tmpDir.mkdirs();
+            tmpDir.deleteOnExit();
+            server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+                }
+            }, 2199, 1000, 2000);
+            server.start();
+            zkStarted = true;
+            System.setProperty("kylin.zookeeper.address", zkAddress);
+        }
+    }
+
+    /** Shuts down the ZkServer if one was started, resets the shared state, then cleans up the test metadata. */
+    @AfterClass
+    public static void tearDownResource() {
+        if (server != null) { // must be non-null: a "== null" guard would NPE and never stop a started server
+            server.shutdown();
+            server = null; // makes a repeated teardown a no-op
+            zkStarted = false;
+            System.setProperty("kylin.zookeeper.address", "");
+        }
+        staticCleanupTestMetadata();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/76132a53/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..b075387 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,7 +40,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import com.google.common.collect.Maps;
import kafka.message.MessageAndOffset;
import org.apache.commons.lang3.StringUtils;
@@ -102,7 +105,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
@Override
public StreamingMessage parse(MessageAndOffset messageAndOffset) {
try {
- Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ root.putAll(message);
String tsStr = root.get(tsColName);
//Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
//" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));