You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:32:05 UTC

[20/30] kylin git commit: KYLIN-2054 TimedJsonStreamParser should support other time format

KYLIN-2054 TimedJsonStreamParser should support other time format


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5007261
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5007261
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5007261

Branch: refs/heads/master-hbase1.x
Commit: e500726184b318da2d1f503cd1b159cfe7242347
Parents: 01d5670
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 27 22:21:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Sep 28 17:00:46 2016 +0800

----------------------------------------------------------------------
 build/bin/kylin.sh                              |  3 +-
 .../org/apache/kylin/common/util/BasicTest.java | 13 +++
 source-kafka/pom.xml                            |  6 +-
 .../kylin/source/kafka/AbstractTimeParser.java  | 34 ++++++++
 .../kylin/source/kafka/DateTimeParser.java      | 84 ++++++++++++++++++++
 .../kylin/source/kafka/DefaultTimeParser.java   | 49 ++++++++++++
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../kylin/source/kafka/SeekOffsetStep.java      |  2 +-
 .../source/kafka/TimedJsonStreamParser.java     | 46 ++++++++---
 9 files changed, 222 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 7a9d2a1..e767492 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -31,7 +31,7 @@ function retrieveDependency() {
     #retrive $hive_dependency and $hbase_dependency
     source ${dir}/find-hive-dependency.sh
     source ${dir}/find-hbase-dependency.sh
-    source ${dir}/find-kafka-dependency.sh
+    #source ${dir}/find-kafka-dependency.sh
 
     #retrive $KYLIN_EXTRA_START_OPTS
     if [ -f "${dir}/setenv.sh" ]
@@ -40,6 +40,7 @@ function retrieveDependency() {
 
     export HBASE_CLASSPATH_PREFIX=${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH_PREFIX}
     export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}
+    #export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}:${kafka_dependency}
 }
 
 # start command

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ee15832..5eaa011 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -24,6 +24,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang3.time.FastDateFormat;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -206,6 +208,17 @@ public class BasicTest {
         }
     }
 
