You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/27 14:21:15 UTC
kylin git commit: KYLIN-2054 TimedJsonStreamParser should support
other time format
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2054 [created] 754899710
KYLIN-2054 TimedJsonStreamParser should support other time format
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/75489971
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/75489971
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/75489971
Branch: refs/heads/KYLIN-2054
Commit: 754899710961e773dcaa24242e948c4cd750a501
Parents: 859230d
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 27 22:21:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 22:21:15 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 13 +++
source-kafka/pom.xml | 6 +-
.../kylin/source/kafka/AbstractTimeParser.java | 34 ++++++++
.../kylin/source/kafka/DateTimeParser.java | 84 ++++++++++++++++++++
.../kylin/source/kafka/DefaultTimeParser.java | 49 ++++++++++++
.../source/kafka/TimedJsonStreamParser.java | 31 ++++++--
6 files changed, 208 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ee15832..5eaa011 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -24,6 +24,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.time.FastDateFormat;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@@ -206,6 +208,17 @@ public class BasicTest {
}
}
+    /**
+     * Dev-only scratch test: prints the current time rendered with an AM/PM style
+     * pattern, then prints the {@code getTime()} value parsed back from a sample string.
+     */
+    @Test
+    @Ignore("for dev only")
+    public void test3() throws Exception {
+        final String pattern = "MMM dd, yyyy hh:mm:ss aa";
+        final String sample = "Jul 20, 2016 9:59:17 AM";
+        FastDateFormat format = org.apache.kylin.common.util.DateFormat.getDateFormat(pattern);
+
+        String now = format.format(new Date());
+        System.out.println(now);
+
+        long parsedMillis = format.parse(sample).getTime();
+        System.out.println(parsedMillis);
+    }
+
private static String time(long t) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Calendar cal = Calendar.getInstance();
http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 212f4c6..f91ab8f 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -48,7 +48,11 @@
<artifactId>kafka_2.10</artifactId>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
<!-- Env & Test -->
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..96a4ece
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+/**
+ * Base type for parsers that turn a textual time value into a long (epoch time).
+ * Concrete parsers are handed the raw parser properties through their constructor.
+ */
+public abstract class AbstractTimeParser {
+
+    /**
+     * @param properties parser properties supplied by the creator; this base
+     *                   constructor does not read them
+     */
+    public AbstractTimeParser(String[] properties) {
+        // intentionally empty: subclasses interpret the properties themselves
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time).
+     *
+     * @param time the time value as text
+     * @return the parsed time as a long
+     * @throws IllegalArgumentException declared so implementations can signal input they cannot parse
+     */
+    public abstract long parseTime(String time) throws IllegalArgumentException;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..2bd699d
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.common.util.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/**
+ * Time parser that reads a formatted date/time string and returns the
+ * {@code getTime()} value of the parsed date.
+ *
+ * <p>The pattern comes from a {@code tsPattern=<pattern>} entry in the parser
+ * properties. When no such entry is present the pattern stays at
+ * {@link DateFormat#DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS}.
+ */
+public class DateTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private static final String TS_PATTERN_KEY = "tsPattern";
+
+    private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+
+    private FastDateFormat formatter = null;
+
+    /**
+     * Called by reflection.
+     *
+     * @param properties {@code key=value} entries; only {@code tsPattern} is read here,
+     *                   every other entry is ignored. May be null or contain null entries.
+     * @throws IllegalStateException if the resulting pattern is empty or is rejected
+     *                               when the formatter is created
+     */
+    public DateTimeParser(String[] properties) {
+        super(properties);
+        if (properties != null) {
+            for (String prop : properties) {
+                if (prop == null) {
+                    // Same outcome as before (logged and skipped) without relying on a caught NPE.
+                    logger.error("Failed to parse property " + prop);
+                    continue;
+                }
+                // Limit the split to 2 so that a '=' inside the pattern value is kept
+                // instead of causing the whole entry to be silently dropped.
+                String[] parts = prop.split("=", 2);
+                // An empty value ("tsPattern=") is still treated as "not set", as before.
+                if (parts.length == 2 && TS_PATTERN_KEY.equals(parts[0]) && !parts[1].isEmpty()) {
+                    this.tsPattern = parts[1];
+                }
+            }
+        }
+
+        if (StringUtils.isEmpty(tsPattern)) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
+        }
+        try {
+            formatter = DateFormat.getDateFormat(tsPattern);
+        } catch (Exception e) {
+            // Keep the cause so the reason the pattern was rejected is not lost.
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.", e);
+        }
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time).
+     *
+     * @param timeStr the time text, expected to match the configured pattern
+     * @return {@code getTime()} of the parsed date
+     * @throws IllegalArgumentException if {@code timeStr} is null or cannot be parsed
+     *                                  with the configured pattern
+     */
+    @Override
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+        if (timeStr == null) {
+            // Report a missing value through the declared exception type rather than
+            // leaving it to whatever the formatter does with null.
+            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'");
+        }
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
new file mode 100644
index 0000000..85f2bfa
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Time parser for values that are already numeric: the text is read as a decimal long.
+ */
+public class DefaultTimeParser extends AbstractTimeParser {
+
+    public DefaultTimeParser(String[] properties) {
+        super(properties);
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time).
+     *
+     * @param time the time as decimal digits
+     * @return 0 when {@code time} is empty, otherwise the numeric value of {@code time}
+     * @throws IllegalArgumentException wrapping the {@link NumberFormatException} raised
+     *                                  when {@code time} is not a valid long
+     */
+    @Override
+    public long parseTime(String time) throws IllegalArgumentException {
+        if (StringUtils.isEmpty(time)) {
+            return 0L;
+        }
+        try {
+            return Long.parseLong(time);
+        } catch (NumberFormatException nfe) {
+            throw new IllegalArgumentException(nfe);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75489971/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 148ae25..9faf880 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source.kafka;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,12 +50,16 @@ public final class TimedJsonStreamParser extends StreamingParser {
private List<TblColRef> allColumns;
private final ObjectMapper mapper = new ObjectMapper();
private String tsColName = "timestamp";
+ private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+ private AbstractTimeParser streamTimeParser;
+
public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
this.allColumns = allColumns;
+ String[] properties = null;
if (!StringUtils.isEmpty(propertiesStr)) {
- String[] properties = propertiesStr.split(";");
+ properties = propertiesStr.split(";");
for (String prop : properties) {
try {
String[] parts = prop.split("=");
@@ -63,6 +68,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
case "tsColName":
this.tsColName = parts[1];
break;
+ case "tsParser":
+ this.tsParser = parts[1];
+ break;
default:
break;
}
@@ -75,21 +83,28 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+
+        // Instantiate the configured time parser through its (String[]) constructor.
+        if (StringUtils.isEmpty(tsParser)) {
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+        }
+        try {
+            Class<?> clazz = Class.forName(tsParser);
+            Constructor<?> constructor = clazz.getConstructor(String[].class);
+            // properties stays null when propertiesStr is empty; hand parsers an empty array instead.
+            String[] parserProperties = properties == null ? new String[0] : properties;
+            // The (Object) cast is required: newInstance takes Object..., so an uncast String[]
+            // would be used as the argument list itself (one argument per element) and would
+            // never match the single String[] constructor parameter.
+            streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object) parserProperties);
+        } catch (Exception e) {
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+        }
}
@Override
public StreamingMessage parse(ByteBuffer buffer) {
try {
Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
- Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ Map<String, String> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
root.putAll(message);
String tsStr = root.get(tsColName);
- long t;
- if (StringUtils.isEmpty(tsStr)) {
- t = 0;
- } else {
- t = Long.valueOf(tsStr);
- }
+ long t = streamTimeParser.parseTime(tsStr);
ArrayList<String> result = Lists.newArrayList();
for (TblColRef column : allColumns) {