You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/17 07:04:20 UTC

[incubator-seatunnel] branch dev updated: [Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 85a984a64 [Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)
85a984a64 is described below

commit 85a984a64fc1d529098b6c356207869b4edcbd25
Author: wfrong <10...@qq.com>
AuthorDate: Tue Jan 17 15:04:15 2023 +0800

    [Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)
    
    * Unifie InfluxDB source fields to schema
---
 docs/en/connector-v2/source/InfluxDB.md            | 39 ++++++++++++----------
 release-note.md                                    |  1 +
 .../seatunnel/influxdb/source/InfluxDBSource.java  |  6 ++--
 .../influxdb/source/InfluxDBSourceFactory.java     |  4 ++-
 .../src/test/resources/influxdb-to-influxdb.conf   | 22 ++++++------
 5 files changed, 40 insertions(+), 32 deletions(-)

diff --git a/docs/en/connector-v2/source/InfluxDB.md b/docs/en/connector-v2/source/InfluxDB.md
index 67c9c362a..6b737870b 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -25,7 +25,7 @@ supports query SQL and can achieve projection effect.
 |--------------------|--------|----------|---------------|
 | url                | string | yes      | -             |
 | sql                | string | yes      | -             |
-| fields             | config | yes      | -             |
+| schema             | config | yes      | -             |
 | database           | string | yes      |               |
 | username           | string | no       | -             |
 | password           | string | no       | -             |
@@ -51,19 +51,20 @@ The query sql used to search data
 select name,age from test
 ```
 
-### fields [string]
+### schema [config]
 
-the fields of the InfluxDB when you select
-
-the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+#### fields [Config]
 
+The schema information of upstream data.
 e.g.
 
 ```
-fields{
-    name=STRING
-    age=INT
+schema {
+    fields {
+        name = string
+        age = int
     }
+  }
 ```
 
 ### database [string]
@@ -147,11 +148,12 @@ source {
         lower_bound = 1
         partition_num = 4
         split_column = "value"
-        fields {
-            label = STRING
-            value = INT
-            rt = STRING
-            time = BIGINT
+        schema {
+            fields {
+                label = STRING
+                value = INT
+                rt = STRING
+                time = BIGINT
             }
     }
 
@@ -166,11 +168,12 @@ source {
         url = "http://influxdb-host:8086"
         sql = "select label, value, rt, time from test"
         database = "test"
-        fields {
-            label = STRING
-            value = INT
-            rt = STRING
-            time = BIGINT
+        schema {
+            fields {
+                label = STRING
+                value = INT
+                rt = STRING
+                time = BIGINT
             }
     }
 
diff --git a/release-note.md b/release-note.md
index 8e237b609..3bb2e7018 100644
--- a/release-note.md
+++ b/release-note.md
@@ -19,6 +19,7 @@
 - [JDBC]Improve option check rule
 - [JDBC]Support SAP HANA. (#3017)
 - [MongoDB]Add source query capability #3697
+- [InfluxDB]Unifie InfluxDB source fields to schema #3897
 - [File]Fix file source connector option rule bug #3804
 - [File]Add lzo compression way
 - [Kafka]Fix Source failed to parse offset format #3810
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 7497577ad..37e578294 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -71,7 +71,7 @@ public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSou
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
+        CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key(), SeaTunnelSchema.SCHEMA.key());
         if (!result.isSuccess()) {
             throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                 String.format("PluginName: %s, PluginType: %s, Message: %s",
@@ -82,8 +82,8 @@ public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSou
         }
         try {
             this.sourceConfig = SourceConfig.loadConfig(config);
-            SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config);
-            this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
+            Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
+            this.typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
             this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
         } catch (Exception e) {
             throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
index 440d69bc3..5e3afc9c2 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 
 import com.google.auto.service.AutoService;
 
@@ -50,7 +51,8 @@ public class InfluxDBSourceFactory implements TableSourceFactory {
             .required(
                 URL,
                 SQL,
-                DATABASES
+                DATABASES,
+                SeaTunnelSchema.SCHEMA
             )
             .bundled(USERNAME, PASSWORD)
             .bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
index f95af29a2..59dc74384 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
@@ -29,17 +29,19 @@ source {
         lower_bound = 0
         partition_num = 4
         split_column = "c_int"
-        fields {
-            label = STRING
-            c_string = STRING
-            c_double = DOUBLE
-            c_bigint = BIGINT
-            c_float = FLOAT
-            c_int = INT
-            c_smallint = SMALLINT
-            c_boolean = BOOLEAN
-            time = BIGINT
+        schema {
+            fields {
+                label = STRING
+                c_string = STRING
+                c_double = DOUBLE
+                c_bigint = BIGINT
+                c_float = FLOAT
+                c_int = INT
+                c_smallint = SMALLINT
+                c_boolean = BOOLEAN
+                time = BIGINT
             }
+         }
     }
 }