You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/19 13:34:10 UTC
[incubator-streampipes-extensions] 02/02: [STREAMPIPES-317] Support
SSL connections for PostgreSql sink
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit c5511bb73fcfe31945b10477ccb9c46d395244be
Author: bossenti <bo...@posteo.de>
AuthorDate: Fri Mar 19 14:18:51 2021 +0100
[STREAMPIPES-317] Support SSL connections for PostgreSql sink
---
.../sinks/databases/jvm/iotdb/IotDb.java | 1 +
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 29 ++++++++++++++++++++--
.../sinks/databases/jvm/mysql/Mysql.java | 1 +
.../sinks/databases/jvm/postgresql/PostgreSql.java | 1 +
.../jvm/postgresql/PostgreSqlController.java | 15 ++++++++---
.../jvm/postgresql/PostgreSqlParameters.java | 8 +++++-
.../strings.en | 2 ++
7 files changed, 50 insertions(+), 7 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 177270a..2189a5c 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -56,6 +56,7 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
".*",
"org.apache.iotdb.jdbc.IoTDBDriver",
"iotdb",
+ false,
LOG);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 71a357f..6545b4f 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -61,7 +61,7 @@ public class JdbcClient {
* If no matching type is found, it is interpreted as a String (VARCHAR(255))
*/
protected enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
+ INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("FLOAT"), STRING("VARCHAR(255)"),
BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
private final String sqlName;
@@ -181,6 +181,7 @@ public class JdbcClient {
String allowedRegEx,
String driver,
String urlName,
+ boolean useSSL,
Logger logger) throws SpRuntimeException {
this.tableName = tableName;
this.user = user;
@@ -194,7 +195,11 @@ public class JdbcClient {
throw new SpRuntimeException("Driver '" + driver + "' not found.");
}
- connect(host, port, urlName, databaseName);
+ if (useSSL) {
+ connectWithSSL(host, port, urlName, databaseName);
+ } else {
+ connect(host, port, urlName, databaseName);
+ }
}
@@ -216,6 +221,26 @@ public class JdbcClient {
}
}
+ /**
+ * WIP
+ * @param host
+ * @param port
+ * @param urlName
+ * @param databaseName
+ * @throws SpRuntimeException
+ */
+ private void connectWithSSL(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
+ String url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName + "?user=" + user + "&password=" + password + "&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory&sslmode=require" ;
+ try{
+ c = DriverManager.getConnection(url);
+ ensureDatabaseExists(url, databaseName);
+ ensureTableExists(url, "");
+ } catch (SQLException e ) {
+ throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+ }
+ }
+
+
/**
* If this method returns successfully a database with the given name exists on the server, specified by the url.
*
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 7009970..cb34f1a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -60,6 +60,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
".*",
"com.mysql.cj.jdbc.Driver",
"mysql",
+ false,
LOG);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..668cdba 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -47,6 +47,7 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
"^[a-zA-Z_][a-zA-Z0-9_]*$",
"org.postgresql.Driver",
"postgresql",
+ parameters.isSSLEnabled(),
LOG);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..ae3d6a6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -24,9 +24,7 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -39,6 +37,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
private static final String DATABASE_TABLE_KEY = "db_table";
private static final String DATABASE_USER_KEY = "db_user";
private static final String DATABASE_PASSWORD_KEY = "db_password";
+ private static final String SSL_MODE = "ssl_mode";
+ private static final String SSL_ENABLED = "ssl_enabled";
+ private static final String SSL_DISABLED = "ssl_disabled";
@Override
public DataSinkDescription declareModel() {
@@ -55,6 +56,10 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
.requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
.requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
.requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+ .requiredSingleValueSelection(Labels.withId(SSL_MODE),
+ Options.from(
+ new Tuple2<>("Yes", SSL_ENABLED),
+ new Tuple2<>("No", SSL_DISABLED)))
.build();
}
@@ -68,6 +73,7 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+ String sslSelection = extractor.selectedSingleValueInternalName(SSL_MODE, String.class);
PostgreSqlParameters params = new PostgreSqlParameters(graph,
hostname,
@@ -75,7 +81,8 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
dbName,
tableName,
user,
- password);
+ password,
+ sslSelection.equals(SSL_ENABLED));
return new ConfiguredEventSink<>(params, PostgreSql::new);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..618c595 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,9 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
private String tableName;
private String user;
private String password;
+ private boolean sslEnabled;
- public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+ public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, Boolean sslEnabled) {
super(graph);
this.PostgreSqlHost = PostgreSqlHost;
this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +39,7 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
this.tableName = tableName;
this.user = user;
this.password = password;
+ this.sslEnabled = sslEnabled;
}
public String getPostgreSqlHost() {
@@ -63,4 +65,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
public String getPassword() {
return password;
}
+
+ public boolean isSSLEnabled() {
+ return sslEnabled;
+ }
}
diff --git a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
index 7e3e423..3b4125e 100644
--- a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
+++ b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
@@ -19,3 +19,5 @@ db_user.description=The username for the PostgreSQL Server
db_password.title=Password
db_password.description=The password for the PostgreSQL Server
+ssl_mode.title=SSL Mode
+ssl_mode.description=Should the connection be secured via SSL?