You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:32:05 UTC
[20/30] kylin git commit: KYLIN-2054 TimedJsonStreamParser should
support other time format
KYLIN-2054 TimedJsonStreamParser should support other time format
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5007261
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5007261
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5007261
Branch: refs/heads/master-hbase1.x
Commit: e500726184b318da2d1f503cd1b159cfe7242347
Parents: 01d5670
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 27 22:21:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Sep 28 17:00:46 2016 +0800
----------------------------------------------------------------------
build/bin/kylin.sh | 3 +-
.../org/apache/kylin/common/util/BasicTest.java | 13 +++
source-kafka/pom.xml | 6 +-
.../kylin/source/kafka/AbstractTimeParser.java | 34 ++++++++
.../kylin/source/kafka/DateTimeParser.java | 84 ++++++++++++++++++++
.../kylin/source/kafka/DefaultTimeParser.java | 49 ++++++++++++
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../kylin/source/kafka/SeekOffsetStep.java | 2 +-
.../source/kafka/TimedJsonStreamParser.java | 46 ++++++++---
9 files changed, 222 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 7a9d2a1..e767492 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -31,7 +31,7 @@ function retrieveDependency() {
#retrive $hive_dependency and $hbase_dependency
source ${dir}/find-hive-dependency.sh
source ${dir}/find-hbase-dependency.sh
- source ${dir}/find-kafka-dependency.sh
+ #source ${dir}/find-kafka-dependency.sh
#retrive $KYLIN_EXTRA_START_OPTS
if [ -f "${dir}/setenv.sh" ]
@@ -40,6 +40,7 @@ function retrieveDependency() {
export HBASE_CLASSPATH_PREFIX=${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH_PREFIX}
export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}
+ #export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}:${kafka_dependency}
}
# start command
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ee15832..5eaa011 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -24,6 +24,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.time.FastDateFormat;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@@ -206,6 +208,17 @@ public class BasicTest {
}
}
+ @Test
+ @Ignore("for dev only")
+ public void test3() throws Exception {
+     // Dev-only scratch check: format the current time and parse a sample
+     // timestamp with the same pattern, printing both results.
+     final String pattern = "MMM dd, yyyy hh:mm:ss aa";
+     final FastDateFormat fmt = org.apache.kylin.common.util.DateFormat.getDateFormat(pattern);
+
+     final String now = fmt.format(new Date());
+     System.out.println(now);
+
+     final String sample = "Jul 20, 2016 9:59:17 AM";
+     final long parsedMillis = fmt.parse(sample).getTime();
+     System.out.println(parsedMillis);
+ }
+
private static String time(long t) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Calendar cal = Calendar.getInstance();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 212f4c6..f91ab8f 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -48,7 +48,11 @@
<artifactId>kafka_2.10</artifactId>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
<!-- Env & Test -->
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
new file mode 100644
index 0000000..96a4ece
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+/**
+ * Base class for pluggable parsers that turn the textual timestamp of a
+ * streaming message into a long epoch time.
+ *
+ * <p>Implementations are instantiated reflectively through a public
+ * constructor that takes a {@code String[]} of parser properties.
+ */
+public abstract class AbstractTimeParser {
+
+    /**
+     * Creates the parser. This base class does not read {@code properties};
+     * subclasses may pick out the entries they understand.
+     *
+     * @param properties parser properties as supplied by the caller
+     */
+    public AbstractTimeParser(String[] properties) {
+        // nothing to configure at this level
+    }
+
+    /**
+     * Parses a textual time into a long value (epoch time).
+     *
+     * @param time the textual time to parse
+     * @return the parsed epoch time
+     * @throws IllegalArgumentException if {@code time} cannot be parsed
+     */
+    public abstract long parseTime(String time) throws IllegalArgumentException;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
new file mode 100644
index 0000000..2bd699d
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.kylin.common.util.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/**
+ * Time parser that converts formatted date-time strings into epoch time using
+ * a configurable pattern.
+ *
+ * <p>The pattern is taken from the {@code tsPattern=<pattern>} entry of the
+ * parser properties. When no such entry is present,
+ * {@link DateFormat#DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS} is used.
+ */
+public class DateTimeParser extends AbstractTimeParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
+    private static final String TS_PATTERN_KEY = "tsPattern";
+
+    private final String tsPattern;
+    private final FastDateFormat formatter;
+
+    /**
+     * Creates the parser; invoked by reflection.
+     *
+     * @param properties {@code key=value} parser properties; may be {@code null}.
+     *                   Only {@code tsPattern} is read, other keys are ignored.
+     * @throws IllegalStateException if the resolved pattern is empty or is
+     *                               rejected by the date format factory
+     */
+    public DateTimeParser(String[] properties) {
+        super(properties);
+        this.tsPattern = resolvePattern(properties);
+
+        if (StringUtils.isEmpty(tsPattern)) {
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
+        }
+        try {
+            this.formatter = DateFormat.getDateFormat(tsPattern);
+        } catch (Exception e) {
+            // Chain the cause so the reason the pattern was rejected is not lost.
+            throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.", e);
+        }
+    }
+
+    /**
+     * Picks the timestamp pattern out of the property list, falling back to the
+     * default pattern. If several {@code tsPattern} entries exist the last wins.
+     */
+    private static String resolvePattern(String[] properties) {
+        String pattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+        if (properties == null) {
+            // The caller passes null when no parser properties were configured.
+            return pattern;
+        }
+        for (String prop : properties) {
+            if (prop == null) {
+                logger.error("Failed to parse property {}", prop);
+                continue;
+            }
+            // Limit of 2 keeps any '=' that is part of the pattern itself.
+            String[] parts = prop.split("=", 2);
+            // An empty value is treated as "not set", so the default stays in effect.
+            if (parts.length == 2 && TS_PATTERN_KEY.equals(parts[0]) && !parts[1].isEmpty()) {
+                pattern = parts[1];
+            }
+        }
+        return pattern;
+    }
+
+    /**
+     * Parse a string time to a long value (epoch time)
+     *
+     * @param timeStr the formatted time, expected to match the configured pattern
+     * @return the epoch time of the parsed date, as returned by {@code Date.getTime()}
+     * @throws IllegalArgumentException if {@code timeStr} is {@code null} or does
+     *                                  not match the pattern
+     */
+    @Override
+    public long parseTime(String timeStr) throws IllegalArgumentException {
+        // Reject null up front so callers always get the documented exception type.
+        if (timeStr == null) {
+            throw new IllegalArgumentException(invalidValueMessage(timeStr));
+        }
+        try {
+            return formatter.parse(timeStr).getTime();
+        } catch (ParseException e) {
+            throw new IllegalArgumentException(invalidValueMessage(timeStr), e);
+        }
+    }
+
+    /** Builds the error text used when a value does not fit the pattern. */
+    private String invalidValueMessage(String timeStr) {
+        return "Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'";
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
new file mode 100644
index 0000000..85f2bfa
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Time parser for timestamps that are already numeric: the text is read as a
+ * {@code long} and returned as is.
+ */
+public class DefaultTimeParser extends AbstractTimeParser {
+
+    /**
+     * @param properties parser properties; handed to the superclass and not read here
+     */
+    public DefaultTimeParser(String[] properties) {
+        super(properties);
+    }
+
+    /**
+     * Parses a numeric string into a long value (epoch time).
+     *
+     * @param time decimal text of the timestamp
+     * @return the parsed value, or 0 when {@code time} is null or empty
+     * @throws IllegalArgumentException if {@code time} is non-empty but is not a valid long
+     */
+    @Override
+    public long parseTime(String time) throws IllegalArgumentException {
+        if (StringUtils.isEmpty(time)) {
+            return 0L;
+        }
+        try {
+            return Long.parseLong(time);
+        } catch (NumberFormatException nfe) {
+            throw new IllegalArgumentException(nfe);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 729719a..6358ee1 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -114,7 +114,7 @@ public class KafkaMRInput implements IMRInput {
try {
streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
} catch (ReflectiveOperationException e) {
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException(e);
}
}
Text text = (Text) mapperInput;
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 9369e6f..e1282d6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -120,7 +120,7 @@ public class SeekOffsetStep extends AbstractExecutable {
} catch (IOException e) {
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
} else {
CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToRemoveSegs(segment);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 148ae25..2125c05 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source.kafka;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
@@ -47,14 +49,18 @@ public final class TimedJsonStreamParser extends StreamingParser {
private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
private List<TblColRef> allColumns;
- private final ObjectMapper mapper = new ObjectMapper();
+ private final ObjectMapper mapper;
private String tsColName = "timestamp";
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+ private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
+
+ private AbstractTimeParser streamTimeParser;
public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
this.allColumns = allColumns;
+ String[] properties = null;
if (!StringUtils.isEmpty(propertiesStr)) {
- String[] properties = propertiesStr.split(";");
+ properties = propertiesStr.split(";");
for (String prop : properties) {
try {
String[] parts = prop.split("=");
@@ -63,6 +69,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
case "tsColName":
this.tsColName = parts[1];
break;
+ case "tsParser":
+ this.tsParser = parts[1];
+ break;
default:
break;
}
@@ -75,28 +84,39 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+
+ if (!StringUtils.isEmpty(tsParser)) {
+ try {
+ Class clazz = Class.forName(tsParser);
+ Constructor constructor = clazz.getConstructor(String[].class);
+ streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+ } catch (Exception e) {
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+ }
+ } else {
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+ }
+ mapper = new ObjectMapper();
+ mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+ mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
+ mapper.enable(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY);
}
@Override
public StreamingMessage parse(ByteBuffer buffer) {
try {
- Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
- Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
+ Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
root.putAll(message);
- String tsStr = root.get(tsColName);
- long t;
- if (StringUtils.isEmpty(tsStr)) {
- t = 0;
- } else {
- t = Long.valueOf(tsStr);
- }
+ String tsStr = String.valueOf(root.get(tsColName));
+ long t = streamTimeParser.parseTime(tsStr);
ArrayList<String> result = Lists.newArrayList();
for (TblColRef column : allColumns) {
String columnName = column.getName().toLowerCase();
if (populateDerivedTimeColumns(columnName, result, t) == false) {
- String x = root.get(columnName);
+ String x = String.valueOf(root.get(columnName));
result.add(x);
}
}