You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/27 02:11:14 UTC

[incubator-seatunnel] branch dev updated: [SeaTunnel#1159] Support auth in InfluxDB Flink plugin (#1160)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7ef09b0  [SeaTunnel#1159] Support auth in InfluxDB Flink plugin (#1160)
7ef09b0 is described below

commit 7ef09b0de02d0409b417b1d62cb5b11a6fb14274
Author: Benedict Jin <as...@apache.org>
AuthorDate: Thu Jan 27 10:11:07 2022 +0800

    [SeaTunnel#1159] Support auth in InfluxDB Flink plugin (#1160)
---
 .../flink/configuration/sink-plugins/InfluxDb.md   | 26 ++++++++++++++++++++++
 .../seatunnel/flink/sink/InfluxDbOutputFormat.java | 14 ++++++++++--
 .../apache/seatunnel/flink/sink/InfluxDbSink.java  |  8 ++++++-
 3 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/InfluxDb.md b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
index 5523343..86f8ef0 100644
--- a/docs/en/flink/configuration/sink-plugins/InfluxDb.md
+++ b/docs/en/flink/configuration/sink-plugins/InfluxDb.md
@@ -9,6 +9,8 @@ Write data to InfluxDB.
 | name        | type           | required | default value |
 | ----------- | -------------- | -------- | ------------- |
 | server_url  | `String`       | yes      | -             |
+| username    | `String`       | no       | -             |
+| password    | `String`       | no       | -             |
 | database    | `String`       | yes      | -             |
 | measurement | `String`       | yes      | -             |
 | tags        | `List<String>` | yes      | -             |
@@ -18,6 +20,14 @@ Write data to InfluxDB.
 
 The URL of InfluxDB Server.
 
+### username [`String`]
+
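+The username used to authenticate with the InfluxDB server. Optional; authentication is only used when both `username` and `password` are set.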
+
+### password [`String`]
+
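+The password used to authenticate with the InfluxDB server. Optional; authentication is only used when both `username` and `password` are set.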
+
 ### datasource [`String`]
 
 The DataSource name in InfluxDB.
@@ -36,9 +46,25 @@ The list of Field in InfluxDB.
 
 ## Example
 
+### Simple
+
+```hocon
+InfluxDbSink {
+  server_url = "http://127.0.0.1:8086/"
+  database = "influxdb"
+  measurement = "m"
+  tags = ["country", "city"]
+  fields = ["count"]
+}
+```
+
+### Auth
+
 ```hocon
 InfluxDbSink {
   server_url = "http://127.0.0.1:8086/"
+  username = "admin"
+  password = "password"
   database = "influxdb"
   measurement = "m"
   tags = ["country", "city"]
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java
index b8ec229..dddffe9 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbOutputFormat.java
@@ -36,8 +36,18 @@ public class InfluxDbOutputFormat extends RichOutputFormat<Row> {
     private final List<String> tags;
     private final List<String> fields;
 
-    public InfluxDbOutputFormat(String serverURL, String database, String measurement, List<String> tags, List<String> fields) {
-        this.influxDB = InfluxDBFactory.connect(serverURL);
+    public InfluxDbOutputFormat(String serverURL,
+                                String username,
+                                String password,
+                                String database,
+                                String measurement,
+                                List<String> tags,
+                                List<String> fields) {
+        if (username == null || password == null) {
+            this.influxDB = InfluxDBFactory.connect(serverURL);
+        } else {
+            this.influxDB = InfluxDBFactory.connect(serverURL, username, password);
+        }
         this.influxDB.query(new Query("CREATE DATABASE " + database));
         this.influxDB.setDatabase(database);
         this.influxDB.enableBatch(
diff --git a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
index 588cb2e..67a820f 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
@@ -33,6 +33,8 @@ import java.util.List;
 public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
 
     private static final String SERVER_URL = "server_url";
+    private static final String USER_NAME = "username";
+    private static final String PASSWORD = "password";
     private static final String DATABASE = "database";
     private static final String MEASUREMENT = "measurement";
     private static final String TAGS = "tags";
@@ -40,6 +42,8 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
 
     private Config config;
     private String serverURL;
+    private String username;
+    private String password;
     private String database;
     private String measurement;
     private List<String> tags;
@@ -47,7 +51,7 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
 
     @Override
     public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
-        return dataSet.output(new InfluxDbOutputFormat(serverURL, database, measurement, tags, fields));
+        return dataSet.output(new InfluxDbOutputFormat(serverURL, username, password, database, measurement, tags, fields));
     }
 
     @Override
@@ -68,6 +72,8 @@ public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
     @Override
     public void prepare(FlinkEnvironment env) {
         this.serverURL = config.getString(SERVER_URL);
+        this.username = config.hasPath(USER_NAME) ? config.getString(USER_NAME) : null;
+        this.password = config.hasPath(PASSWORD) ? config.getString(PASSWORD) : null;
         this.database = config.getString(DATABASE);
         this.measurement = config.getString(MEASUREMENT);
         this.tags = config.getStringList(TAGS);