You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/10/08 02:26:08 UTC
[46/50] [abbrv] kylin git commit: KYLIN-1919 support embedded json
format
KYLIN-1919 support embedded json format
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa51ce0c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa51ce0c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa51ce0c
Branch: refs/heads/orderedbytes
Commit: aa51ce0c3382d330ba5418b49eb669c964315f96
Parents: 792d4ee
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 16:04:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 2 +-
.../kylin/provision/BuildCubeWithEngine.java | 2 +-
.../kylin/provision/BuildCubeWithStream.java | 16 ++--
.../java/org/apache/kylin/rest/DebugTomcat.java | 3 +-
.../kylin/source/kafka/AbstractTimeParser.java | 4 +-
.../kylin/source/kafka/DateTimeParser.java | 40 ++-------
.../kylin/source/kafka/DefaultTimeParser.java | 4 +-
.../kylin/source/kafka/StreamingParser.java | 41 ++++++++-
.../source/kafka/StringStreamingParser.java | 3 +-
.../source/kafka/TimedJsonStreamParser.java | 95 ++++++++++++--------
.../kafka/diagnose/KafkaInputAnalyzer.java | 6 +-
11 files changed, 133 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9e9df05..be9b2a9 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -156,7 +156,7 @@ public class DeployUtil {
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
tableColumns.add(columnDesc.getRef());
}
- TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
+ TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
StringBuilder sb = new StringBuilder();
for (String json : data) {
List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 31cf0eb..971b293 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -84,7 +84,7 @@ public class BuildCubeWithEngine {
afterClass();
logger.info("Going to exit");
System.exit(0);
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("error", e);
System.exit(1);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 58715f1..f8805a6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -234,6 +234,8 @@ public class BuildCubeWithStream {
segments = cubeManager.getCube(cubeName).getSegments();
Assert.assertTrue(segments.size() == 1);
}
+
+ logger.info("Build is done");
}
@@ -309,20 +311,22 @@ public class BuildCubeWithStream {
}
public static void main(String[] args) throws Exception {
+ BuildCubeWithStream buildCubeWithStream = null;
try {
beforeClass();
-
- BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+ buildCubeWithStream = new BuildCubeWithStream();
buildCubeWithStream.before();
buildCubeWithStream.build();
- logger.info("Build is done");
- buildCubeWithStream.after();
- afterClass();
logger.info("Going to exit");
System.exit(0);
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("error", e);
System.exit(1);
+ } finally {
+ if (buildCubeWithStream != null) {
+ buildCubeWithStream.after();
+ }
+ afterClass();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 7417a05..0f2c500 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -48,7 +48,8 @@ public class DebugTomcat {
System.setProperty("catalina.home", ".");
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+ System.err.println("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+ System.exit(1);
}
// workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
index 96a4ece..26624ef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -18,11 +18,13 @@
package org.apache.kylin.source.kafka;
+import java.util.Map;
+
/**
*/
public abstract class AbstractTimeParser {
- public AbstractTimeParser(String[] properties) {
+ public AbstractTimeParser(Map<String, String> properties) {
}
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
index 2bd699d..3382783 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -18,51 +18,29 @@
package org.apache.kylin.source.kafka;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.ParseException;
+import java.util.Map;
/**
*/
public class DateTimeParser extends AbstractTimeParser {
private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
- private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+ private String tsPattern = null;
private FastDateFormat formatter = null;
//call by reflection
- public DateTimeParser(String[] properties) {
+ public DateTimeParser(Map<String, String> properties) {
super(properties);
- for (String prop : properties) {
- try {
- String[] parts = prop.split("=");
- if (parts.length == 2) {
- switch (parts[0]) {
- case "tsPattern":
- this.tsPattern = parts[1];
- break;
- default:
- break;
- }
- }
- } catch (Exception e) {
- logger.error("Failed to parse property " + prop);
- //ignore
- }
- }
+ tsPattern = properties.get(StreamingParser.PROPERTY_TS_PATTERN);
- if (!StringUtils.isEmpty(tsPattern)) {
- try {
- formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
- } catch (Throwable e) {
- throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
- }
- } else {
+ try {
+ formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+ } catch (Throwable e) {
throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
}
}
@@ -77,8 +55,8 @@ public class DateTimeParser extends AbstractTimeParser {
try {
return formatter.parse(timeStr).getTime();
- } catch (ParseException e) {
- throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+ } catch (Throwable e) {
+ throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 85f2bfa..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -20,11 +20,13 @@ package org.apache.kylin.source.kafka;
import org.apache.commons.lang3.StringUtils;
+import java.util.Map;
+
/**
*/
public class DefaultTimeParser extends AbstractTimeParser {
- public DefaultTimeParser(String[] properties) {
+ public DefaultTimeParser(Map<String, String> properties) {
super(properties);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 4d840b8..43b2ac5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -20,8 +20,10 @@ package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.nio.ByteBuffer;
import org.apache.kylin.common.util.DateFormat;
@@ -30,12 +32,21 @@ import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
+ * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
*/
public abstract class StreamingParser {
+ private static final Logger logger = LoggerFactory.getLogger(StreamingParser.class);
+ public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
+ public static final String PROPERTY_TS_PARSER = "tsParser";
+ public static final String PROPERTY_TS_PATTERN = "tsPattern";
+ public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+
+ public static final Map<String, String> defaultProperties = Maps.newHashMap();
public static final Set derivedTimeColumns = Sets.newHashSet();
static {
derivedTimeColumns.add("minute_start");
@@ -45,6 +56,10 @@ public abstract class StreamingParser {
derivedTimeColumns.add("month_start");
derivedTimeColumns.add("quarter_start");
derivedTimeColumns.add("year_start");
+ defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
+ defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
+ defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+ defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
}
/**
@@ -57,14 +72,34 @@ public abstract class StreamingParser {
public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
if (!StringUtils.isEmpty(parserName)) {
+ logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
Class clazz = Class.forName(parserName);
- Constructor constructor = clazz.getConstructor(List.class, String.class);
- return (StreamingParser) constructor.newInstance(columns, parserProperties);
+ Map<String, String> properties = parseProperties(parserProperties);
+ Constructor constructor = clazz.getConstructor(List.class, Map.class);
+ return (StreamingParser) constructor.newInstance(columns, properties);
} else {
throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
}
}
+ /**
+  * Parses a parser-properties string of the form "key1=value1;key2=value2" into a map.
+  *
+  * The returned map starts as a copy of {@link #defaultProperties}; every valid
+  * "key=value" expression found in the input overrides or extends those defaults.
+  * Only the first '=' in an expression separates key from value, so a value may itself
+  * contain '='. Keys are trimmed; values are kept as written. Blank expressions
+  * (e.g. produced by consecutive ';') are skipped silently; expressions with a missing
+  * key or value are logged and ignored.
+  *
+  * @param propertiesStr the raw properties string, may be null or empty
+  * @return a new mutable map holding the defaults plus the parsed properties
+  */
+ public static Map<String, String> parseProperties(String propertiesStr) {
+     Map<String, String> result = Maps.newHashMap(defaultProperties);
+     if (StringUtils.isEmpty(propertiesStr)) {
+         return result;
+     }
+
+     for (String prop : propertiesStr.split(";")) {
+         if (prop.trim().isEmpty()) {
+             // empty segment between two ';' -- nothing to report
+             continue;
+         }
+
+         // Split on the first '=' only, so that values containing '=' are preserved.
+         int eq = prop.indexOf('=');
+         String key = eq < 0 ? "" : prop.substring(0, eq).trim();
+         String value = eq < 0 ? "" : prop.substring(eq + 1);
+         if (key.isEmpty() || value.isEmpty()) {
+             logger.warn("Ignored invalid property expression '{}'", prop);
+         } else {
+             result.put(key, value);
+         }
+     }
+
+     return result;
+ }
+
/**
* Calculate the derived time column value and put to the result list.
* @param columnName the column name, should be in lower case
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cea8e0b..f74df83 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
@@ -33,7 +34,7 @@ public final class StringStreamingParser extends StreamingParser {
public static final StringStreamingParser instance = new StringStreamingParser(null, null);
- private StringStreamingParser(List<TblColRef> allColumns, String propertiesStr) {
+ private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 2125c05..d4327c5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.kafka;
@@ -42,7 +42,17 @@ import com.fasterxml.jackson.databind.type.SimpleType;
import com.google.common.collect.Lists;
/**
- * each json message with a "timestamp" field
+ * A utility class which parses a JSON streaming message into a list of strings (representing a row in a table).
+ *
+ * Each message should have a property whose value represents the message's timestamp; by default the column name is "timestamp",
+ * but it can be customized with StreamingParser#PROPERTY_TS_COLUMN_NAME.
+ *
+ * By default it will parse the timestamp column value as Unix time. If the format isn't Unix time, you need to specify the time parser
+ * with the property StreamingParser#PROPERTY_TS_PARSER.
+ *
+ * It also supports the embedded JSON format; use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concatenate
+ * the property names.
+ *
*/
public final class TimedJsonStreamParser extends StreamingParser {
@@ -50,51 +60,34 @@ public final class TimedJsonStreamParser extends StreamingParser {
private List<TblColRef> allColumns;
private final ObjectMapper mapper;
- private String tsColName = "timestamp";
- private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+ private String tsColName = null;
+ private String tsParser = null;
+ private String separator = null;
+
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
private AbstractTimeParser streamTimeParser;
- public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+ public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
this.allColumns = allColumns;
- String[] properties = null;
- if (!StringUtils.isEmpty(propertiesStr)) {
- properties = propertiesStr.split(";");
- for (String prop : properties) {
- try {
- String[] parts = prop.split("=");
- if (parts.length == 2) {
- switch (parts[0]) {
- case "tsColName":
- this.tsColName = parts[1];
- break;
- case "tsParser":
- this.tsParser = parts[1];
- break;
- default:
- break;
- }
- }
- } catch (Exception e) {
- logger.error("Failed to parse property " + prop);
- //ignore
- }
- }
+ if (properties == null) {
+ properties = StreamingParser.defaultProperties;
}
- logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+ tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
+ tsParser = properties.get(PROPERTY_TS_PARSER);
+ separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
if (!StringUtils.isEmpty(tsParser)) {
try {
Class clazz = Class.forName(tsParser);
- Constructor constructor = clazz.getConstructor(String[].class);
- streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+ Constructor constructor = clazz.getConstructor(Map.class);
+ streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
} catch (Exception e) {
- throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
}
} else {
- throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
}
mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -108,7 +101,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
root.putAll(message);
- String tsStr = String.valueOf(root.get(tsColName));
+ String tsStr = objToString(root.get(tsColName));
long t = streamTimeParser.parseTime(tsStr);
ArrayList<String> result = Lists.newArrayList();
@@ -116,8 +109,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
String columnName = column.getName().toLowerCase();
if (populateDerivedTimeColumns(columnName, result, t) == false) {
- String x = String.valueOf(root.get(columnName));
- result.add(x);
+ result.add(getValueByKey(columnName, root));
}
}
@@ -133,4 +125,35 @@ public final class TimedJsonStreamParser extends StreamingParser {
return true;
}
+ /**
+  * Resolves the value of a column from a parsed JSON message.
+  *
+  * The key is first looked up directly in {@code root}. If it is not present and the key
+  * contains the configured separator, the key is split on that separator and treated as a
+  * path into embedded JSON objects: every name but the last must resolve to a nested map,
+  * and the last name is looked up in the innermost map.
+  *
+  * @param key  the column name to resolve
+  * @param root the top-level message map
+  * @return the value as a string, or an empty string when nothing is found
+  * @throws IOException if an intermediate path element is missing or is not a JSON object
+  */
+ protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
+     if (root.containsKey(key)) {
+         return objToString(root.get(key));
+     }
+
+     // No separator configured, or the key is not a compound name: nothing more to try.
+     if (StringUtils.isEmpty(separator) || !key.contains(separator)) {
+         return StringUtils.EMPTY;
+     }
+
+     // String.split takes a regex; quote the separator so that "." or "|" are taken literally.
+     String[] names = key.toLowerCase().split(java.util.regex.Pattern.quote(separator));
+     if (names.length < 2) {
+         // e.g. key "a_" splits into a single name; there is no embedded path to follow
+         return StringUtils.EMPTY;
+     }
+
+     // Walk down one level per name, always continuing from the map reached so far.
+     Map<String, Object> current = root;
+     for (int i = 0; i < names.length - 1; i++) {
+         Object child = lookupIgnoreCase(current, names[i]);
+         if (!(child instanceof Map)) {
+             throw new IOException("Property '" + names[i] + "' is not embedded format");
+         }
+         @SuppressWarnings("unchecked")
+         Map<String, Object> next = (Map<String, Object>) child;
+         current = next;
+     }
+     return objToString(lookupIgnoreCase(current, names[names.length - 1]));
+ }
+
+ /**
+  * Fetches {@code name} from {@code map}, falling back to a case-insensitive scan of the keys.
+  * NOTE(review): nested maps are presumably plain case-sensitive maps produced by the JSON
+  * mapper while the path names are lower-cased -- confirm against the mapper configuration.
+  */
+ private static Object lookupIgnoreCase(Map<String, Object> map, String name) {
+     if (map.containsKey(name)) {
+         return map.get(name);
+     }
+     for (Map.Entry<String, Object> entry : map.entrySet()) {
+         if (name.equalsIgnoreCase(entry.getKey())) {
+             return entry.getValue();
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Null-safe string conversion.
+  *
+  * @param value any object, may be null
+  * @return an empty string for null, otherwise {@code String.valueOf(value)}
+  */
+ static String objToString(Object value) {
+     return value == null ? StringUtils.EMPTY : String.valueOf(value);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index efaa042..b1b4011 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.diagnose;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -289,8 +291,10 @@ public class KafkaInputAnalyzer extends AbstractApplication {
String task = optionsHelper.getOptionValue(OPTION_TASK);
String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
- parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+ parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
if ("disorder".equalsIgnoreCase(task)) {
analyzeDisorder();