You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/07/07 08:38:06 UTC

[04/50] [abbrv] kylin git commit: minor, add parserTimeStampField to KafkaConfig (#1405)

minor, add parserTimeStampField to KafkaConfig (#1405)



Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b2fc2c22
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b2fc2c22
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b2fc2c22

Branch: refs/heads/KYLIN-2606
Commit: b2fc2c220ba66482447a7631f90363382bcaa422
Parents: 1acd066
Author: 成 <ch...@kyligence.io>
Authored: Wed Jun 28 11:23:51 2017 +0800
Committer: Billy(Yiming) Liu <li...@gmail.com>
Committed: Wed Jun 28 11:23:51 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../kylin/source/kafka/config/KafkaConfig.java  | 22 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 3323afb..5bce4e7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -103,7 +103,7 @@ public class KafkaMRInput implements IMRInput {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
             try {
-                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
+                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
             } catch (ReflectiveOperationException e) {
                 throw new IllegalArgumentException(e);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index a096344..547e738 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -58,6 +58,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("parserName")
     private String parserName;
 
+    @JsonProperty("parserTimeStampField")
+    private String parserTimeStampField;
+
     @Deprecated
     @JsonProperty("margin")
     private long margin;
@@ -120,6 +123,14 @@ public class KafkaConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public void setParserTimeStampField(String parserTimeStampField) {
+        this.parserTimeStampField = parserTimeStampField;
+    }
+
+    public String getParserTimeStampField() {
+        return this.parserTimeStampField;
+    }
+
     public String getParserProperties() {
         return parserProperties;
     }
@@ -128,6 +139,17 @@ public class KafkaConfig extends RootPersistentEntity {
         this.parserProperties = parserProperties;
     }
 
+    public String getAllParserProperties() {
+        StringBuilder sb = new StringBuilder();
+        if (parserProperties != null)
+            sb.append(parserProperties);
+        if (parserTimeStampField != null) {
+            sb.append(";");
+            sb.append(parserTimeStampField);
+        }
+        return sb.toString();
+    }
+
     @Override
     public KafkaConfig clone() {
         try {