You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2019/05/17 10:19:08 UTC

[kylin] 01/02: KYLIN-4001 Allow user-specified time format using real-time for backend

This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e09d9d7b37a688b5bebd8f93aea41d1eaf3bcfec
Author: ning.guo <35...@qq.com>
AuthorDate: Fri May 17 12:38:47 2019 +0800

    KYLIN-4001 Allow user-specified time format using real-time for backend
---
 .../stream/core/source/MessageParserInfo.java      | 22 ++++++++
 .../stream/source/kafka/AbstractTimeParser.java    | 38 +++++++++++++
 .../kylin/stream/source/kafka/DateTimeParser.java  | 55 +++++++++++++++++++
 .../kylin/stream/source/kafka/LongTimeParser.java  | 63 ++++++++++++++++++++++
 .../stream/source/kafka/TimedJsonStreamParser.java | 24 +++++++--
 webapp/app/partials/tables/table_detail.html       |  8 +++
 6 files changed, 207 insertions(+), 3 deletions(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
index 89e36dc..4070ae6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
@@ -28,6 +28,12 @@ public class MessageParserInfo {
     @JsonProperty("ts_col_name")
     private String tsColName;
 
+    /** Fully-qualified class name of the time parser ({@code AbstractTimeParser} subclass) to use. */
+    @JsonProperty("ts_parser") private String tsParser;
+
+    /** Pattern handed to that time parser (a unit or a date pattern, depending on the parser). */
+    @JsonProperty("ts_pattern") private String tsPattern;
+
     @JsonProperty("format_ts")
     private boolean formatTs;
 
@@ -42,6 +48,22 @@ public class MessageParserInfo {
         this.tsColName = tsColName;
     }
 
+    /** @return fully-qualified class name of the timestamp parser, or null when not configured */
+    public String getTsParser() {
+        return this.tsParser;
+    }
+
+    /** @param tsParser fully-qualified class name of an {@code AbstractTimeParser} subclass */
+    public void setTsParser(String tsParser) { this.tsParser = tsParser; }
+
+    /** @return pattern interpreted by the configured timestamp parser, or null when not configured */
+    public String getTsPattern() {
+        return this.tsPattern;
+    }
+
+    /** @param tsPattern pattern to be interpreted by the configured timestamp parser */
+    public void setTsPattern(String tsPattern) { this.tsPattern = tsPattern; }
+
     public boolean isFormatTs() {
         return formatTs;
     }
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..74a5e9b
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+
+/**
+ * Base class for parsers that convert the raw value of a streaming message's timestamp column
+ * into epoch time.
+ *
+ * <p>Implementations are instantiated reflectively by {@code TimedJsonStreamParser}, which looks up
+ * a public constructor taking a single {@link MessageParserInfo}; every concrete subclass must
+ * therefore declare one.
+ *
+ * <p>Created by guoning on 2019-04-29.
+ */
+public abstract class AbstractTimeParser {
+
+    /**
+     * @param parserInfo parser configuration of the streaming source; unused here, but handed on so
+     *                   subclasses can read their settings (such as the ts pattern) from it
+     */
+    protected AbstractTimeParser(MessageParserInfo parserInfo) {
+    }
+
+    /**
+     * Parses a string time to a long value (epoch time, in milliseconds).
+     *
+     * @param time raw timestamp value taken from the message
+     * @return the parsed time in epoch milliseconds
+     * @throws IllegalArgumentException if {@code time} cannot be parsed
+     */
+    public abstract long parseTime(String time) throws IllegalArgumentException;
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..0ae2239
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Time parser that parses timestamp strings with the date pattern configured as the ts pattern
+ * (see {@link MessageParserInfo#getTsPattern()}), producing epoch milliseconds.
+ *
+ * <p>NOTE(review): the time zone applied while parsing is whatever
+ * {@code org.apache.kylin.common.util.DateFormat.getDateFormat} configures; confirm it matches
+ * the time zone of the values produced upstream.
+ *
+ * <p>Created by guoning on 2019-04-29.
+ */
+public class DateTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private final String tsPattern;
+    private final FastDateFormat formatter;
+
+    /**
+     * @param parserInfo parser configuration whose ts pattern is used as the date pattern
+     * @throws IllegalStateException if the ts pattern is null/empty or rejected as a date pattern
+     */
+    public DateTimeParser(MessageParserInfo parserInfo) {
+        super(parserInfo);
+        tsPattern = parserInfo.getTsPattern();
+        if (tsPattern == null || tsPattern.isEmpty()) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
+        }
+        try {
+            formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+        } catch (Exception e) {
+            // Chain the cause so the reason the pattern was rejected is not lost.
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.", e);
+        }
+    }
+
+    /**
+     * Parses {@code timeStr} with the configured pattern.
+     *
+     * @param timeStr timestamp string expected to match the configured pattern
+     * @return the parsed instant in epoch milliseconds
+     * @throws IllegalArgumentException if {@code timeStr} cannot be parsed with the pattern
+     */
+    @Override
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
+        }
+    }
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java
new file mode 100644
index 0000000..de88847
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+/**
+ * Time parser for numeric epoch timestamps.
+ *
+ * <p>The ts pattern selects the unit of the incoming value (case-insensitive): {@code "S"} means
+ * seconds, which are converted to milliseconds; any other pattern, including the default
+ * {@code "MS"} used when none is configured, is treated as milliseconds and returned unchanged.
+ *
+ * <p>Created by guoning on 2019-04-29.
+ */
+public class LongTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(LongTimeParser.class);
+
+    /** Pattern used when none is configured; the same default TimedJsonStreamParser applies. */
+    private static final String DEFAULT_PATTERN = "MS";
+
+    private final String tsPattern;
+
+    /**
+     * @param parserInfo parser configuration; when it is null or carries no ts pattern, the default
+     *                   {@value #DEFAULT_PATTERN} (milliseconds) is used instead of failing
+     */
+    public LongTimeParser(MessageParserInfo parserInfo) {
+        super(parserInfo);
+        String pattern = parserInfo == null ? null : parserInfo.getTsPattern();
+        tsPattern = StringUtils.isEmpty(pattern) ? DEFAULT_PATTERN : pattern.toUpperCase(Locale.ENGLISH);
+    }
+
+    /**
+     * Parses a string time to a long value (epoch time, in milliseconds).
+     *
+     * @param time decimal epoch value in the unit selected by the ts pattern; null or empty yields 0
+     * @return the time in epoch milliseconds
+     * @throws IllegalArgumentException if {@code time} is not a valid long, or if converting it from
+     *                                  seconds to milliseconds would overflow
+     */
+    @Override
+    public long parseTime(String time) throws IllegalArgumentException {
+        long t;
+        if (StringUtils.isEmpty(time)) {
+            t = 0;
+        } else {
+            try {
+                t = Long.parseLong(time);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(e);
+            }
+        }
+        if ("S".equals(tsPattern)) {
+            try {
+                // multiplyExact throws instead of silently wrapping around on overflow
+                t = Math.multiplyExact(t, 1000L);
+            } catch (ArithmeticException e) {
+                throw new IllegalArgumentException("Seconds value out of range: '" + time + "'", e);
+            }
+        }
+        return t;
+    }
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 594e6a4..32e4111 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -14,12 +14,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.stream.source.kafka;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,6 +63,8 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
     private List<TblColRef> allColumns;
     private boolean formatTs = false;//not used
     private String tsColName = "timestamp";
+    private String tsParser;                     // parser class name from MessageParserInfo; empty means default
+    private AbstractTimeParser streamTimeParser; // instance of tsParser, or a LongTimeParser by default
 
     /**
      * the path of {"user" : {"name": "kite", "sex":"female"}}
@@ -88,6 +91,21 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
                 }
                 logger.info("Using parser field mapping by {}", parserInfo.getColumnToSourceFieldMapping());
             }
+            this.tsParser = parserInfo.getTsParser();
+            // tsParser must name an AbstractTimeParser subclass with a public (MessageParserInfo) constructor
+            if (!StringUtils.isEmpty(tsParser)) {
+                try {
+                    Class<? extends AbstractTimeParser> clazz = Class.forName(tsParser).asSubclass(AbstractTimeParser.class);
+                    Constructor<? extends AbstractTimeParser> constructor = clazz.getConstructor(MessageParserInfo.class);
+                    streamTimeParser = constructor.newInstance(parserInfo);
+                } catch (Exception e) {
+                    throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", tsPattern " + parserInfo.getTsPattern() + ".", e);
+                }
+            } else {
+                parserInfo.setTsParser("org.apache.kylin.stream.source.kafka.LongTimeParser");
+                parserInfo.setTsPattern("MS");
+                streamTimeParser = new LongTimeParser(parserInfo);
+            }
         }
         mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
         mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
@@ -108,7 +126,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
             if (StringUtils.isEmpty(tsStr)) {
                 t = 0;
             } else {
-                t = Long.valueOf(tsStr);
+                t = streamTimeParser.parseTime(tsStr);
             }
             ArrayList<String> result = Lists.newArrayList();
 
@@ -133,7 +151,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
             }
 
             return new StreamingMessage(result, new KafkaPartitionPosition(record.partition(), record.offset()), t,
-                    Collections.<String, Object> emptyMap());
+                    Collections.<String, Object>emptyMap());
         } catch (IOException e) {
             logger.error("error", e);
             throw new RuntimeException(e);
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index 85026c9..0cf4ed8 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -196,6 +196,14 @@
                   </th>
                   <td>{{currentStreamingConfig.properties['bootstrap.servers']}}</td>
                 </tr>
+                <tr>
+                  <th>TSParser</th>
+                  <td>{{currentStreamingConfig.parser_info.ts_parser}}</td>
+                </tr>
+                <tr>
+                  <th>TSPattern</th>
+                  <td>{{currentStreamingConfig.parser_info.ts_pattern}}</td>
+                </tr>
               </tbody>
             </table>
           </div>