You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/07/17 14:58:04 UTC

[bahir-flink] branch master updated (e2dd7c2 -> d7d3e5d)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


 discard e2dd7c2  add option to use token authentication for influxdb2
     new d7d3e5d  [BAHIR-284] Add option to use token authentication for influxdb2

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force), producing a
repository history that looks something like this:

 * -- * -- B -- O -- O -- O   (e2dd7c2)
            \
             N -- N -- N   refs/heads/master (d7d3e5d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[bahir-flink] 01/01: [BAHIR-284] Add option to use token authentication for influxdb2

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit d7d3e5d663299521276beead871d90730cb08270
Author: dave <dq...@gmail.com>
AuthorDate: Thu Oct 7 09:12:23 2021 -0400

    [BAHIR-284] Add option to use token authentication for influxdb2
---
 .../influxdb/sink/InfluxDBSinkBuilder.java         | 30 ++++++++++++++++--
 .../influxdb/sink/InfluxDBSinkOptions.java         | 25 +++++++++++----
 .../influxdb/sink/InfluxDBSinkBuilderTest.java     | 37 +++++++++++++++++-----
 .../influxdb/util/InfluxDBContainer.java           |  1 +
 4 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
index 70e3bbe..6b9fb5e 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.influxdb.sink;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
 import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import org.apache.flink.configuration.Configuration;
@@ -59,6 +61,7 @@ public final class InfluxDBSinkBuilder<IN> {
     private String influxDBUrl;
     private String influxDBUsername;
     private String influxDBPassword;
+    private String influxDBToken;
     private String bucketName;
     private String organizationName;
     private final Configuration configuration;
@@ -67,6 +70,7 @@ public final class InfluxDBSinkBuilder<IN> {
         this.influxDBUrl = null;
         this.influxDBUsername = null;
         this.influxDBPassword = null;
+        this.influxDBToken = null;
         this.bucketName = null;
         this.organizationName = null;
         this.influxDBSchemaSerializer = null;
@@ -109,6 +113,18 @@ public final class InfluxDBSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Sets the InfluxDB token.
+     *
+     * @param influxDBToken the token of the InfluxDB instance.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBToken(final String influxDBToken) {
+        this.influxDBToken = influxDBToken;
+        this.configuration.setString(INFLUXDB_TOKEN, checkNotNull(influxDBToken));
+        return this;
+    }
+
     /**
      * Sets the InfluxDB bucket name.
      *
@@ -190,8 +206,18 @@ public final class InfluxDBSinkBuilder<IN> {
     private void sanityCheck() {
         // Check required settings.
         checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
-        checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided.");
-        checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided.");
+        // check that either username/password or token is provided for authentication
+        checkArgument(
+                this.influxDBToken != null
+                        || (this.influxDBUsername != null && this.influxDBPassword != null),
+                "Either the InfluxDB username and password or InfluxDB token are required but neither provided"
+        );
+        // check that username/password and token are not both provided for authentication
+        checkArgument(
+                ! (this.influxDBToken != null
+                        && (this.influxDBUsername != null || this.influxDBPassword != null)),
+                "Either the InfluxDB username and password or InfluxDB token are required but both provided"
+        );
         checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
         checkNotNull(this.organizationName, "The Organization name is required but not provided.");
         checkNotNull(
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
index 97ff44e..90a23f5 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
@@ -60,6 +60,12 @@ public final class InfluxDBSinkOptions {
                     .noDefaultValue()
                     .withDescription("InfluxDB password.");
 
+    public static final ConfigOption<String> INFLUXDB_TOKEN =
+            ConfigOptions.key("sink.influxDB.client.token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB access token.");
+
     public static final ConfigOption<String> INFLUXDB_BUCKET =
             ConfigOptions.key("sink.influxDB.client.bucket")
                     .stringType()
@@ -76,15 +82,20 @@ public final class InfluxDBSinkOptions {
         final String url = configuration.getString(INFLUXDB_URL);
         final String username = configuration.getString(INFLUXDB_USERNAME);
         final String password = configuration.getString(INFLUXDB_PASSWORD);
+        final String token = configuration.getString(INFLUXDB_TOKEN);
         final String bucket = configuration.getString(INFLUXDB_BUCKET);
         final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
-        final InfluxDBClientOptions influxDBClientOptions =
-                InfluxDBClientOptions.builder()
-                        .url(url)
-                        .authenticate(username, password.toCharArray())
-                        .bucket(bucket)
-                        .org(organization)
-                        .build();
+        InfluxDBClientOptions.Builder builder = InfluxDBClientOptions.builder();
+        builder = builder
+                .url(url)
+                .bucket(bucket)
+                .org(organization);
+        if (token != null) {
+            builder = builder.authenticateToken(token.toCharArray());
+        } else if (username != null && password != null) {
+            builder = builder.authenticate(username, password.toCharArray());
+        }
+        final InfluxDBClientOptions influxDBClientOptions = builder.build();
         return InfluxDBClientFactory.create(influxDBClientOptions);
     }
 }
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
index 1d47bb5..f8dc31c 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
@@ -42,10 +42,10 @@ class InfluxDBSinkBuilderTest {
     }
 
     @Test
-    void shouldNotBuildSinkWhenUsernameIsNotProvided() {
-        final NullPointerException exception =
+    void shouldNotBuildSinkWhenTokenNotProvidedAndUsernameIsNotProvided() {
+        final IllegalArgumentException exception =
                 assertThrows(
-                        NullPointerException.class,
+                        IllegalArgumentException.class,
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
@@ -54,14 +54,15 @@ class InfluxDBSinkBuilderTest {
                                         .setInfluxDBOrganization(InfluxDBContainer.organization)
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
-        assertEquals(exception.getMessage(), "The InfluxDB username is required but not provided.");
+        assertEquals(exception.getMessage(),
+                "Either the InfluxDB username and password or InfluxDB token are required but neither provided");
     }
 
     @Test
-    void shouldNotBuildSinkWhenPasswordIsNotProvided() {
-        final NullPointerException exception =
+    void shouldNotBuildSinkWhenTokenNotProvidedAndPasswordIsNotProvided() {
+        final IllegalArgumentException exception =
                 assertThrows(
-                        NullPointerException.class,
+                        IllegalArgumentException.class,
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
@@ -70,7 +71,27 @@ class InfluxDBSinkBuilderTest {
                                         .setInfluxDBOrganization(InfluxDBContainer.organization)
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
-        assertEquals(exception.getMessage(), "The InfluxDB password is required but not provided.");
+        assertEquals(exception.getMessage(),
+                "Either the InfluxDB username and password or InfluxDB token are required but neither provided");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenTokenProvidedAndUsernamePasswordIsProvided() {
+        final IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBToken(InfluxDBContainer.token)
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .build());
+        assertEquals(exception.getMessage(),
+                "Either the InfluxDB username and password or InfluxDB token are required but both provided");
     }
 
     @Test
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
index 0f7347a..b8a0d69 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
@@ -45,6 +45,7 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
 
     public static final String username = "test-user";
     public static final String password = "test-password";
+    public static final String token = "access-token";
     public static final String bucket = "test-bucket";
     public static final String organization = "test-org";
     private static final int retention = 0;