You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/02/09 02:06:13 UTC

[1/3] kylin git commit: KYLIN-3145 minor, update sample streaming message (#4013)

Repository: kylin
Updated Branches:
  refs/heads/2.3.x 353a725c9 -> f0a79ab96


KYLIN-3145 minor, update sample streaming message (#4013)

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/37fd9d4d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/37fd9d4d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/37fd9d4d

Branch: refs/heads/2.3.x
Commit: 37fd9d4d3510338a9e35dd9b7fa23c6642176e48
Parents: 353a725
Author: Shaofeng Shi <sh...@gmail.com>
Authored: Tue Feb 6 16:39:56 2018 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 9 10:04:03 2018 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/util/KafkaSampleProducer.java     | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/37fd9d4d/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 4b91e03..7c8deef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -129,6 +129,7 @@ public class KafkaSampleProducer {
             user.put("id", UUID.randomUUID().toString());
             user.put("gender", genders.get(rnd.nextInt(2)));
             user.put("age", rnd.nextInt(20) + 10);
+            user.put("first_name", "unknown");
             record.put("user", user);
             //send message
             ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));


[3/3] kylin git commit: KYLIN-3145 Support Kafka JSON message whose property name includes _

Posted by sh...@apache.org.
KYLIN-3145 Support Kafka JSON message whose property name includes _

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0a79ab9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0a79ab9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0a79ab9

Branch: refs/heads/2.3.x
Commit: f0a79ab9613ff8b1a6ab9b2f938abd87721d8bc9
Parents: b4e42cf
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 4 16:32:22 2018 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 9 10:04:15 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/metadata/model/TblColRef.java  |  6 +++
 .../kylin/source/kafka/StreamingParser.java     |  6 ++-
 .../source/kafka/TimedJsonStreamParser.java     | 47 +++++++++++++++-----
 .../source/kafka/TimedJsonStreamParserTest.java | 36 +++++++++++++--
 source-kafka/src/test/resources/message.json    |  2 +-
 5 files changed, 79 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index b48b0c4..ee33e8a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -96,12 +96,18 @@ public class TblColRef implements Serializable {
 
     // for test mainly
     public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+        return mockup(table, oneBasedColumnIndex, name, datatype, null);
+    }
+
+    // for test mainly
+    public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype, String comment) {
         ColumnDesc desc = new ColumnDesc();
         String id = "" + oneBasedColumnIndex;
         desc.setId(id);
         desc.setName(name);
         desc.setDatatype(datatype);
         desc.init(table);
+        desc.setComment(comment);
         return new TblColRef(desc);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index c2b5104..c97db36 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -42,7 +42,8 @@ public abstract class StreamingParser {
     public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
     public static final String PROPERTY_TS_PARSER = "tsParser";
     public static final String PROPERTY_TS_PATTERN = "tsPattern";
-    public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+    public static final String PROPERTY_EMBEDDED_SEPARATOR = "separator";
+    public static final String PROPERTY_STRICT_CHECK = "strictCheck"; // whether need check each column strictly, default be false (fault tolerant).
 
     public static final Map<String, String> defaultProperties = Maps.newHashMap();
     public static final Map<String, Integer> derivedTimeColumns = Maps.newHashMap();
@@ -57,7 +58,8 @@ public abstract class StreamingParser {
         defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
         defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
         defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
+        defaultProperties.put(PROPERTY_EMBEDDED_SEPARATOR, "_");
+        defaultProperties.put(PROPERTY_STRICT_CHECK, "false");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index de167b4..3618ba6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -52,8 +52,9 @@ import com.google.common.collect.Lists;
  * By default it will parse the timestamp col value as Unix time. If the format isn't Unix time, need specify the time parser
  * with property StreamingParser#PROPERTY_TS_PARSER.
  * <p>
- * It also support embedded JSON format; Use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concat
- * the property names.
+ * It also supports embedded JSON format: the nested property names are joined with TimedJsonStreamParser#EMBEDDED_PROPERTY_SEPARATOR and saved into
+ * the column's "comment" field. For example, in "{ 'user' : { 'first_name': 'Tom'}}" the 'first_name' field is expressed as the
+ * 'user_first_name' column, and its comment value is 'user|first_name'.
  */
 public final class TimedJsonStreamParser extends StreamingParser {
 
@@ -64,13 +65,17 @@ public final class TimedJsonStreamParser extends StreamingParser {
     private String tsColName = null;
     private String tsParser = null;
     private String separator = null;
+    private boolean strictCheck = true;
     private final Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     private final Map<String, String[]> nameMap = new HashMap<>();
+    public static final String EMBEDDED_PROPERTY_SEPARATOR = "|";
 
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     private AbstractTimeParser streamTimeParser;
+    
+    private long vcounter = 0;
 
     public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
         this.allColumns = allColumns;
@@ -80,7 +85,8 @@ public final class TimedJsonStreamParser extends StreamingParser {
 
         tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
         tsParser = properties.get(PROPERTY_TS_PARSER);
-        separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
+        separator = properties.get(PROPERTY_EMBEDDED_SEPARATOR);
+        strictCheck = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_CHECK));
 
         if (!StringUtils.isEmpty(tsParser)) {
             try {
@@ -112,7 +118,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
             for (TblColRef column : allColumns) {
                 final String columnName = column.getName().toLowerCase();
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
-                    result.add(getValueByKey(columnName, root));
+                    result.add(getValueByKey(column, root));
                 }
             }
 
@@ -131,16 +137,30 @@ public final class TimedJsonStreamParser extends StreamingParser {
         return true;
     }
 
-    protected String getValueByKey(String key, Map<String, Object> rootMap) throws IOException {
+    public String[] getEmbeddedPropertyNames(TblColRef column) {
+        final String colName = column.getName().toLowerCase();
+        String[] names = nameMap.get(colName);
+        if (names == null) {
+            String comment = column.getColumnDesc().getComment(); // use comment to parse the structure
+            if (!StringUtils.isEmpty(comment) && comment.contains(EMBEDDED_PROPERTY_SEPARATOR)) {
+                names = comment.toLowerCase().split("\\" + EMBEDDED_PROPERTY_SEPARATOR);
+                nameMap.put(colName, names);
+            } else if (colName.contains(separator)) { // deprecated, just be compitable for old version
+                names = colName.toLowerCase().split(separator);
+                nameMap.put(colName, names);
+            }
+        }
+
+        return names;
+    }
+
+    protected String getValueByKey(TblColRef column, Map<String, Object> rootMap) throws IOException {
+        final String key = column.getName().toLowerCase();
         if (rootMap.containsKey(key)) {
             return objToString(rootMap.get(key));
         }
 
-        String[] names = nameMap.get(key);
-        if (names == null && key.contains(separator)) {
-            names = key.toLowerCase().split(separator);
-            nameMap.put(key, names);
-        }
+        String[] names = getEmbeddedPropertyNames(column);
 
         if (names != null && names.length > 0) {
             tempMap.clear();
@@ -150,8 +170,11 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 if (o instanceof Map) {
                     tempMap.clear();
                     tempMap.putAll((Map<String, Object>) o);
-                } else {
-                    throw new IOException("Property '" + names[i] + "' is not embedded format");
+                } else if (strictCheck || vcounter++ % 100 == 0) {
+                    final String msg = "Property '" + names[i] + "' value is not embedded JSON format. ";
+                    logger.warn(msg);
+                    if (strictCheck)
+                        throw new IOException(msg);
                 }
             }
             Object finalObject = tempMap.get(names[names.length - 1]);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
index 8dc840b..b8aa7f3 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -43,9 +43,11 @@ import com.fasterxml.jackson.databind.type.SimpleType;
 public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
 
     private static String[] userNeedColNames;
+    private static String[] userNeedColNamesComment;
     private static final String jsonFilePath = "src/test/resources/message.json";
     private static ObjectMapper mapper;
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
+            SimpleType.construct(Object.class));
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -75,8 +77,11 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testEmbeddedValue() throws Exception {
-        userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected" };
-        List<TblColRef> allCol = mockupTblColRefList();
+        userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected",
+                "user_is_Default_Profile_Image" };
+        userNeedColNamesComment = new String[] { "", "", "",
+                "user" + TimedJsonStreamParser.EMBEDDED_PROPERTY_SEPARATOR + "is_Default_Profile_Image" };
+        List<TblColRef> allCol = mockupTblColRefListWithComment(userNeedColNamesComment);
         TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
         Object msg = mapper.readValue(new File(jsonFilePath), mapType);
         ByteBuffer buffer = getJsonByteBuffer(msg);
@@ -85,6 +90,21 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
         assertEquals("4853763947", result.get(0));
         assertEquals("Noticias", result.get(1));
         assertEquals("false", result.get(2));
+        assertEquals("false", result.get(3));
+    }
+
+    @Test
+    public void testEmbeddedValueFaultTolerant() throws Exception {
+        userNeedColNames = new String[] { "user_id", "nonexisted_description" };
+        userNeedColNamesComment = new String[] { "", "" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
+        assertEquals("4853763947", result.get(0));
+        assertEquals(StringUtils.EMPTY, result.get(1));
     }
 
     @Test
@@ -143,4 +163,14 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
         }
         return list;
     }
+
+    private static List<TblColRef> mockupTblColRefListWithComment(String[] comments) {
+        TableDesc t = TableDesc.mockup("table_a");
+        List<TblColRef> list = new ArrayList<>();
+        for (int i = 0; i < userNeedColNames.length; i++) {
+            TblColRef c = TblColRef.mockup(t, i, userNeedColNames[i], "string", comments[i]);
+            list.add(c);
+        }
+        return list;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json
index 55f35d3..27183eb 100644
--- a/source-kafka/src/test/resources/message.json
+++ b/source-kafka/src/test/resources/message.json
@@ -36,7 +36,7 @@
   "user": {
     "id": 4853763947,
     "description": "Noticias",
-    "isDefaultProfileImage": false,
+    "is_Default_Profile_Image": false,
     "isProtected": false
   }
 }
\ No newline at end of file


[2/3] kylin git commit: KYLIN-3145 Support Kafka JSON message whose property name includes _ (#4028)

Posted by sh...@apache.org.
KYLIN-3145 Support Kafka JSON message whose property name includes _ (#4028)

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b4e42cfd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b4e42cfd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b4e42cfd

Branch: refs/heads/2.3.x
Commit: b4e42cfdd31549beb7de65fb0aad6be3e115df53
Parents: 37fd9d4
Author: luguosheng1314 <55...@qq.com>
Authored: Wed Feb 7 10:58:13 2018 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 9 10:04:09 2018 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/sourceMeta.js | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b4e42cfd/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 7deef76..49a9613 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -683,14 +683,15 @@ KylinApp
 
         //streaming table data change structure
         var columnList=[]
-        function changeObjTree(obj,base){
+        function changeObjTree(obj,base,comment){
           base=base?base+"_":"";
+          comment= comment?comment+"|":""
           for(var i in obj){
             if(Object.prototype.toString.call(obj[i])=="[object Object]"){
-              changeObjTree(obj[i],base+i);
+              changeObjTree(obj[i],base+i,comment+i);
               continue;
             }
-            columnList.push(createNewObj(base+i,obj[i]));
+            columnList.push(createNewObj(base+i,obj[i],comment+i));
           }
         }
 
@@ -722,12 +723,13 @@ KylinApp
           return defaultType;
         }
 
-        function createNewObj(key,val){
+        function createNewObj(key,val,comment){
           var obj={};
           obj.name=key;
           obj.type=checkValType(val,key);
           obj.fromSource="Y";
           obj.checked="Y";
+          obj.comment=comment;
           if(Object.prototype.toString.call(val)=="[object Array]"){
             obj.checked="N";
           }
@@ -798,6 +800,7 @@ KylinApp
             var columnInstance = {
               "id": ++$index,
               "name": column.name,
+              "comment": /[|]/.test(column.comment)? column.comment : "",
               "datatype": column.type
             }
             columns.push(columnInstance);