You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/27 14:21:15 UTC

kylin git commit: KYLIN-2054 TimedJsonStreamParser should support other time format

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2054 [created] 754899710


KYLIN-2054 TimedJsonStreamParser should support other time format

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/75489971
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/75489971
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/75489971

Branch: refs/heads/KYLIN-2054
Commit: 754899710961e773dcaa24242e948c4cd750a501
Parents: 859230d
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 27 22:21:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 22:21:15 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java | 13 +++
 source-kafka/pom.xml                            |  6 +-
 .../kylin/source/kafka/AbstractTimeParser.java  | 34 ++++++++
 .../kylin/source/kafka/DateTimeParser.java      | 84 ++++++++++++++++++++
 .../kylin/source/kafka/DefaultTimeParser.java   | 49 ++++++++++++
 .../source/kafka/TimedJsonStreamParser.java     | 31 ++++++--
 6 files changed, 208 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ee15832..5eaa011 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -24,6 +24,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang3.time.FastDateFormat;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -206,6 +208,17 @@ public class BasicTest {
         }
     }
 
+    @Test
+    @Ignore("for dev only")
+    public void test3() throws Exception {
+        // Dev-only scratch check: print "now" in a 12-hour pattern, then the epoch value parsed from a sample string.
+        final String pattern = "MMM dd, yyyy hh:mm:ss aa";
+        final FastDateFormat fdf = org.apache.kylin.common.util.DateFormat.getDateFormat(pattern);
+        System.out.println(fdf.format(new Date()));
+        final Date parsed = fdf.parse("Jul 20, 2016 9:59:17 AM");
+        System.out.println(parsed.getTime());
+    }
+
     private static String time(long t) {
         DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
         Calendar cal = Calendar.getInstance();

http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 212f4c6..f91ab8f 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -48,7 +48,11 @@
             <artifactId>kafka_2.10</artifactId>
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..96a4ece
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+/** Base type for pluggable timestamp parsers; TimedJsonStreamParser creates the configured subclass reflectively via a (String[]) constructor. */
+public abstract class AbstractTimeParser {
+
+    /** @param properties parser properties; not read by this base class */
+    public AbstractTimeParser(String[] properties) { }
+
+    /**
+     * Parse a string time to a long value (epoch time)
+     * @param time the time value as text
+     * @return the parsed time as a long
+     * @throws IllegalArgumentException when an implementation cannot parse the text
+     */
+    public abstract long parseTime(String time) throws IllegalArgumentException;
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..2bd699d
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.common.util.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/** Parses formatted date-time strings into epoch time, using the pattern given by the "tsPattern" property. */
+public class DateTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+    private FastDateFormat formatter = null;
+
+    /**
+     * Called by reflection.
+     * @param properties "key=value" entries; only "tsPattern" is read; may be null (the default pattern is kept)
+     * @throws IllegalStateException if the resulting pattern is empty or rejected when building the formatter
+     */
+    public DateTimeParser(String[] properties) {
+        super(properties);
+        if (properties != null) {
+            for (String prop : properties) {
+                try {
+                    String[] parts = prop.split("=");
+                    if (parts.length == 2 && "tsPattern".equals(parts[0])) {
+                        this.tsPattern = parts[1];
+                    }
+                } catch (Exception e) {
+                    // best effort: skip a malformed (e.g. null) entry instead of failing construction
+                    logger.error("Failed to parse property " + prop, e);
+                }
+            }
+        }
+
+        if (StringUtils.isEmpty(tsPattern)) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
+        }
+        try {
+            formatter = DateFormat.getDateFormat(tsPattern);
+        } catch (Exception e) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.", e);
+        }
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time)
+     * @param timeStr time string, expected to match the configured pattern
+     * @return the parsed time, as reported by Date#getTime()
+     * @throws IllegalArgumentException if timeStr is null or cannot be parsed with the pattern
+     */
+    @Override
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+        if (timeStr == null) {
+            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: 'null'");
+        }
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
new file mode 100644
index 0000000..85f2bfa
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+
+/** Time parser for timestamps that are already numeric text; the value is read as a decimal long. */
+public class DefaultTimeParser extends AbstractTimeParser {
+
+    /** @param properties parser properties; only handed to the superclass constructor */
+    public DefaultTimeParser(String[] properties) {
+        super(properties);
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time)
+     *
+     * @param time a decimal long as text; may be null or empty
+     * @return the numeric value of time, or 0 when time is null or empty
+     * @throws IllegalArgumentException wrapping the NumberFormatException when time is not a valid long
+     */
+    @Override
+    public long parseTime(String time) throws IllegalArgumentException {
+        if (StringUtils.isEmpty(time)) {
+            return 0L;
+        }
+        try {
+            return Long.parseLong(time);
+        } catch (NumberFormatException nfe) {
+            throw new IllegalArgumentException(nfe);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 148ae25..9faf880 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,12 +50,16 @@ public final class TimedJsonStreamParser extends StreamingParser {
     private List<TblColRef> allColumns;
     private final ObjectMapper mapper = new ObjectMapper();
     private String tsColName = "timestamp";
+    private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
 
+    private AbstractTimeParser streamTimeParser;
+
     public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
         this.allColumns = allColumns;
+        String[] properties = null;
         if (!StringUtils.isEmpty(propertiesStr)) {
-            String[] properties = propertiesStr.split(";");
+            properties = propertiesStr.split(";");
             for (String prop : properties) {
                 try {
                     String[] parts = prop.split("=");
@@ -63,6 +68,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
                         case "tsColName":
                             this.tsColName = parts[1];
                             break;
+                        case "tsParser":
+                            this.tsParser = parts[1];
+                            break;
                         default:
                             break;
                         }
@@ -75,21 +83,28 @@ public final class TimedJsonStreamParser extends StreamingParser {
         }
 
         logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+
+        if (StringUtils.isEmpty(tsParser)) {
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+        }
+        try {
+            // A String[] handed directly to the varargs newInstance(Object...) is spread into separate arguments; pass it as ONE Object so it reaches the (String[]) constructor intact.
+            Object ctorArg = (properties == null) ? new String[0] : properties;
+            Constructor<?> constructor = Class.forName(tsParser).getConstructor(String[].class);
+            streamTimeParser = (AbstractTimeParser) constructor.newInstance(ctorArg);
+        } catch (Exception e) {
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+        }
     }
 
     @Override
     public StreamingMessage parse(ByteBuffer buffer) {
         try {
             Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
-            Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            Map<String, String> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
             String tsStr = root.get(tsColName);
-            long t;
-            if (StringUtils.isEmpty(tsStr)) {
-                t = 0;
-            } else {
-                t = Long.valueOf(tsStr);
-            }
+            long t = streamTimeParser.parseTime(tsStr);
             ArrayList<String> result = Lists.newArrayList();
 
             for (TblColRef column : allColumns) {