You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/17 07:04:20 UTC
[incubator-seatunnel] branch dev updated: [Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 85a984a64 [Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)
85a984a64 is described below
commit 85a984a64fc1d529098b6c356207869b4edcbd25
Author: wfrong <10...@qq.com>
AuthorDate: Tue Jan 17 15:04:15 2023 +0800
[Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)
* Unify InfluxDB source fields to schema
---
docs/en/connector-v2/source/InfluxDB.md | 39 ++++++++++++----------
release-note.md | 1 +
.../seatunnel/influxdb/source/InfluxDBSource.java | 6 ++--
.../influxdb/source/InfluxDBSourceFactory.java | 4 ++-
.../src/test/resources/influxdb-to-influxdb.conf | 22 ++++++------
5 files changed, 40 insertions(+), 32 deletions(-)
diff --git a/docs/en/connector-v2/source/InfluxDB.md b/docs/en/connector-v2/source/InfluxDB.md
index 67c9c362a..6b737870b 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -25,7 +25,7 @@ supports query SQL and can achieve projection effect.
|--------------------|--------|----------|---------------|
| url | string | yes | - |
| sql | string | yes | - |
-| fields | config | yes | - |
+| schema | config | yes | - |
| database | string | yes | |
| username | string | no | - |
| password | string | no | - |
@@ -51,19 +51,20 @@ The query sql used to search data
select name,age from test
```
-### fields [string]
+### schema [config]
-the fields of the InfluxDB when you select
-
-the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+#### fields [config]
+The schema information of upstream data.
e.g.
```
-fields{
- name=STRING
- age=INT
+schema {
+ fields {
+ name = string
+ age = int
}
+ }
```
### database [string]
@@ -147,11 +148,12 @@ source {
lower_bound = 1
partition_num = 4
split_column = "value"
- fields {
- label = STRING
- value = INT
- rt = STRING
- time = BIGINT
+ schema {
+ fields {
+ label = STRING
+ value = INT
+ rt = STRING
+ time = BIGINT
}
}
@@ -166,11 +168,12 @@ source {
url = "http://influxdb-host:8086"
sql = "select label, value, rt, time from test"
database = "test"
- fields {
- label = STRING
- value = INT
- rt = STRING
- time = BIGINT
+ schema {
+ fields {
+ label = STRING
+ value = INT
+ rt = STRING
+ time = BIGINT
}
}
diff --git a/release-note.md b/release-note.md
index 8e237b609..3bb2e7018 100644
--- a/release-note.md
+++ b/release-note.md
@@ -19,6 +19,7 @@
- [JDBC]Improve option check rule
- [JDBC]Support SAP HANA. (#3017)
- [MongoDB]Add source query capability #3697
+- [InfluxDB]Unify InfluxDB source fields to schema #3897
- [File]Fix file source connector option rule bug #3804
- [File]Add lzo compression way
- [Kafka]Fix Source failed to parse offset format #3810
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 7497577ad..37e578294 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -71,7 +71,7 @@ public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSou
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
+ CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key(), SeaTunnelSchema.SCHEMA.key());
if (!result.isSuccess()) {
throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
@@ -82,8 +82,8 @@ public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSou
}
try {
this.sourceConfig = SourceConfig.loadConfig(config);
- SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config);
- this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
+ Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
+ this.typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
} catch (Exception e) {
throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
index 440d69bc3..5e3afc9c2 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import com.google.auto.service.AutoService;
@@ -50,7 +51,8 @@ public class InfluxDBSourceFactory implements TableSourceFactory {
.required(
URL,
SQL,
- DATABASES
+ DATABASES,
+ SeaTunnelSchema.SCHEMA
)
.bundled(USERNAME, PASSWORD)
.bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
index f95af29a2..59dc74384 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
@@ -29,17 +29,19 @@ source {
lower_bound = 0
partition_num = 4
split_column = "c_int"
- fields {
- label = STRING
- c_string = STRING
- c_double = DOUBLE
- c_bigint = BIGINT
- c_float = FLOAT
- c_int = INT
- c_smallint = SMALLINT
- c_boolean = BOOLEAN
- time = BIGINT
+ schema {
+ fields {
+ label = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
}
+ }
}
}