You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/10/08 02:26:08 UTC

[46/50] [abbrv] kylin git commit: KYLIN-1919 support embedded json format

KYLIN-1919 support embedded json format

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa51ce0c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa51ce0c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa51ce0c

Branch: refs/heads/orderedbytes
Commit: aa51ce0c3382d330ba5418b49eb669c964315f96
Parents: 792d4ee
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 16:04:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |  2 +-
 .../kylin/provision/BuildCubeWithEngine.java    |  2 +-
 .../kylin/provision/BuildCubeWithStream.java    | 16 ++--
 .../java/org/apache/kylin/rest/DebugTomcat.java |  3 +-
 .../kylin/source/kafka/AbstractTimeParser.java  |  4 +-
 .../kylin/source/kafka/DateTimeParser.java      | 40 ++-------
 .../kylin/source/kafka/DefaultTimeParser.java   |  4 +-
 .../kylin/source/kafka/StreamingParser.java     | 41 ++++++++-
 .../source/kafka/StringStreamingParser.java     |  3 +-
 .../source/kafka/TimedJsonStreamParser.java     | 95 ++++++++++++--------
 .../kafka/diagnose/KafkaInputAnalyzer.java      |  6 +-
 11 files changed, 133 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9e9df05..be9b2a9 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -156,7 +156,7 @@ public class DeployUtil {
         for (ColumnDesc columnDesc : tableDesc.getColumns()) {
             tableColumns.add(columnDesc.getRef());
         }
-        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
+        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
             List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 31cf0eb..971b293 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -84,7 +84,7 @@ public class BuildCubeWithEngine {
             afterClass();
             logger.info("Going to exit");
             System.exit(0);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("error", e);
             System.exit(1);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 58715f1..f8805a6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -234,6 +234,8 @@ public class BuildCubeWithStream {
             segments = cubeManager.getCube(cubeName).getSegments();
             Assert.assertTrue(segments.size() == 1);
         }
+
+        logger.info("Build is done");
     }
 
 
@@ -309,20 +311,22 @@ public class BuildCubeWithStream {
     }
 
     public static void main(String[] args) throws Exception {
+        BuildCubeWithStream buildCubeWithStream = null;
         try {
             beforeClass();
-
-            BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+            buildCubeWithStream = new BuildCubeWithStream();
             buildCubeWithStream.before();
             buildCubeWithStream.build();
-            logger.info("Build is done");
-            buildCubeWithStream.after();
-            afterClass();
             logger.info("Going to exit");
             System.exit(0);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("error", e);
             System.exit(1);
+        } finally {
+            if (buildCubeWithStream != null) {
+                buildCubeWithStream.after();
+            }
+            afterClass();
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 7417a05..0f2c500 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -48,7 +48,8 @@ public class DebugTomcat {
                 System.setProperty("catalina.home", ".");
 
             if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-                throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+                System.err.println("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+                System.exit(1);
             }
 
             // workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
index 96a4ece..26624ef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.source.kafka;
 
+import java.util.Map;
+
 /**
  */
 public abstract class AbstractTimeParser {
 
-    public AbstractTimeParser(String[] properties) {
+    public AbstractTimeParser(Map<String, String> properties) {
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
index 2bd699d..3382783 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -18,51 +18,29 @@
 
 package org.apache.kylin.source.kafka;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.ParseException;
+import java.util.Map;
 
 /**
  */
 public class DateTimeParser extends AbstractTimeParser {
 
     private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
-    private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+    private String tsPattern  = null;
 
     private FastDateFormat formatter = null;
 
     //call by reflection
-    public DateTimeParser(String[] properties) {
+    public DateTimeParser(Map<String, String> properties) {
         super(properties);
-        for (String prop : properties) {
-            try {
-                String[] parts = prop.split("=");
-                if (parts.length == 2) {
-                    switch (parts[0]) {
-                    case "tsPattern":
-                        this.tsPattern = parts[1];
-                        break;
-                    default:
-                        break;
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("Failed to parse property " + prop);
-                //ignore
-            }
-        }
+        tsPattern = properties.get(StreamingParser.PROPERTY_TS_PATTERN);
 
-        if (!StringUtils.isEmpty(tsPattern)) {
-            try {
-                formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
-            } catch (Throwable e) {
-                throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
-            }
-        } else {
+        try {
+            formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+        } catch (Throwable e) {
             throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
         }
     }
@@ -77,8 +55,8 @@ public class DateTimeParser extends AbstractTimeParser {
 
         try {
             return formatter.parse(timeStr).getTime();
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+        } catch (Throwable e) {
+            throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 85f2bfa..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -20,11 +20,13 @@ package org.apache.kylin.source.kafka;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Map;
+
 /**
  */
 public class DefaultTimeParser extends AbstractTimeParser {
 
-    public DefaultTimeParser(String[] properties) {
+    public DefaultTimeParser(Map<String, String> properties) {
         super(properties);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 4d840b8..43b2ac5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -20,8 +20,10 @@ package org.apache.kylin.source.kafka;
 
 import java.lang.reflect.Constructor;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
@@ -30,12 +32,21 @@ import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
+ * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
  */
 public abstract class StreamingParser {
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamingParser.class);
+    public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
+    public static final String PROPERTY_TS_PARSER = "tsParser";
+    public static final String PROPERTY_TS_PATTERN = "tsPattern";
+    public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+
+    public static final Map<String, String> defaultProperties = Maps.newHashMap();
     public static final Set derivedTimeColumns = Sets.newHashSet();
     static {
         derivedTimeColumns.add("minute_start");
@@ -45,6 +56,10 @@ public abstract class StreamingParser {
         derivedTimeColumns.add("month_start");
         derivedTimeColumns.add("quarter_start");
         derivedTimeColumns.add("year_start");
+        defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
+        defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
+        defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
     }
 
     /**
@@ -57,14 +72,34 @@ public abstract class StreamingParser {
 
     public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
         if (!StringUtils.isEmpty(parserName)) {
+            logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
             Class clazz = Class.forName(parserName);
-            Constructor constructor = clazz.getConstructor(List.class, String.class);
-            return (StreamingParser) constructor.newInstance(columns, parserProperties);
+            Map<String, String> properties = parseProperties(parserProperties);
+            Constructor constructor = clazz.getConstructor(List.class, Map.class);
+            return (StreamingParser) constructor.newInstance(columns, properties);
         } else {
             throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
         }
     }
 
+    public static Map<String, String> parseProperties(String propertiesStr) {
+
+        Map<String, String> result = Maps.newHashMap(defaultProperties);
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            String[] properties = propertiesStr.split(";");
+            for (String prop : properties) {
+                String[] parts = prop.split("=");
+                if (parts.length == 2) {
+                    result.put(parts[0], parts[1]);
+                } else {
+                    logger.warn("Ignored invalid property expression '" + prop + "'");
+                }
+            }
+        }
+
+        return result;
+    }
+
     /**
      * Calculate the derived time column value and put to the result list.
      * @param columnName the column name, should be in lower case

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cea8e0b..f74df83 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -33,7 +34,7 @@ public final class StringStreamingParser extends StreamingParser {
 
     public static final StringStreamingParser instance = new StringStreamingParser(null, null);
 
-    private StringStreamingParser(List<TblColRef> allColumns, String propertiesStr) {
+    private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 2125c05..d4327c5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.source.kafka;
 
@@ -42,7 +42,17 @@ import com.fasterxml.jackson.databind.type.SimpleType;
 import com.google.common.collect.Lists;
 
 /**
- * each json message with a "timestamp" field
+ * A utility class which parses a JSON streaming message into a list of strings (representing a row in a table).
+ *
+ * Each message should have a property whose value represents the message's timestamp. By default the column name is
+ * "timestamp", but it can be customized with StreamingParser#PROPERTY_TS_COLUMN_NAME.
+ *
+ * By default the timestamp column value is parsed as Unix time. If it is in another format, specify the time parser
+ * with the property StreamingParser#PROPERTY_TS_PARSER.
+ *
+ * It also supports the embedded JSON format: use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR)
+ * to concatenate the nested property names into a column name.
+ *
  */
 public final class TimedJsonStreamParser extends StreamingParser {
 
@@ -50,51 +60,34 @@ public final class TimedJsonStreamParser extends StreamingParser {
 
     private List<TblColRef> allColumns;
     private final ObjectMapper mapper;
-    private String tsColName = "timestamp";
-    private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+    private String tsColName = null;
+    private String tsParser = null;
+    private String separator = null;
+
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     private AbstractTimeParser streamTimeParser;
 
-    public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+    public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
         this.allColumns = allColumns;
-        String[] properties = null;
-        if (!StringUtils.isEmpty(propertiesStr)) {
-            properties = propertiesStr.split(";");
-            for (String prop : properties) {
-                try {
-                    String[] parts = prop.split("=");
-                    if (parts.length == 2) {
-                        switch (parts[0]) {
-                        case "tsColName":
-                            this.tsColName = parts[1];
-                            break;
-                        case "tsParser":
-                            this.tsParser = parts[1];
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("Failed to parse property " + prop);
-                    //ignore
-                }
-            }
+        if (properties == null) {
+            properties = StreamingParser.defaultProperties;
         }
 
-        logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+        tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
+        tsParser = properties.get(PROPERTY_TS_PARSER);
+        separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
 
         if (!StringUtils.isEmpty(tsParser)) {
             try {
                 Class clazz = Class.forName(tsParser);
-                Constructor constructor = clazz.getConstructor(String[].class);
-                streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+                Constructor constructor = clazz.getConstructor(Map.class);
+                streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
             } catch (Exception e) {
-                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
             }
         } else {
-            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
         }
         mapper = new ObjectMapper();
         mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -108,7 +101,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
             Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
             Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
-            String tsStr = String.valueOf(root.get(tsColName));
+            String tsStr = objToString(root.get(tsColName));
             long t = streamTimeParser.parseTime(tsStr);
             ArrayList<String> result = Lists.newArrayList();
 
@@ -116,8 +109,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 String columnName = column.getName().toLowerCase();
 
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
-                    String x = String.valueOf(root.get(columnName));
-                    result.add(x);
+                    result.add(getValueByKey(columnName, root));
                 }
             }
 
@@ -133,4 +125,35 @@ public final class TimedJsonStreamParser extends StreamingParser {
         return true;
     }
 
+    protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
+        if (root.containsKey(key)) {
+            return objToString(root.get(key));
+        }
+
+        if (key.contains(separator)) {
+            String[] names = key.toLowerCase().split(separator);
+            Map<String, Object> tempMap = null;
+            for (int i = 0; i < names.length - 1; i++) {
+                Object o = root.get(names[i]);
+                if (o instanceof Map) {
+                    tempMap = (Map<String, Object>) o;
+                } else {
+                    throw new IOException("Property '" + names[i] + "' is not embedded format");
+                }
+            }
+            Object finalObject = tempMap.get(names[names.length - 1]);
+            return objToString(finalObject);
+
+        }
+
+        return StringUtils.EMPTY;
+    }
+
+    static String objToString(Object value) {
+        if (value == null)
+            return StringUtils.EMPTY;
+
+        return String.valueOf(value);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index efaa042..b1b4011 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.diagnose;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -289,8 +291,10 @@ public class KafkaInputAnalyzer extends AbstractApplication {
         String task = optionsHelper.getOptionValue(OPTION_TASK);
         String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
 
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
         kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
-        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
 
         if ("disorder".equalsIgnoreCase(task)) {
             analyzeDisorder();