You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2019/05/17 10:19:08 UTC
[kylin] 01/02: KYLIN-4001 Allow user-specified time format using
real-time for backend
This is an automated email from the ASF dual-hosted git repository.
magang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e09d9d7b37a688b5bebd8f93aea41d1eaf3bcfec
Author: ning.guo <35...@qq.com>
AuthorDate: Fri May 17 12:38:47 2019 +0800
KYLIN-4001 Allow user-specified time format using real-time for backend
---
.../stream/core/source/MessageParserInfo.java | 22 ++++++++
.../stream/source/kafka/AbstractTimeParser.java | 38 +++++++++++++
.../kylin/stream/source/kafka/DateTimeParser.java | 55 +++++++++++++++++++
.../kylin/stream/source/kafka/LongTimeParser.java | 63 ++++++++++++++++++++++
.../stream/source/kafka/TimedJsonStreamParser.java | 24 +++++++--
webapp/app/partials/tables/table_detail.html | 8 +++
6 files changed, 207 insertions(+), 3 deletions(-)
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
index 89e36dc..4070ae6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
@@ -28,6 +28,12 @@ public class MessageParserInfo {
@JsonProperty("ts_col_name")
private String tsColName;
+ @JsonProperty("ts_parser")
+ private String tsParser;
+
+ @JsonProperty("ts_pattern")
+ private String tsPattern;
+
@JsonProperty("format_ts")
private boolean formatTs;
@@ -42,6 +48,22 @@ public class MessageParserInfo {
this.tsColName = tsColName;
}
+ public String getTsParser() {
+ return tsParser; // class name of the timestamp parser; TimedJsonStreamParser loads it with Class.forName
+ }
+
+ public void setTsParser(String tsParser) {
+ this.tsParser = tsParser; // stored as given; nothing is validated here
+ }
+
+ public String getTsPattern() {
+ return tsPattern; // pattern text read by the time parsers (a date format, or "S"/"MS" for LongTimeParser)
+ }
+
+ public void setTsPattern(String tsPattern) {
+ this.tsPattern = tsPattern; // stored as given; nothing is validated here
+ }
+
public boolean isFormatTs() {
return formatTs;
}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..74a5e9b
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+
+
+/**
+ * Base class for the parsers that convert a message's timestamp text into a long time value.
+ */
+public abstract class AbstractTimeParser {
+    public AbstractTimeParser(MessageParserInfo parserInfo) {
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time).
+     * @param time timestamp text to convert
+     * @return the time as a long value
+     * @throws IllegalArgumentException when the text cannot be converted
+     */
+    public abstract long parseTime(String time) throws IllegalArgumentException;
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..0ae2239
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parses timestamp text using the date pattern supplied as tsPattern in the parser settings.
+ */
+public class DateTimeParser extends AbstractTimeParser {
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private String tsPattern = null;
+    private FastDateFormat formatter = null;
+
+    /** Builds the formatter for the configured pattern; a pattern that cannot be used is reported with its cause. */
+    public DateTimeParser(MessageParserInfo parserInfo) {
+        super(parserInfo);
+        tsPattern = parserInfo.getTsPattern();
+        try {
+            formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+        } catch (Throwable e) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.", e);
+        }
+    }
+
+    /** Returns the epoch milliseconds for timeStr; throws IllegalArgumentException when it cannot be parsed. */
+    @Override
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (Throwable e) {
+            throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
+        }
+    }
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java
new file mode 100644
index 0000000..de88847
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+/**
+ * Parses timestamps written as whole numbers. The value is multiplied by 1000 when the
+ * configured pattern is "S" (compared case-insensitively); otherwise it is returned unchanged.
+ */
+public class LongTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(LongTimeParser.class);
+    private String tsPattern = null;
+
+    public LongTimeParser(MessageParserInfo parserInfo) {
+        super(parserInfo);
+        String pattern = parserInfo.getTsPattern();
+        // A missing pattern stays null (no scaling) rather than failing with a NullPointerException.
+        tsPattern = pattern == null ? null : pattern.toUpperCase(Locale.ENGLISH);
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time).
+     *
+     * @param time digits of a long value; null or empty is treated as 0
+     * @return the parsed value, multiplied by 1000 when the pattern is "S"
+     * @throws IllegalArgumentException if time is not a valid long
+     */
+    @Override
+    public long parseTime(String time) throws IllegalArgumentException {
+        long t = 0;
+        if (!StringUtils.isEmpty(time)) {
+            try {
+                t = Long.parseLong(time);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(e);
+            }
+        }
+        return "S".equals(tsPattern) ? t * 1000 : t;
+    }
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 594e6a4..32e4111 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -14,12 +14,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.stream.source.kafka;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -62,6 +63,8 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
private List<TblColRef> allColumns;
private boolean formatTs = false;//not used
private String tsColName = "timestamp";
+ private String tsParser = null;
+ private AbstractTimeParser streamTimeParser;
/**
* the path of {"user" : {"name": "kite", "sex":"female"}}
@@ -88,6 +91,21 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
}
logger.info("Using parser field mapping by {}", parserInfo.getColumnToSourceFieldMapping());
}
+ this.tsParser = parserInfo.getTsParser();
+
+ if (!StringUtils.isEmpty(tsParser)) {
+ try {
+ Class clazz = Class.forName(tsParser);
+ Constructor constructor = clazz.getConstructor(MessageParserInfo.class);
+ streamTimeParser = (AbstractTimeParser) constructor.newInstance(parserInfo);
+ } catch (Exception e) {
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", tsPattern " + parserInfo.getTsPattern() + ".", e);
+ }
+ } else {
+ parserInfo.setTsParser("org.apache.kylin.stream.source.kafka.LongTimeParser");
+ parserInfo.setTsPattern("MS");
+ streamTimeParser = new LongTimeParser(parserInfo);
+ }
}
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
@@ -108,7 +126,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
if (StringUtils.isEmpty(tsStr)) {
t = 0;
} else {
- t = Long.valueOf(tsStr);
+ t = streamTimeParser.parseTime(tsStr);
}
ArrayList<String> result = Lists.newArrayList();
@@ -133,7 +151,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
}
return new StreamingMessage(result, new KafkaPartitionPosition(record.partition(), record.offset()), t,
- Collections.<String, Object> emptyMap());
+ Collections.<String, Object>emptyMap());
} catch (IOException e) {
logger.error("error", e);
throw new RuntimeException(e);
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index 85026c9..0cf4ed8 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -196,6 +196,14 @@
</th>
<td>{{currentStreamingConfig.properties['bootstrap.servers']}}</td>
</tr>
+ <tr>
+ <th>TSParser</th>
+ <td>{{currentStreamingConfig.parser_info.ts_parser}}</td>
+ </tr>
+ <tr>
+ <th>TSPattern</th>
+ <td>{{currentStreamingConfig.parser_info.ts_pattern}}</td>
+ </tr>
</tbody>
</table>
</div>