You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/08 10:00:20 UTC
kylin git commit: add junit test and fix some bug
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2067 72425d4ec -> 41cdadd3b
add junit test and fix some bug
Signed-off-by: XieFan <fa...@kyligence.io>
code style
style
merge patch
shorten json msg
shorten json
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/41cdadd3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/41cdadd3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/41cdadd3
Branch: refs/heads/KYLIN-2067
Commit: 41cdadd3bfbf71f91204a3f953f1b268e65e045f
Parents: 72425d4
Author: xiefan46 <95...@qq.com>
Authored: Fri Sep 30 13:31:08 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Oct 8 17:57:22 2016 +0800
----------------------------------------------------------------------
source-kafka/pom.xml | 17 ++
.../source/kafka/TimedJsonStreamParser.java | 30 ++--
.../test/java/TimedJsonStreamParserTest.java | 178 +++++++++++++++++++
source-kafka/src/test/resources/message.json | 48 +++++
4 files changed, 259 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index f91ab8f..d4cdfd5 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -40,6 +40,12 @@
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-streaming</artifactId>
</dependency>
@@ -64,11 +70,22 @@
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+
+
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index d4327c5..e4c702d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -21,12 +21,13 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.HashMap;
import java.util.TreeMap;
+import java.util.Collections;
+import java.util.Arrays;
import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.commons.lang3.StringUtils;
@@ -43,16 +44,15 @@ import com.google.common.collect.Lists;
/**
* An utility class which parses a JSON streaming message to a list of strings (represent a row in table).
- *
+ * <p>
* Each message should have a property whose value represents the message's timestamp, default the column name is "timestamp"
* but can be customized by StreamingParser#PROPERTY_TS_PARSER.
- *
+ * <p>
* By default it will parse the timestamp col value as Unix time. If the format isn't Unix time, need specify the time parser
* with property StreamingParser#PROPERTY_TS_PARSER.
- *
+ * <p>
* It also support embedded JSON format; Use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concat
* the property names.
- *
*/
public final class TimedJsonStreamParser extends StreamingParser {
@@ -106,14 +106,14 @@ public final class TimedJsonStreamParser extends StreamingParser {
ArrayList<String> result = Lists.newArrayList();
for (TblColRef column : allColumns) {
- String columnName = column.getName().toLowerCase();
-
+ String columnName = column.getName();
+ columnName = columnName.toLowerCase();
if (populateDerivedTimeColumns(columnName, result, t) == false) {
result.add(getValueByKey(columnName, root));
}
}
- return new StreamingMessage(result, 0, t, Collections.<String, Object> emptyMap());
+ return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap());
} catch (IOException e) {
logger.error("error", e);
throw new RuntimeException(e);
@@ -132,11 +132,12 @@ public final class TimedJsonStreamParser extends StreamingParser {
if (key.contains(separator)) {
String[] names = key.toLowerCase().split(separator);
- Map<String, Object> tempMap = null;
+ Map<String, Object> tempMap = root;
for (int i = 0; i < names.length - 1; i++) {
Object o = root.get(names[i]);
if (o instanceof Map) {
- tempMap = (Map<String, Object>) o;
+ tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ tempMap.putAll((Map<String, Object>) o);
} else {
throw new IOException("Property '" + names[i] + "' is not embedded format");
}
@@ -149,10 +150,11 @@ public final class TimedJsonStreamParser extends StreamingParser {
return StringUtils.EMPTY;
}
- static String objToString(Object value) {
+ public static String objToString(Object value) {
if (value == null)
return StringUtils.EMPTY;
-
+ if (value.getClass().isArray())
+ return String.valueOf(Arrays.asList((Object[]) value));
return String.valueOf(value);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
new file mode 100644
index 0000000..fb33059
--- /dev/null
+++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.fasterxml.jackson.databind.JavaType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+
+
+public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
+
+ private static String[] userNeedColNames;
+
+ private static final String jsonFilePath = "src/test/resources/message.json";
+
+ private static ObjectMapper mapper;
+
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
+ SimpleType.construct(Object.class));
+
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ staticCreateTestMetadata();
+ mapper = new ObjectMapper();
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ cleanAfterClass();
+ }
+
+
+ @Test
+ public void testNormalValue() throws Exception {
+ userNeedColNames = new String[]{"createdAt", "id", "isTruncated", "text"};
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
+ assertEquals("755703618762862600", result.get(1));
+ assertEquals("false", result.get(2));
+ assertEquals("dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup", result.get(3));
+ }
+
+ @Test
+ public void testEmbeddedValue() throws Exception {
+ userNeedColNames = new String[]{"user_id", "user_description", "user_isProtected"};
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ assertEquals("4853763947", result.get(0));
+ assertEquals("Noticias, an\ufffd\ufffdlisis e informaci\ufffd\ufffdn para el crecimiento de la regi\ufffd\ufffdn.", result.get(1));
+ assertEquals("false", result.get(2));
+ }
+
+ @Test
+ public void testArrayValue() throws Exception {
+ userNeedColNames = new String[]{"userMentionEntities", "mediaEntities"};
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ HashMap<String, Object> map = (HashMap<String, Object>) msg;
+ Object array = map.get("mediaEntities");
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ System.out.println(result);
+
+ }
+
+ @Test
+ public void testMapValue() throws Exception {
+ userNeedColNames = new String[]{"user"};
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ System.out.println("result:" + result);
+
+ }
+
+ @Test
+ public void testNullKey() throws Exception {
+ userNeedColNames = new String[]{"null", ""};
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ assertEquals(StringUtils.EMPTY, result.get(0));
+ assertEquals(StringUtils.EMPTY, result.get(1));
+ }
+
+
+ private static ByteBuffer getJsonByteBuffer(Object obj) throws IOException {
+ byte[] bytes = mapper.writeValueAsBytes(obj);
+ ByteBuffer buff = ByteBuffer.wrap(bytes);
+ buff.position(0);
+ return buff;
+ }
+
+
+ private static List<TblColRef> mockupTblColRefList() {
+ TableDesc t = mockupTableDesc("table_a");
+ List<TblColRef> list = new ArrayList<>();
+ for (int i = 0; i < userNeedColNames.length; i++) {
+ ColumnDesc c = mockupColumnDesc(t, i, userNeedColNames[i], "string");
+ list.add(c.getRef());
+ }
+ return list;
+ }
+
+ private static TableDesc mockupTableDesc(String tableName) {
+ TableDesc mockup = new TableDesc();
+ mockup.setName(tableName);
+ return mockup;
+ }
+
+ private static ColumnDesc mockupColumnDesc(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+ ColumnDesc desc = new ColumnDesc();
+ String id = "" + oneBasedColumnIndex;
+ desc.setId(id);
+ desc.setName(name);
+ desc.setDatatype(datatype);
+ desc.init(table);
+ return desc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json
new file mode 100644
index 0000000..dfafd45
--- /dev/null
+++ b/source-kafka/src/test/resources/message.json
@@ -0,0 +1,48 @@
+{
+ "createdAt": "Jul 20, 2016 9:59:17 AM",
+ "id": 755703618762862600,
+ "text": "dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup",
+ "source": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
+ "isTruncated": false,
+ "inReplyToStatusId": -1,
+ "inReplyToUserId": -1,
+ "favoriteCount": 0,
+ "retweetCount": 0,
+ "isPossiblySensitive": false,
+ "lang": "es",
+ "contributorsIDs": [],
+ "urlEntities": [],
+ "mediaEntities": [
+ {
+ "id": 755703584084328400,
+ "url": "https://t.co/kfe0kT2Fup",
+ "displayURL": "pic.twitter.com/kfe0kT2Fup",
+ "sizes": {
+ "0": {
+ "width": 150,
+ "height": 150,
+ "resize": 101
+ },
+ "1": {
+ "width": 520,
+ "height": 680,
+ "resize": 100
+ }
+ },
+ "type": "photo",
+ "start": 48,
+ "end": 71
+ }
+ ],
+ "symbolEntities": [],
+ "currentUserRetweetId": -1,
+ "user": {
+ "id": 4853763947,
+ "name": "El Metropolitano",
+ "screenName": "ElTWdelMetro",
+ "description": "Noticias, an\ufffd\ufffdlisis e informaci\ufffd\ufffdn para el crecimiento de la regi\ufffd\ufffdn.",
+ "isDefaultProfileImage": false,
+ "url": "http://elmetropolitano.com.ar/",
+ "isProtected": false
+ }
+}
\ No newline at end of file