You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2019/05/17 10:19:07 UTC

[kylin] branch master updated (ee95242 -> 876d420)

This is an automated email from the ASF dual-hosted git repository.

magang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from ee95242  Small performance improvements
     new e09d9d7  KYLIN-4001 Allow user-specified time format using real-time for backend
     new 876d420  KYLIN-4001 Allow user-specified time format using real-time for ui

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../stream/core/source/MessageParserInfo.java      | 22 +++++++++++++++++++
 .../stream}/source/kafka/AbstractTimeParser.java   |  8 ++++---
 .../kylin/stream}/source/kafka/DateTimeParser.java | 25 ++++++++--------------
 .../kylin/stream/source/kafka/LongTimeParser.java  | 24 +++++++++++++++------
 .../stream/source/kafka/TimedJsonStreamParser.java | 24 ++++++++++++++++++---
 webapp/app/js/controllers/sourceMeta.js            |  8 ++++++-
 .../partials/tables/loadStreamingTableConfig.html  | 16 ++++++++++++--
 webapp/app/partials/tables/table_detail.html       |  8 +++++++
 8 files changed, 104 insertions(+), 31 deletions(-)
 copy {source-kafka/src/main/java/org/apache/kylin => stream-source-kafka/src/main/java/org/apache/kylin/stream}/source/kafka/AbstractTimeParser.java (83%)
 copy {source-kafka/src/main/java/org/apache/kylin => stream-source-kafka/src/main/java/org/apache/kylin/stream}/source/kafka/DateTimeParser.java (82%)
 copy source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java => stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java (67%)


[kylin] 01/02: KYLIN-4001 Allow user-specified time format using real-time for backend

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e09d9d7b37a688b5bebd8f93aea41d1eaf3bcfec
Author: ning.guo <35...@qq.com>
AuthorDate: Fri May 17 12:38:47 2019 +0800

    KYLIN-4001 Allow user-specified time format using real-time for backend
---
 .../stream/core/source/MessageParserInfo.java      | 22 ++++++++
 .../stream/source/kafka/AbstractTimeParser.java    | 38 +++++++++++++
 .../kylin/stream/source/kafka/DateTimeParser.java  | 55 +++++++++++++++++++
 .../kylin/stream/source/kafka/LongTimeParser.java  | 63 ++++++++++++++++++++++
 .../stream/source/kafka/TimedJsonStreamParser.java | 24 +++++++--
 webapp/app/partials/tables/table_detail.html       |  8 +++
 6 files changed, 207 insertions(+), 3 deletions(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
index 89e36dc..4070ae6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
@@ -28,6 +28,12 @@ public class MessageParserInfo {
     @JsonProperty("ts_col_name")
     private String tsColName;
 
+    @JsonProperty("ts_parser")
+    private String tsParser;
+
+    @JsonProperty("ts_pattern")
+    private String tsPattern;
+
     @JsonProperty("format_ts")
     private boolean formatTs;
 
@@ -42,6 +48,22 @@ public class MessageParserInfo {
         this.tsColName = tsColName;
     }
 
+    /** Fully-qualified class name of the AbstractTimeParser implementation to instantiate; may be null or empty. */
+    public String getTsParser() { return this.tsParser; }
+
+    /** @param tsParser fully-qualified AbstractTimeParser class name */
+    public void setTsParser(String tsParser) {
+        this.tsParser = tsParser;
+    }
+
+    /** Pattern interpreted by the configured parser, e.g. "MS"/"S" (LongTimeParser) or a date format (DateTimeParser). */
+    public String getTsPattern() { return this.tsPattern; }
+
+    /** @param tsPattern pattern interpreted by the configured time parser */
+    public void setTsPattern(String tsPattern) {
+        this.tsPattern = tsPattern;
+    }
+
     public boolean isFormatTs() {
         return formatTs;
     }
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..74a5e9b
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+
+
+/**
+ * Base type for pluggable stream timestamp parsers, created reflectively through a (MessageParserInfo) constructor (by guoning, 2019-04-29).
+ */
+public abstract class AbstractTimeParser {
+
+    // Subclasses must keep this (MessageParserInfo) signature; the base class itself stores nothing from it.
+    public AbstractTimeParser(MessageParserInfo parserInfo) { }
+
+    /**
+     * Converts a raw timestamp string taken from a stream message into epoch time.
+     * @param time the raw timestamp value read from the message
+     * @return the timestamp as a long epoch value (milliseconds for the built-in parsers)
+     */
+    public abstract long parseTime(String time) throws IllegalArgumentException;
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..0ae2239
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Time parser for date/time strings written in the user-supplied tsPattern, e.g. "yyyy-MM-dd HH:mm:ss" (by guoning, 2019-04-29).
+ */
+public class DateTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private final String tsPattern;
+    private final FastDateFormat formatter;
+
+    /** Builds the formatter for parserInfo's tsPattern; throws IllegalStateException (with the cause attached) if it is unusable. */
+    public DateTimeParser(MessageParserInfo parserInfo) {
+        super(parserInfo);
+        tsPattern = parserInfo.getTsPattern();
+        try {
+            formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+        } catch (Exception e) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.", e);
+        }
+    }
+
+    @Override
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
+        }
+    }
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java
new file mode 100644
index 0000000..de88847
--- /dev/null
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.stream.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+/** Time parser for numeric epoch timestamps: tsPattern "S" means seconds, anything else is returned unscaled (by guoning, 2019-04-29). */
+public class LongTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(LongTimeParser.class);
+    private final String tsPattern;
+
+    /** Reads tsPattern upper-cased; null-safe, so a missing ts_pattern stays null (no scaling) instead of throwing an NPE. */
+    public LongTimeParser(MessageParserInfo parserInfo) {
+        super(parserInfo);
+        tsPattern = StringUtils.upperCase(parserInfo.getTsPattern(), Locale.ENGLISH);
+    }
+
+    /**
+     * Parses a numeric epoch timestamp; a null or empty value yields 0.
+     * @param time epoch value as a decimal string (seconds when tsPattern is "S", otherwise milliseconds)
+     * @return epoch time in milliseconds
+     * @throws IllegalArgumentException if {@code time} is not a valid long
+     */
+    @Override
+    public long parseTime(String time) throws IllegalArgumentException {
+        long t;
+        if (StringUtils.isEmpty(time)) {
+            t = 0;
+        } else {
+            try {
+                t = Long.parseLong(time);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException("Invalid long time value: '" + time + "'", e);
+            }
+        }
+        if ("S".equals(tsPattern)) {
+            t = t * 1000;
+        }
+        return t;
+    }
+}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 594e6a4..32e4111 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -14,12 +14,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.stream.source.kafka;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,6 +63,8 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
     private List<TblColRef> allColumns;
     private boolean formatTs = false;//not used
     private String tsColName = "timestamp";
