You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/07/17 14:57:00 UTC

[bahir-flink] branch master updated: add option to use token authentication for influxdb2

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e2dd7c2  add option to use token authentication for influxdb2
e2dd7c2 is described below

commit e2dd7c25cd87d7fae8c6eb4ce06e373f181481d0
Author: dave <dq...@gmail.com>
AuthorDate: Thu Oct 7 09:12:23 2021 -0400

    add option to use token authentication for influxdb2
---
 .../influxdb/sink/InfluxDBSinkBuilder.java         | 30 ++++++++++++++++--
 .../influxdb/sink/InfluxDBSinkOptions.java         | 25 +++++++++++----
 .../influxdb/sink/InfluxDBSinkBuilderTest.java     | 37 +++++++++++++++++-----
 .../influxdb/util/InfluxDBContainer.java           |  1 +
 4 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
index 70e3bbe..6b9fb5e 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.influxdb.sink;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import org.apache.flink.configuration.Configuration;
@@ -59,6 +61,7 @@ public final class InfluxDBSinkBuilder<IN> {
     private String influxDBUrl;
     private String influxDBUsername;
     private String influxDBPassword;
+    private String influxDBToken;
     private String bucketName;
     private String organizationName;
     private final Configuration configuration;
@@ -67,6 +70,7 @@ public final class InfluxDBSinkBuilder<IN> {
         this.influxDBUrl = null;
         this.influxDBUsername = null;
         this.influxDBPassword = null;
+        this.influxDBToken = null;
         this.bucketName = null;
         this.organizationName = null;
         this.influxDBSchemaSerializer = null;
@@ -109,6 +113,18 @@ public final class InfluxDBSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Sets the InfluxDB token.
+     *
+     * @param influxDBToken the token of the InfluxDB instance.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBToken(final String influxDBToken) {
+        this.influxDBToken = influxDBToken;
+        this.configuration.setString(INFLUXDB_TOKEN, checkNotNull(influxDBToken));
+        return this;
+    }
+
     /**
      * Sets the InfluxDB bucket name.
      *
@@ -190,8 +206,18 @@ public final class InfluxDBSinkBuilder<IN> {
     private void sanityCheck() {
         // Check required settings.
         checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
-        checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided.");
-        checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided.");
+        // check that either username/password or token is provided for authentication
+        checkArgument(
+                this.influxDBToken != null
+                        || (this.influxDBUsername != null && this.influxDBPassword != null),
+                "Either the InfluxDB username and password or InfluxDB token are required but neither provided"
+        );
+        // check that username/password and token are not both provided for authentication
+        checkArgument(
+                ! (this.influxDBToken != null
+                        && (this.influxDBUsername != null || this.influxDBPassword != null)),
+                "Either the InfluxDB username and password or InfluxDB token are required but both provided"
+        );
         checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
         checkNotNull(this.organizationName, "The Organization name is required but not provided.");
         checkNotNull(
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
index 97ff44e..90a23f5 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
@@ -60,6 +60,12 @@ public final class InfluxDBSinkOptions {
                     .noDefaultValue()
                     .withDescription("InfluxDB password.");
 
+    public static final ConfigOption<String> INFLUXDB_TOKEN =
+            ConfigOptions.key("sink.influxDB.client.token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB access token.");
+
     public static final ConfigOption<String> INFLUXDB_BUCKET =
             ConfigOptions.key("sink.influxDB.client.bucket")
                     .stringType()
@@ -76,15 +82,20 @@ public final class InfluxDBSinkOptions {
         final String url = configuration.getString(INFLUXDB_URL);
         final String username = configuration.getString(INFLUXDB_USERNAME);
         final String password = configuration.getString(INFLUXDB_PASSWORD);
+        final String token = configuration.getString(INFLUXDB_TOKEN);
         final String bucket = configuration.getString(INFLUXDB_BUCKET);
         final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
-        final InfluxDBClientOptions influxDBClientOptions =
-                InfluxDBClientOptions.builder()
-                        .url(url)
-                        .authenticate(username, password.toCharArray())
-                        .bucket(bucket)
-                        .org(organization)
-                        .build();
+        InfluxDBClientOptions.Builder builder = InfluxDBClientOptions.builder();
+        builder = builder
+                .url(url)
+                .bucket(bucket)
+                .org(organization);
+        if (token != null) {
+            builder = builder.authenticateToken(token.toCharArray());
+        } else if (username != null && password != null) {
+            builder = builder.authenticate(username, password.toCharArray());
+        }
+        final InfluxDBClientOptions influxDBClientOptions = builder.build();
         return InfluxDBClientFactory.create(influxDBClientOptions);
     }
 }
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
index 1d47bb5..f8dc31c 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
@@ -42,10 +42,10 @@ class InfluxDBSinkBuilderTest {
     }
 
     @Test
-    void shouldNotBuildSinkWhenUsernameIsNotProvided() {
-        final NullPointerException exception =
+    void shouldNotBuildSinkWhenTokenNotProvidedAndUsernameIsNotProvided() {
+        final IllegalArgumentException exception =
                 assertThrows(
-                        NullPointerException.class,
+                        IllegalArgumentException.class,
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
@@ -54,14 +54,15 @@ class InfluxDBSinkBuilderTest {
                                         .setInfluxDBOrganization(InfluxDBContainer.organization)
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
-        assertEquals(exception.getMessage(), "The InfluxDB username is required but not provided.");
+        assertEquals(exception.getMessage(),
+                "Either the InfluxDB username and password or InfluxDB token are required but neither provided");
     }
 
     @Test
-    void shouldNotBuildSinkWhenPasswordIsNotProvided() {
-        final NullPointerException exception =
+    void shouldNotBuildSinkWhenTokenNotProvidedAndPasswordIsNotProvided() {
+        final IllegalArgumentException exception =
                 assertThrows(
-                        NullPointerException.class,
+                        IllegalArgumentException.class,
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
@@ -70,7 +71,27 @@ class InfluxDBSinkBuilderTest {
                                         .setInfluxDBOrganization(InfluxDBContainer.organization)
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
-        assertEquals(exception.getMessage(), "The InfluxDB password is required but not provided.");
+        assertEquals(exception.getMessage(),
+                "Either the InfluxDB username and password or InfluxDB token are required but neither provided");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenTokenProvidedAndUsernamePasswordIsProvided() {
+        final IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBToken(InfluxDBContainer.token)
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .build());
+        assertEquals(exception.getMessage(),
+                "Either the InfluxDB username and password or InfluxDB token are required but both provided");
     }
 
     @Test
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
index 0f7347a..b8a0d69 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
@@ -45,6 +45,7 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
 
     public static final String username = "test-user";
     public static final String password = "test-password";
+    public static final String token = "access-token";
     public static final String bucket = "test-bucket";
     public static final String organization = "test-org";
     private static final int retention = 0;