+    @Test
+    @Ignore("for dev only")
+    public void test3() throws Exception {
+        FastDateFormat formatter = org.apache.kylin.common.util.DateFormat.getDateFormat("MMM dd, yyyy hh:mm:ss aa");
+        System.out.println(formatter.format(new Date()));
+
+        String timeStr = "Jul 20, 2016 9:59:17 AM";
+
+        System.out.println(formatter.parse(timeStr).getTime());
+    }
+
     private static String time(long t) {
         DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
         Calendar cal = Calendar.getInstance();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 212f4c6..f91ab8f 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -48,7 +48,11 @@
             <artifactId>kafka_2.10</artifactId>
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..96a4ece
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+/** Base type for time parsers: subclasses turn a time string into a long epoch value.
+ */
+public abstract class AbstractTimeParser {
+
+    public AbstractTimeParser(String[] properties) {
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time).
+     * @param time the time string to parse
+     * @return the parsed time as a long value (epoch time)
+     */
+    abstract public long parseTime(String time) throws IllegalArgumentException;
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..2bd699d
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.common.util.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/** Time parser that parses time strings with a date-time pattern ("tsPattern" property, with a built-in default).
+ */
+public class DateTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+
+    private FastDateFormat formatter = null;
+
+    //called by reflection; reads an optional "tsPattern=<pattern>" entry. NOTE(review): properties is iterated without a null check - confirm callers never pass null
+    public DateTimeParser(String[] properties) {
+        super(properties);
+        for (String prop : properties) {
+            try {
+                String[] parts = prop.split("=");
+                if (parts.length == 2) {
+                    switch (parts[0]) {
+                    case "tsPattern":
+                        this.tsPattern = parts[1];
+                        break;
+                    default:
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("Failed to parse property " + prop);
+                //ignore this property and continue with the remaining ones
+            }
+        }
+
+        if (!StringUtils.isEmpty(tsPattern)) {
+            try {
+                formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+            } catch (Throwable e) {
+                throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
+            }
+        } else {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
+        }
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time) using the configured tsPattern.
+     * A ParseException from the formatter is rethrown as IllegalArgumentException (cause kept).
+     * @param timeStr the time string, expected to match tsPattern
+     * @return the result of getTime() on the parsed date
+     */
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
new file mode 100644
index 0000000..85f2bfa
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+
+/** Time parser that reads the time string as a plain long number; the properties are not used here.
+ */
+public class DefaultTimeParser extends AbstractTimeParser {
+
+    public DefaultTimeParser(String[] properties) {
+        super(properties);
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time); an empty string gives 0.
+     * @param time the time as a number string; a NumberFormatException is rethrown as IllegalArgumentException
+     * @return the parsed long value, or 0 when StringUtils.isEmpty(time) is true
+     */
+    public long parseTime(String time) throws IllegalArgumentException {
+        long t;
+        if (StringUtils.isEmpty(time)) {
+            t = 0;
+        } else {
+            try {
+                t = Long.valueOf(time);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(e);
+            }
+        }
+        return t;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 729719a..6358ee1 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -114,7 +114,7 @@ public class KafkaMRInput implements IMRInput {
                 try {
                     streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
                 } catch (ReflectiveOperationException e) {
-                    throw new IllegalArgumentException();
+                    throw new IllegalArgumentException(e);
                 }
             }
             Text text = (Text) mapperInput;

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 9369e6f..e1282d6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -120,7 +120,7 @@ public class SeekOffsetStep extends AbstractExecutable {
             } catch (IOException e) {
                 return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
             }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
         } else {
             CubeUpdate cubeBuilder = new CubeUpdate(cube);
             cubeBuilder.setToRemoveSegs(segment);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 148ae25..2125c05 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -47,14 +49,18 @@ public final class TimedJsonStreamParser extends StreamingParser {
     private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
 
     private List<TblColRef> allColumns;
-    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectMapper mapper;
     private String tsColName = "timestamp";
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+    private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
+
+    private AbstractTimeParser streamTimeParser;
 
     public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
         this.allColumns = allColumns;
+        String[] properties = null;
         if (!StringUtils.isEmpty(propertiesStr)) {
-            String[] properties = propertiesStr.split(";");
+            properties = propertiesStr.split(";");
             for (String prop : properties) {
                 try {
                     String[] parts = prop.split("=");
@@ -63,6 +69,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
                         case "tsColName":
                             this.tsColName = parts[1];
                             break;
+                        case "tsParser":
+                            this.tsParser = parts[1];
+                            break;
                         default:
                             break;
                         }
@@ -75,28 +84,39 @@ public final class TimedJsonStreamParser extends StreamingParser {
         }
 
         logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+
+        if (!StringUtils.isEmpty(tsParser)) {
+            try {
+                Class clazz = Class.forName(tsParser);
+                Constructor constructor = clazz.getConstructor(String[].class);
+                streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+            } catch (Exception e) {
+                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+            }
+        } else {
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+        }
+        mapper = new ObjectMapper();
+        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+        mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
+        mapper.enable(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY);
     }
 
     @Override
     public StreamingMessage parse(ByteBuffer buffer) {
         try {
-            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
-            Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
+            Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
-            String tsStr = root.get(tsColName);
-            long t;
-            if (StringUtils.isEmpty(tsStr)) {
-                t = 0;
-            } else {
-                t = Long.valueOf(tsStr);
-            }
+            String tsStr = String.valueOf(root.get(tsColName));
+            long t = streamTimeParser.parseTime(tsStr);
             ArrayList<String> result = Lists.newArrayList();
 
             for (TblColRef column : allColumns) {
                 String columnName = column.getName().toLowerCase();
 
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
-                    String x = root.get(columnName);
+                    String x = String.valueOf(root.get(columnName));
                     result.add(x);
                 }
             }