You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/02/09 02:06:13 UTC
[1/3] kylin git commit: KYLIN-3145 minor,
update sample streaming message (#4013)
Repository: kylin
Updated Branches:
refs/heads/2.3.x 353a725c9 -> f0a79ab96
KYLIN-3145 minor, update sample streaming message (#4013)
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/37fd9d4d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/37fd9d4d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/37fd9d4d
Branch: refs/heads/2.3.x
Commit: 37fd9d4d3510338a9e35dd9b7fa23c6642176e48
Parents: 353a725
Author: Shaofeng Shi <sh...@gmail.com>
Authored: Tue Feb 6 16:39:56 2018 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 9 10:04:03 2018 +0800
----------------------------------------------------------------------
.../org/apache/kylin/source/kafka/util/KafkaSampleProducer.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/37fd9d4d/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 4b91e03..7c8deef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -129,6 +129,7 @@ public class KafkaSampleProducer {
user.put("id", UUID.randomUUID().toString());
user.put("gender", genders.get(rnd.nextInt(2)));
user.put("age", rnd.nextInt(20) + 10);
+ user.put("first_name", "unknown");
record.put("user", user);
//send message
ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
[3/3] kylin git commit: KYLIN-3145 Support Kafka JSON message whose
property name includes _
Posted by sh...@apache.org.
KYLIN-3145 Support Kafka JSON message whose property name includes _
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0a79ab9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0a79ab9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0a79ab9
Branch: refs/heads/2.3.x
Commit: f0a79ab9613ff8b1a6ab9b2f938abd87721d8bc9
Parents: b4e42cf
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 4 16:32:22 2018 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 9 10:04:15 2018 +0800
----------------------------------------------------------------------
.../apache/kylin/metadata/model/TblColRef.java | 6 +++
.../kylin/source/kafka/StreamingParser.java | 6 ++-
.../source/kafka/TimedJsonStreamParser.java | 47 +++++++++++++++-----
.../source/kafka/TimedJsonStreamParserTest.java | 36 +++++++++++++--
source-kafka/src/test/resources/message.json | 2 +-
5 files changed, 79 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index b48b0c4..ee33e8a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -96,12 +96,18 @@ public class TblColRef implements Serializable {
// for test mainly
public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+ return mockup(table, oneBasedColumnIndex, name, datatype, null);
+ }
+
+ // for test mainly
+ public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype, String comment) {
ColumnDesc desc = new ColumnDesc();
String id = "" + oneBasedColumnIndex;
desc.setId(id);
desc.setName(name);
desc.setDatatype(datatype);
desc.init(table);
+ desc.setComment(comment);
return new TblColRef(desc);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index c2b5104..c97db36 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -42,7 +42,8 @@ public abstract class StreamingParser {
public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
public static final String PROPERTY_TS_PARSER = "tsParser";
public static final String PROPERTY_TS_PATTERN = "tsPattern";
- public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+ public static final String PROPERTY_EMBEDDED_SEPARATOR = "separator";
+ public static final String PROPERTY_STRICT_CHECK = "strictCheck"; // whether to check each column strictly; defaults to false (fault tolerant).
public static final Map<String, String> defaultProperties = Maps.newHashMap();
public static final Map<String, Integer> derivedTimeColumns = Maps.newHashMap();
@@ -57,7 +58,8 @@ public abstract class StreamingParser {
defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
+ defaultProperties.put(PROPERTY_EMBEDDED_SEPARATOR, "_");
+ defaultProperties.put(PROPERTY_STRICT_CHECK, "false");
}
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index de167b4..3618ba6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -52,8 +52,9 @@ import com.google.common.collect.Lists;
* By default it will parse the timestamp col value as Unix time. If the format isn't Unix time, need specify the time parser
* with property StreamingParser#PROPERTY_TS_PARSER.
* <p>
- * It also support embedded JSON format; Use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concat
- * the property names.
+ * It also supports embedded JSON format; use TimedJsonStreamParser#EMBEDDED_PROPERTY_SEPARATOR to separate the nested property names and save them into
+ * the column's "comment" field. For example: "{ 'user' : { 'first_name': 'Tom'}}"; the 'first_name' property is expressed as
+ * the 'user_first_name' column, and its comment value is 'user|first_name'.
*/
public final class TimedJsonStreamParser extends StreamingParser {
@@ -64,13 +65,17 @@ public final class TimedJsonStreamParser extends StreamingParser {
private String tsColName = null;
private String tsParser = null;
private String separator = null;
+ private boolean strictCheck = true;
private final Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private final Map<String, String[]> nameMap = new HashMap<>();
+ public static final String EMBEDDED_PROPERTY_SEPARATOR = "|";
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
private AbstractTimeParser streamTimeParser;
+
+ private long vcounter = 0;
public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
this.allColumns = allColumns;
@@ -80,7 +85,8 @@ public final class TimedJsonStreamParser extends StreamingParser {
tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
tsParser = properties.get(PROPERTY_TS_PARSER);
- separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
+ separator = properties.get(PROPERTY_EMBEDDED_SEPARATOR);
+ strictCheck = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_CHECK));
if (!StringUtils.isEmpty(tsParser)) {
try {
@@ -112,7 +118,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
for (TblColRef column : allColumns) {
final String columnName = column.getName().toLowerCase();
if (populateDerivedTimeColumns(columnName, result, t) == false) {
- result.add(getValueByKey(columnName, root));
+ result.add(getValueByKey(column, root));
}
}
@@ -131,16 +137,30 @@ public final class TimedJsonStreamParser extends StreamingParser {
return true;
}
- protected String getValueByKey(String key, Map<String, Object> rootMap) throws IOException {
+ public String[] getEmbeddedPropertyNames(TblColRef column) {
+ final String colName = column.getName().toLowerCase();
+ String[] names = nameMap.get(colName);
+ if (names == null) {
+ String comment = column.getColumnDesc().getComment(); // use comment to parse the structure
+ if (!StringUtils.isEmpty(comment) && comment.contains(EMBEDDED_PROPERTY_SEPARATOR)) {
+ names = comment.toLowerCase().split("\\" + EMBEDDED_PROPERTY_SEPARATOR);
+ nameMap.put(colName, names);
+ } else if (colName.contains(separator)) { // deprecated; kept only for compatibility with old versions
+ names = colName.toLowerCase().split(separator);
+ nameMap.put(colName, names);
+ }
+ }
+
+ return names;
+ }
+
+ protected String getValueByKey(TblColRef column, Map<String, Object> rootMap) throws IOException {
+ final String key = column.getName().toLowerCase();
if (rootMap.containsKey(key)) {
return objToString(rootMap.get(key));
}
- String[] names = nameMap.get(key);
- if (names == null && key.contains(separator)) {
- names = key.toLowerCase().split(separator);
- nameMap.put(key, names);
- }
+ String[] names = getEmbeddedPropertyNames(column);
if (names != null && names.length > 0) {
tempMap.clear();
@@ -150,8 +170,11 @@ public final class TimedJsonStreamParser extends StreamingParser {
if (o instanceof Map) {
tempMap.clear();
tempMap.putAll((Map<String, Object>) o);
- } else {
- throw new IOException("Property '" + names[i] + "' is not embedded format");
+ } else if (strictCheck || vcounter++ % 100 == 0) {
+ final String msg = "Property '" + names[i] + "' value is not embedded JSON format. ";
+ logger.warn(msg);
+ if (strictCheck)
+ throw new IOException(msg);
}
}
Object finalObject = tempMap.get(names[names.length - 1]);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
index 8dc840b..b8aa7f3 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -43,9 +43,11 @@ import com.fasterxml.jackson.databind.type.SimpleType;
public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
private static String[] userNeedColNames;
+ private static String[] userNeedColNamesComment;
private static final String jsonFilePath = "src/test/resources/message.json";
private static ObjectMapper mapper;
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
+ SimpleType.construct(Object.class));
@BeforeClass
public static void setUp() throws Exception {
@@ -75,8 +77,11 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
@Test
public void testEmbeddedValue() throws Exception {
- userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected" };
- List<TblColRef> allCol = mockupTblColRefList();
+ userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected",
+ "user_is_Default_Profile_Image" };
+ userNeedColNamesComment = new String[] { "", "", "",
+ "user" + TimedJsonStreamParser.EMBEDDED_PROPERTY_SEPARATOR + "is_Default_Profile_Image" };
+ List<TblColRef> allCol = mockupTblColRefListWithComment(userNeedColNamesComment);
TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
Object msg = mapper.readValue(new File(jsonFilePath), mapType);
ByteBuffer buffer = getJsonByteBuffer(msg);
@@ -85,6 +90,21 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
assertEquals("4853763947", result.get(0));
assertEquals("Noticias", result.get(1));
assertEquals("false", result.get(2));
+ assertEquals("false", result.get(3));
+ }
+
+ @Test
+ public void testEmbeddedValueFaultTolerant() throws Exception {
+ userNeedColNames = new String[] { "user_id", "nonexisted_description" };
+ userNeedColNamesComment = new String[] { "", "" };
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ List<StreamingMessageRow> msgList = parser.parse(buffer);
+ List<String> result = msgList.get(0).getData();
+ assertEquals("4853763947", result.get(0));
+ assertEquals(StringUtils.EMPTY, result.get(1));
}
@Test
@@ -143,4 +163,14 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
}
return list;
}
+
+ private static List<TblColRef> mockupTblColRefListWithComment(String[] comments) {
+ TableDesc t = TableDesc.mockup("table_a");
+ List<TblColRef> list = new ArrayList<>();
+ for (int i = 0; i < userNeedColNames.length; i++) {
+ TblColRef c = TblColRef.mockup(t, i, userNeedColNames[i], "string", comments[i]);
+ list.add(c);
+ }
+ return list;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0a79ab9/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json
index 55f35d3..27183eb 100644
--- a/source-kafka/src/test/resources/message.json
+++ b/source-kafka/src/test/resources/message.json
@@ -36,7 +36,7 @@
"user": {
"id": 4853763947,
"description": "Noticias",
- "isDefaultProfileImage": false,
+ "is_Default_Profile_Image": false,
"isProtected": false
}
}
\ No newline at end of file
[2/3] kylin git commit: KYLIN-3145 Support Kafka JSON message whose
property name includes _ (#4028)
Posted by sh...@apache.org.
KYLIN-3145 Support Kafka JSON message whose property name includes _ (#4028)
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b4e42cfd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b4e42cfd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b4e42cfd
Branch: refs/heads/2.3.x
Commit: b4e42cfdd31549beb7de65fb0aad6be3e115df53
Parents: 37fd9d4
Author: luguosheng1314 <55...@qq.com>
Authored: Wed Feb 7 10:58:13 2018 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 9 10:04:09 2018 +0800
----------------------------------------------------------------------
webapp/app/js/controllers/sourceMeta.js | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4e42cfd/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 7deef76..49a9613 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -683,14 +683,15 @@ KylinApp
//streaming table data change structure
var columnList=[]
- function changeObjTree(obj,base){
+ function changeObjTree(obj,base,comment){
base=base?base+"_":"";
+ comment= comment?comment+"|":""
for(var i in obj){
if(Object.prototype.toString.call(obj[i])=="[object Object]"){
- changeObjTree(obj[i],base+i);
+ changeObjTree(obj[i],base+i,comment+i);
continue;
}
- columnList.push(createNewObj(base+i,obj[i]));
+ columnList.push(createNewObj(base+i,obj[i],comment+i));
}
}
@@ -722,12 +723,13 @@ KylinApp
return defaultType;
}
- function createNewObj(key,val){
+ function createNewObj(key,val,comment){
var obj={};
obj.name=key;
obj.type=checkValType(val,key);
obj.fromSource="Y";
obj.checked="Y";
+ obj.comment=comment;
if(Object.prototype.toString.call(val)=="[object Array]"){
obj.checked="N";
}
@@ -798,6 +800,7 @@ KylinApp
var columnInstance = {
"id": ++$index,
"name": column.name,
+ "comment": /[|]/.test(column.comment)? column.comment : "",
"datatype": column.type
}
columns.push(columnInstance);