You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/07/17 14:57:00 UTC
[bahir-flink] branch master updated: add option to use token authentication for influxdb2
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new e2dd7c2 add option to use token authentication for influxdb2
e2dd7c2 is described below
commit e2dd7c25cd87d7fae8c6eb4ce06e373f181481d0
Author: dave <dq...@gmail.com>
AuthorDate: Thu Oct 7 09:12:23 2021 -0400
add option to use token authentication for influxdb2
---
.../influxdb/sink/InfluxDBSinkBuilder.java | 30 ++++++++++++++++--
.../influxdb/sink/InfluxDBSinkOptions.java | 25 +++++++++++----
.../influxdb/sink/InfluxDBSinkBuilderTest.java | 37 +++++++++++++++++-----
.../influxdb/util/InfluxDBContainer.java | 1 +
4 files changed, 76 insertions(+), 17 deletions(-)
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
index 70e3bbe..6b9fb5e 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.influxdb.sink;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.configuration.Configuration;
@@ -59,6 +61,7 @@ public final class InfluxDBSinkBuilder<IN> {
private String influxDBUrl;
private String influxDBUsername;
private String influxDBPassword;
+ private String influxDBToken;
private String bucketName;
private String organizationName;
private final Configuration configuration;
@@ -67,6 +70,7 @@ public final class InfluxDBSinkBuilder<IN> {
this.influxDBUrl = null;
this.influxDBUsername = null;
this.influxDBPassword = null;
+ this.influxDBToken = null;
this.bucketName = null;
this.organizationName = null;
this.influxDBSchemaSerializer = null;
@@ -109,6 +113,18 @@ public final class InfluxDBSinkBuilder<IN> {
return this;
}
+ /**
+ * Sets the InfluxDB token used for token-based authentication.
+ *
+ * <p>The token is an alternative to username/password authentication; the builder's sanity
+ * check rejects a configuration that supplies both.
+ *
+ * @param influxDBToken the token of the InfluxDB instance; must not be null.
+ * @return this InfluxDBSinkBuilder.
+ * @throws NullPointerException if {@code influxDBToken} is null.
+ */
+ public InfluxDBSinkBuilder<IN> setInfluxDBToken(final String influxDBToken) {
+ // Validate before touching any state so that a rejected null cannot clear a previously
+ // set token while the configuration still holds the old value.
+ this.influxDBToken = checkNotNull(influxDBToken);
+ this.configuration.setString(INFLUXDB_TOKEN, influxDBToken);
+ return this;
+ }
+
/**
* Sets the InfluxDB bucket name.
*
@@ -190,8 +206,18 @@ public final class InfluxDBSinkBuilder<IN> {
private void sanityCheck() {
// Check required settings.
checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
- checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided.");
- checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided.");
+ // check that either username/password or token is provided for authentication
+ checkArgument(
+ this.influxDBToken != null
+ || (this.influxDBUsername != null && this.influxDBPassword != null),
+ "Either the InfluxDB username and password or InfluxDB token are required but neither provided"
+ );
+ // check that username/password and token are not both provided for authentication
+ checkArgument(
+ ! (this.influxDBToken != null
+ && (this.influxDBUsername != null || this.influxDBPassword != null)),
+ "Either the InfluxDB username and password or InfluxDB token are required but both provided"
+ );
checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
checkNotNull(this.organizationName, "The Organization name is required but not provided.");
checkNotNull(
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
index 97ff44e..90a23f5 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
@@ -60,6 +60,12 @@ public final class InfluxDBSinkOptions {
.noDefaultValue()
.withDescription("InfluxDB password.");
+ public static final ConfigOption<String> INFLUXDB_TOKEN =
+ ConfigOptions.key("sink.influxDB.client.token")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB access token.");
+
public static final ConfigOption<String> INFLUXDB_BUCKET =
ConfigOptions.key("sink.influxDB.client.bucket")
.stringType()
@@ -76,15 +82,20 @@ public final class InfluxDBSinkOptions {
final String url = configuration.getString(INFLUXDB_URL);
final String username = configuration.getString(INFLUXDB_USERNAME);
final String password = configuration.getString(INFLUXDB_PASSWORD);
+ final String token = configuration.getString(INFLUXDB_TOKEN);
final String bucket = configuration.getString(INFLUXDB_BUCKET);
final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
- final InfluxDBClientOptions influxDBClientOptions =
- InfluxDBClientOptions.builder()
- .url(url)
- .authenticate(username, password.toCharArray())
- .bucket(bucket)
- .org(organization)
- .build();
+ InfluxDBClientOptions.Builder builder = InfluxDBClientOptions.builder();
+ builder = builder
+ .url(url)
+ .bucket(bucket)
+ .org(organization);
+ if (token != null) {
+ builder = builder.authenticateToken(token.toCharArray());
+ } else if (username != null && password != null) {
+ builder = builder.authenticate(username, password.toCharArray());
+ }
+ final InfluxDBClientOptions influxDBClientOptions = builder.build();
return InfluxDBClientFactory.create(influxDBClientOptions);
}
}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
index 1d47bb5..f8dc31c 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
@@ -42,10 +42,10 @@ class InfluxDBSinkBuilderTest {
}
@Test
- void shouldNotBuildSinkWhenUsernameIsNotProvided() {
- final NullPointerException exception =
+ void shouldNotBuildSinkWhenTokenNotProvidedAndUsernameIsNotProvided() {
+ final IllegalArgumentException exception =
assertThrows(
- NullPointerException.class,
+ IllegalArgumentException.class,
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
@@ -54,14 +54,15 @@ class InfluxDBSinkBuilderTest {
.setInfluxDBOrganization(InfluxDBContainer.organization)
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
- assertEquals(exception.getMessage(), "The InfluxDB username is required but not provided.");
+ assertEquals(exception.getMessage(),
+ "Either the InfluxDB username and password or InfluxDB token are required but neither provided");
}
@Test
- void shouldNotBuildSinkWhenPasswordIsNotProvided() {
- final NullPointerException exception =
+ void shouldNotBuildSinkWhenTokenNotProvidedAndPasswordIsNotProvided() {
+ final IllegalArgumentException exception =
assertThrows(
- NullPointerException.class,
+ IllegalArgumentException.class,
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
@@ -70,7 +71,27 @@ class InfluxDBSinkBuilderTest {
.setInfluxDBOrganization(InfluxDBContainer.organization)
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
- assertEquals(exception.getMessage(), "The InfluxDB password is required but not provided.");
+ assertEquals(exception.getMessage(),
+ "Either the InfluxDB username and password or InfluxDB token are required but neither provided");
+ }
+
+ @Test
+ void shouldNotBuildSinkWhenTokenProvidedAndUsernamePasswordIsProvided() {
+ // Supplying a token together with username/password must be rejected when building the sink.
+ final IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ InfluxDBSink.builder()
+ .setInfluxDBUrl("http://localhost:8086")
+ .setInfluxDBToken(InfluxDBContainer.token)
+ .setInfluxDBUsername(InfluxDBContainer.username)
+ .setInfluxDBPassword(InfluxDBContainer.password)
+ .setInfluxDBBucket(InfluxDBContainer.bucket)
+ .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+ .build());
+ // assertEquals takes (expected, actual); this order keeps the failure report accurate.
+ assertEquals(
+ "Either the InfluxDB username and password or InfluxDB token are required but both provided",
+ exception.getMessage());
}
@Test
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
index 0f7347a..b8a0d69 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
@@ -45,6 +45,7 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
public static final String username = "test-user";
public static final String password = "test-password";
+ public static final String token = "access-token";
public static final String bucket = "test-bucket";
public static final String organization = "test-org";
private static final int retention = 0;