You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/27 02:11:14 UTC
[incubator-seatunnel] branch dev updated: [SeaTunnel#1159] Support auth in InfluxDB Flink plugin (#1160)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7ef09b0 [SeaTunnel#1159] Support auth in InfluxDB Flink plugin (#1160)
7ef09b0 is described below
commit 7ef09b0de02d0409b417b1d62cb5b11a6fb14274
Author: Benedict Jin <as...@apache.org>
AuthorDate: Thu Jan 27 10:11:07 2022 +0800
[SeaTunnel#1159] Support auth in InfluxDB Flink plugin (#1160)
---
.../flink/configuration/sink-plugins/InfluxDb.md | 26 ++++++++++++++++++++++
.../seatunnel/flink/sink/InfluxDbOutputFormat.java | 14 ++++++++++--
.../apache/seatunnel/flink/sink/InfluxDbSink.java | 8 ++++++-
3 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/docs/en/flink/configuration/sink-plugins/InfluxDb.md b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
index 5523343..86f8ef0 100644
--- a/docs/en/flink/configuration/sink-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
@@ -9,6 +9,8 @@ Write data to InfluxDB.
| name | type | required | default value |
| ----------- | -------------- | -------- | ------------- |
| server_url | `String` | yes | - |
+| username | `String` | no | - |
+| password | `String` | no | - |
| database | `String` | yes | - |
| measurement | `String` | yes | - |
| tags | `List<String>` | yes | - |
@@ -18,6 +20,14 @@ Write data to InfluxDB.
The URL of InfluxDB Server.
+### username [`String`]
+
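+The username used to authenticate with the InfluxDB server. Optional; authentication is applied only when both `username` and `password` are set.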
+
+### password [`String`]
+
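+The password used to authenticate with the InfluxDB server. Optional; authentication is applied only when both `username` and `password` are set.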
+
### database [`String`]
The database name in InfluxDB.
@@ -36,9 +46,25 @@ The list of Field in InfluxDB.
## Example
+### Simple
+
+```hocon
+InfluxDbSink {
+ server_url = "http://127.0.0.1:8086/"
+ database = "influxdb"
+ measurement = "m"
+ tags = ["country", "city"]
+ fields = ["count"]
+}
+```
+
+### Auth
+
```hocon
InfluxDbSink {
server_url = "http://127.0.0.1:8086/"
+ username = "admin"
+ password = "password"
database = "influxdb"
measurement = "m"
tags = ["country", "city"]
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java
index b8ec229..dddffe9 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java
@@ -36,8 +36,18 @@ public class InfluxDbOutputFormat extends RichOutputFormat<Row> {
private final List<String> tags;
private final List<String> fields;
- public InfluxDbOutputFormat(String serverURL, String database, String measurement, List<String> tags, List<String> fields) {
- this.influxDB = InfluxDBFactory.connect(serverURL);
+ public InfluxDbOutputFormat(String serverURL,
+ String username,
+ String password,
+ String database,
+ String measurement,
+ List<String> tags,
+ List<String> fields) {
+ if (username == null || password == null) {
+ this.influxDB = InfluxDBFactory.connect(serverURL);
+ } else {
+ this.influxDB = InfluxDBFactory.connect(serverURL, username, password);
+ }
this.influxDB.query(new Query("CREATE DATABASE " + database));
this.influxDB.setDatabase(database);
this.influxDB.enableBatch(
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
index 588cb2e..67a820f 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
@@ -33,6 +33,8 @@ import java.util.List;
public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
private static final String SERVER_URL = "server_url";
+ private static final String USER_NAME = "username";
+ private static final String PASSWORD = "password";
private static final String DATABASE = "database";
private static final String MEASUREMENT = "measurement";
private static final String TAGS = "tags";
@@ -40,6 +42,8 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
private Config config;
private String serverURL;
+ private String username;
+ private String password;
private String database;
private String measurement;
private List<String> tags;
@@ -47,7 +51,7 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
@Override
public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
- return dataSet.output(new InfluxDbOutputFormat(serverURL, database, measurement, tags, fields));
+ return dataSet.output(new InfluxDbOutputFormat(serverURL, username, password, database, measurement, tags, fields));
}
@Override
@@ -68,6 +72,8 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
@Override
public void prepare(FlinkEnvironment env) {
this.serverURL = config.getString(SERVER_URL);
+ this.username = config.hasPath(USER_NAME) ? config.getString(USER_NAME) : null;
+ this.password = config.hasPath(PASSWORD) ? config.getString(PASSWORD) : null;
this.database = config.getString(DATABASE);
this.measurement = config.getString(MEASUREMENT);
this.tags = config.getStringList(TAGS);