You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/08 10:00:20 UTC

kylin git commit: add JUnit test and fix some bugs

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2067 72425d4ec -> 41cdadd3b


add JUnit test and fix some bugs

Signed-off-by: XieFan <fa...@kyligence.io>

code style

style

merge patch

shorten json msg

shorten json
Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/41cdadd3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/41cdadd3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/41cdadd3

Branch: refs/heads/KYLIN-2067
Commit: 41cdadd3bfbf71f91204a3f953f1b268e65e045f
Parents: 72425d4
Author: xiefan46 <95...@qq.com>
Authored: Fri Sep 30 13:31:08 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Oct 8 17:57:22 2016 +0800

----------------------------------------------------------------------
 source-kafka/pom.xml                            |  17 ++
 .../source/kafka/TimedJsonStreamParser.java     |  30 ++--
 .../test/java/TimedJsonStreamParserTest.java    | 178 +++++++++++++++++++
 source-kafka/src/test/resources/message.json    |  48 +++++
 4 files changed, 259 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index f91ab8f..d4cdfd5 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -40,6 +40,12 @@
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-streaming</artifactId>
         </dependency>
 
@@ -64,11 +70,22 @@
             <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+
+
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index d4327c5..e4c702d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -21,12 +21,13 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.TreeMap;
+import java.util.Collections;
+import java.util.Arrays;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.lang3.StringUtils;
@@ -43,16 +44,15 @@ import com.google.common.collect.Lists;
 
 /**
  * An utility class which parses a JSON streaming message to a list of strings (represent a row in table).
- *
+ * <p>
  * Each message should have a property whose value represents the message's timestamp, default the column name is "timestamp"
  * but can be customized by StreamingParser#PROPERTY_TS_PARSER.
- *
+ * <p>
  * By default it will parse the timestamp col value as Unix time. If the format isn't Unix time, need specify the time parser
  * with property StreamingParser#PROPERTY_TS_PARSER.
- *
+ * <p>
  * It also support embedded JSON format; Use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concat
  * the property names.
- *
  */
 public final class TimedJsonStreamParser extends StreamingParser {
 
@@ -106,14 +106,14 @@ public final class TimedJsonStreamParser extends StreamingParser {
             ArrayList<String> result = Lists.newArrayList();
 
             for (TblColRef column : allColumns) {
-                String columnName = column.getName().toLowerCase();
-
+                String columnName = column.getName();
+                columnName = columnName.toLowerCase();
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
                     result.add(getValueByKey(columnName, root));
                 }
             }
 
-            return new StreamingMessage(result, 0, t, Collections.<String, Object> emptyMap());
+            return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap());
         } catch (IOException e) {
             logger.error("error", e);
             throw new RuntimeException(e);
@@ -132,11 +132,12 @@ public final class TimedJsonStreamParser extends StreamingParser {
 
         if (key.contains(separator)) {
             String[] names = key.toLowerCase().split(separator);
-            Map<String, Object> tempMap = null;
+            Map<String, Object> tempMap = root;
             for (int i = 0; i < names.length - 1; i++) {
                 Object o = root.get(names[i]);
                 if (o instanceof Map) {
-                    tempMap = (Map<String, Object>) o;
+                    tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+                    tempMap.putAll((Map<String, Object>) o);
                 } else {
                     throw new IOException("Property '" + names[i] + "' is not embedded format");
                 }
@@ -149,10 +150,11 @@ public final class TimedJsonStreamParser extends StreamingParser {
         return StringUtils.EMPTY;
     }
 
-    static String objToString(Object value) {
+    public static String objToString(Object value) {
         if (value == null)
             return StringUtils.EMPTY;
-
+        if (value.getClass().isArray()) // the one-element wrapper lets deepToString accept primitive (int[]) and nested arrays; a cast to Object[] would throw
+            return StringUtils.substring(Arrays.deepToString(new Object[] { value }), 1, -1); // strip the wrapper's outer "[" and "]"
         return String.valueOf(value);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
new file mode 100644
index 0000000..fb33059
--- /dev/null
+++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Tests {@link TimedJsonStreamParser} against the sample message in {@code message.json}: top-level
+ * values, embedded values addressed as {@code user_id}, array and map values, and absent keys.
+ */
+public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
+
+    /** Test-classpath location of the sample message (src/test/resources/message.json). */
+    private static final String JSON_RESOURCE = "/message.json";
+
+    private static ObjectMapper mapper;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        staticCreateTestMetadata();
+        mapper = new ObjectMapper();
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        cleanAfterClass();
+    }
+
+    @Test
+    public void testNormalValue() throws Exception {
+        List<String> result = parse("createdAt", "id", "isTruncated", "text");
+        assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
+        assertEquals("755703618762862600", result.get(1));
+        assertEquals("false", result.get(2));
+        assertEquals("dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup", result.get(3));
+    }
+
+    @Test
+    public void testEmbeddedValue() throws Exception {
+        List<String> result = parse("user_id", "user_description", "user_isProtected");
+        assertEquals("4853763947", result.get(0));
+        // NOTE(review): message.json holds U+FFFD replacement characters where accented letters
+        // appear to have been lost in encoding; this expectation mirrors the fixture verbatim.
+        assertEquals("Noticias, an\ufffd\ufffdlisis e informaci\ufffd\ufffdn para el crecimiento de la regi\ufffd\ufffdn.", result.get(1));
+        assertEquals("false", result.get(2));
+    }
+
+    @Test
+    public void testArrayValue() throws Exception {
+        List<String> result = parse("userMentionEntities", "mediaEntities");
+        // "userMentionEntities" does not occur in message.json, so it must come back empty.
+        assertEquals(StringUtils.EMPTY, result.get(0));
+        String media = result.get(1);
+        assertTrue(media, media.startsWith("[") && media.endsWith("]"));
+        assertTrue(media, media.contains("url=https://t.co/kfe0kT2Fup"));
+    }
+
+    @Test
+    public void testMapValue() throws Exception {
+        String user = parse("user").get(0);
+        assertTrue(user, user.startsWith("{") && user.endsWith("}"));
+        assertTrue(user, user.contains("screenName=ElTWdelMetro"));
+    }
+
+    @Test
+    public void testNullKey() throws Exception {
+        List<String> result = parse("null", "");
+        assertEquals(StringUtils.EMPTY, result.get(0));
+        assertEquals(StringUtils.EMPTY, result.get(1));
+    }
+
+    /**
+     * Parses the sample message with a parser configured for the given columns.
+     *
+     * @param colNames column names, in the order their values should appear in the result
+     * @return the parsed row values, one per column name
+     */
+    private static List<String> parse(String... colNames) throws Exception {
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(mockupTblColRefList(colNames), null);
+        StreamingMessage message = parser.parse(loadMessage());
+        return message.getData();
+    }
+
+    /** Reads the sample message from the test classpath and re-serializes it into a byte buffer. */
+    private static ByteBuffer loadMessage() throws IOException {
+        try (InputStream in = TimedJsonStreamParserTest.class.getResourceAsStream(JSON_RESOURCE)) {
+            if (in == null) {
+                throw new IOException("Test resource not found on classpath: " + JSON_RESOURCE);
+            }
+            Object json = mapper.readValue(in, Object.class);
+            return ByteBuffer.wrap(mapper.writeValueAsBytes(json));
+        }
+    }
+
+    /** Builds one string-typed column reference per name, all on the same mock table. */
+    private static List<TblColRef> mockupTblColRefList(String... colNames) {
+        TableDesc table = mockupTableDesc("table_a");
+        List<TblColRef> list = new ArrayList<>(colNames.length);
+        for (int i = 0; i < colNames.length; i++) {
+            // Column ids are one-based, as mockupColumnDesc's parameter name states.
+            ColumnDesc c = mockupColumnDesc(table, i + 1, colNames[i], "string");
+            list.add(c.getRef());
+        }
+        return list;
+    }
+
+    /** Creates a bare table descriptor with only its name set. */
+    private static TableDesc mockupTableDesc(String tableName) {
+        TableDesc mockup = new TableDesc();
+        mockup.setName(tableName);
+        return mockup;
+    }
+
+    /** Creates a column descriptor with the given id, name and data type, then calls {@code init(table)}. */
+    private static ColumnDesc mockupColumnDesc(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+        ColumnDesc desc = new ColumnDesc();
+        desc.setId(String.valueOf(oneBasedColumnIndex));
+        desc.setName(name);
+        desc.setDatatype(datatype);
+        desc.init(table);
+        return desc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/41cdadd3/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json
new file mode 100644
index 0000000..dfafd45
--- /dev/null
+++ b/source-kafka/src/test/resources/message.json
@@ -0,0 +1,48 @@
+{
+  "createdAt": "Jul 20, 2016 9:59:17 AM",
+  "id": 755703618762862600,
+  "text": "dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup",
+  "source": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
+  "isTruncated": false,
+  "inReplyToStatusId": -1,
+  "inReplyToUserId": -1,
+  "favoriteCount": 0,
+  "retweetCount": 0,
+  "isPossiblySensitive": false,
+  "lang": "es",
+  "contributorsIDs": [],
+  "urlEntities": [],
+  "mediaEntities": [
+    {
+      "id": 755703584084328400,
+      "url": "https://t.co/kfe0kT2Fup",
+      "displayURL": "pic.twitter.com/kfe0kT2Fup",
+      "sizes": {
+        "0": {
+          "width": 150,
+          "height": 150,
+          "resize": 101
+        },
+        "1": {
+          "width": 520,
+          "height": 680,
+          "resize": 100
+        }
+      },
+      "type": "photo",
+      "start": 48,
+      "end": 71
+    }
+  ],
+  "symbolEntities": [],
+  "currentUserRetweetId": -1,
+  "user": {
+    "id": 4853763947,
+    "name": "El Metropolitano",
+    "screenName": "ElTWdelMetro",
+    "description": "Noticias, an\ufffd\ufffdlisis e informaci\ufffd\ufffdn para el crecimiento de la regi\ufffd\ufffdn.",
+    "isDefaultProfileImage": false,
+    "url": "http://elmetropolitano.com.ar/",
+    "isProtected": false
+  }
+}
\ No newline at end of file