You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/07/07 08:38:06 UTC
[04/50] [abbrv] kylin git commit: minor,
add parserTimeStampField to KafkaConfig (#1405)
minor, add parserTimeStampField to KafkaConfig (#1405)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b2fc2c22
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b2fc2c22
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b2fc2c22
Branch: refs/heads/KYLIN-2606
Commit: b2fc2c220ba66482447a7631f90363382bcaa422
Parents: 1acd066
Author: 成 <ch...@kyligence.io>
Authored: Wed Jun 28 11:23:51 2017 +0800
Committer: Billy(Yiming) Liu <li...@gmail.com>
Committed: Wed Jun 28 11:23:51 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../kylin/source/kafka/config/KafkaConfig.java | 22 ++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 3323afb..5bce4e7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -103,7 +103,7 @@ public class KafkaMRInput implements IMRInput {
this.cubeSegment = cubeSegment;
this.conf = conf;
try {
- streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
+ streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
} catch (ReflectiveOperationException e) {
throw new IllegalArgumentException(e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index a096344..547e738 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -58,6 +58,9 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("parserName")
private String parserName;
+ @JsonProperty("parserTimeStampField")
+ private String parserTimeStampField;
+
@Deprecated
@JsonProperty("margin")
private long margin;
@@ -120,6 +123,14 @@ public class KafkaConfig extends RootPersistentEntity {
this.margin = margin;
}
+ public void setParserTimeStampField(String parserTimeStampField) {
+ this.parserTimeStampField = parserTimeStampField;
+ }
+
+ public String getParserTimeStampField() {
+ return this.parserTimeStampField;
+ }
+
public String getParserProperties() {
return parserProperties;
}
@@ -128,6 +139,17 @@ public class KafkaConfig extends RootPersistentEntity {
this.parserProperties = parserProperties;
}
+ public String getAllParserProperties() {
+ StringBuilder sb = new StringBuilder();
+ if (parserProperties != null)
+ sb.append(parserProperties);
+ if (parserTimeStampField != null) {
+ sb.append(";");
+ sb.append(parserTimeStampField);
+ }
+ return sb.toString();
+ }
+
@Override
public KafkaConfig clone() {
try {