+    private String tsParser = null;
+    private AbstractTimeParser streamTimeParser;
 
     /**
      * the path of {"user" : {"name": "kite", "sex":"female"}}
@@ -88,6 +91,21 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
                 }
                 logger.info("Using parser field mapping by {}", parserInfo.getColumnToSourceFieldMapping());
             }
+            this.tsParser = parserInfo.getTsParser();
+
+            if (!StringUtils.isEmpty(tsParser)) {
+                try {
+                    Class clazz = Class.forName(tsParser);
+                    Constructor constructor = clazz.getConstructor(MessageParserInfo.class);
+                    streamTimeParser = (AbstractTimeParser) constructor.newInstance(parserInfo);
+                } catch (Exception e) {
+                    throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", tsPattern " + parserInfo.getTsPattern() + ".", e);
+                }
+            } else {
+                parserInfo.setTsParser("org.apache.kylin.stream.source.kafka.LongTimeParser");
+                parserInfo.setTsPattern("MS");
+                streamTimeParser = new LongTimeParser(parserInfo);
+            }
         }
         mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
         mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
@@ -108,7 +126,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
             if (StringUtils.isEmpty(tsStr)) {
                 t = 0;
             } else {
-                t = Long.valueOf(tsStr);
+                t = streamTimeParser.parseTime(tsStr);
             }
             ArrayList<String> result = Lists.newArrayList();
 
@@ -133,7 +151,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
             }
 
             return new StreamingMessage(result, new KafkaPartitionPosition(record.partition(), record.offset()), t,
-                    Collections.<String, Object> emptyMap());
+                    Collections.<String, Object>emptyMap());
         } catch (IOException e) {
             logger.error("error", e);
             throw new RuntimeException(e);
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index 85026c9..0cf4ed8 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -196,6 +196,14 @@
                   </th>
                   <td>{{currentStreamingConfig.properties['bootstrap.servers']}}</td>
                 </tr>
+                <tr>
+                  <th>TSParser</th>
+                  <td>{{currentStreamingConfig.parser_info.ts_parser}}</td>
+                </tr>
+                <tr>
+                  <th>TSPattern</th>
+                  <td>{{currentStreamingConfig.parser_info.ts_pattern}}</td>
+                </tr>
               </tbody>
             </table>
           </div>


[kylin] 02/02: KYLIN-4001 Allow user-specified time format using real-time for ui

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 876d420812a09270455bfbb78204010c81cab612
Author: ning.guo <35...@qq.com>
AuthorDate: Fri May 17 12:39:03 2019 +0800

    KYLIN-4001 Allow user-specified time format using real-time for ui
---
 webapp/app/js/controllers/sourceMeta.js                  |  8 +++++++-
 webapp/app/partials/tables/loadStreamingTableConfig.html | 16 ++++++++++++++--
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 4ee196f..32a5c6b 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -965,6 +965,8 @@ KylinApp
             dataTypeArr: tableConfig.dataTypes,
             TSColumnArr: [],
             TSColumnSelected: '',
+            TSParser: 'org.apache.kylin.stream.source.kafka.LongTimeParser',
+            TSPattern: 'MS',
             errMsg: ''
           };
           $scope.tableData = {
@@ -1020,6 +1022,8 @@ KylinApp
         dataTypeArr: tableConfig.dataTypes,
         TSColumnArr: [],
         TSColumnSelected: '',
+        TSParser: '',
+        TSPattern: '',
         errMsg: '',
         lambda: false
       };
@@ -1251,7 +1255,9 @@ KylinApp
         }
         // Set ts column
         $scope.streamingConfig.parser_info.ts_col_name = $scope.streaming.TSColumnSelected;
-        $scope.streamingConfig.parser_info.field_mapping = {}
+        $scope.streamingConfig.parser_info.ts_parser = $scope.streaming.TSParser;
+        $scope.streamingConfig.parser_info.ts_pattern = $scope.streaming.TSPattern;
+        $scope.streamingConfig.parser_info.field_mapping = {};
         $scope.tableData.columns.forEach(function(col) {
           if (col.comment) {
             $scope.streamingConfig.parser_info.field_mapping[col.name] = col.comment.replace(/\|/g, '.') || ''
diff --git a/webapp/app/partials/tables/loadStreamingTableConfig.html b/webapp/app/partials/tables/loadStreamingTableConfig.html
index ed77f50..2dbdcac 100644
--- a/webapp/app/partials/tables/loadStreamingTableConfig.html
+++ b/webapp/app/partials/tables/loadStreamingTableConfig.html
@@ -70,7 +70,19 @@
         <div class="form-group required">
           <label for="TSColumn" class="col-sm-3 control-label font-color-default">TSColumn</label>
           <div class="col-sm-6">
-            <select chosen ng-model="streaming.TSColumnSelected" ng-options="TSColumn as TSColumn for TSColumn in streaming.TSColumnArr" data-placeholder="select a column name" style="width: 120px !important;" class="chosen-select" ng-change="updateTSColumn(TSColumn)"></select>
+            <select id="TSColumn" chosen ng-model="streaming.TSColumnSelected" ng-options="TSColumn as TSColumn for TSColumn in streaming.TSColumnArr" data-placeholder="select a column name" style="width: 120px !important;" class="chosen-select" ng-change="updateTSColumn(TSColumn)"></select>
+          </div>
+        </div>
+        <div class="form-group">
+          <label for="TSParser" class="col-sm-3 control-label font-color-default">TSParser</label>
+          <div class="col-sm-9">
+            <input type="text" class="form-control" id="TSParser" ng-model="streaming.TSParser" name="TSParser" placeholder="Input TSParser eg. org.apache.kylin.stream.source.kafka.LongTimeParser or org.apache.kylin.stream.source.kafka.DateTimeParser" required>
+          </div>
+        </div>
+        <div class="form-group">
+          <label for="TSPattern" class="col-sm-3 control-label font-color-default">TSPattern</label>
+          <div class="col-sm-9">
+            <input type="text" class="form-control" id="TSPattern" ng-model="streaming.TSPattern" name="TSPattern" placeholder="Input TSPattern eg. yyyy-MM-dd HH:mm:ss">
           </div>
         </div>
 
@@ -137,4 +149,4 @@
       </div>
     </div>
   </div>
-</form>
\ No newline at end of file
+</form>