You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/19 13:34:10 UTC

[incubator-streampipes-extensions] 02/02: [STREAMPIPES-317] Support SSL connections for PostgreSql sink

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit c5511bb73fcfe31945b10477ccb9c46d395244be
Author: bossenti <bo...@posteo.de>
AuthorDate: Fri Mar 19 14:18:51 2021 +0100

    [STREAMPIPES-317] Support SSL connections for PostgreSql sink
---
 .../sinks/databases/jvm/iotdb/IotDb.java           |  1 +
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 29 ++++++++++++++++++++--
 .../sinks/databases/jvm/mysql/Mysql.java           |  1 +
 .../sinks/databases/jvm/postgresql/PostgreSql.java |  1 +
 .../jvm/postgresql/PostgreSqlController.java       | 15 ++++++++---
 .../jvm/postgresql/PostgreSqlParameters.java       |  8 +++++-
 .../strings.en                                     |  2 ++
 7 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 177270a..2189a5c 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -56,6 +56,7 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
             ".*",
             "org.apache.iotdb.jdbc.IoTDBDriver",
             "iotdb",
+            false,
             LOG);
 
   }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 71a357f..6545b4f 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -61,7 +61,7 @@ public class JdbcClient {
      * If no matching type is found, it is interpreted as a String (VARCHAR(255))
      */
     protected enum SqlAttribute {
-        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
+        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("FLOAT"), STRING("VARCHAR(255)"),
         BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
         private final String sqlName;
 
@@ -181,6 +181,7 @@ public class JdbcClient {
                                   String allowedRegEx,
                                   String driver,
                                   String urlName,
+                                  boolean useSSL,
                                   Logger logger) throws SpRuntimeException {
         this.tableName = tableName;
         this.user = user;
@@ -194,7 +195,11 @@ public class JdbcClient {
             throw new SpRuntimeException("Driver '" + driver + "' not found.");
         }
 
-        connect(host, port, urlName, databaseName);
+        if (useSSL) {
+            connectWithSSL(host, port, urlName, databaseName);
+        } else {
+            connect(host, port, urlName, databaseName);
+        }
     }
 
 
@@ -216,6 +221,26 @@ public class JdbcClient {
         }
     }
 
+    /**
+     * WIP
+     * @param host
+     * @param port
+     * @param urlName
+     * @param databaseName
+     * @throws SpRuntimeException
+     */
+    private void connectWithSSL(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
+        String url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName + "?user=" + user + "&password=" + password + "&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory&sslmode=require" ;
+        try{
+            c = DriverManager.getConnection(url);
+            ensureDatabaseExists(url, databaseName);
+            ensureTableExists(url, "");
+        } catch (SQLException e ) {
+            throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+        }
+    }
+
+
 	/**
 	 * If this method returns successfully a database with the given name exists on the server, specified by the url.
 	 *
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 7009970..cb34f1a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -60,6 +60,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                 ".*",
                 "com.mysql.cj.jdbc.Driver",
                 "mysql",
+                false,
                 LOG);
     }
 
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..668cdba 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -47,6 +47,7 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
             "^[a-zA-Z_][a-zA-Z0-9_]*$",
             "org.postgresql.Driver",
             "postgresql",
+            parameters.isSSLEnabled(),
             LOG);
   }
 
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..ae3d6a6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -24,9 +24,7 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -39,6 +37,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   private static final String DATABASE_TABLE_KEY = "db_table";
   private static final String DATABASE_USER_KEY = "db_user";
   private static final String DATABASE_PASSWORD_KEY = "db_password";
+  private static final String SSL_MODE = "ssl_mode";
+  private static final String SSL_ENABLED = "ssl_enabled";
+  private static final String SSL_DISABLED = "ssl_disabled";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -55,6 +56,10 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
             .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+            .requiredSingleValueSelection(Labels.withId(SSL_MODE),
+                    Options.from(
+                      new Tuple2<>("Yes", SSL_ENABLED),
+                      new Tuple2<>("No", SSL_DISABLED)))
             .build();
   }
 
@@ -68,6 +73,7 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
     String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
     String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
     String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+    String sslSelection = extractor.selectedSingleValueInternalName(SSL_MODE, String.class);
 
     PostgreSqlParameters params = new PostgreSqlParameters(graph,
             hostname,
@@ -75,7 +81,8 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             dbName,
             tableName,
             user,
-            password);
+            password,
+            sslSelection.equals(SSL_ENABLED));
 
     return new ConfiguredEventSink<>(params, PostgreSql::new);
   }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..618c595 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,9 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   private String tableName;
   private String user;
   private String password;
+  private boolean sslEnabled;
 
-  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, Boolean sslEnabled) {
     super(graph);
     this.PostgreSqlHost = PostgreSqlHost;
     this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +39,7 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
     this.tableName = tableName;
     this.user = user;
     this.password = password;
+    this.sslEnabled = sslEnabled;
   }
 
   public String getPostgreSqlHost() {
@@ -63,4 +65,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   public String getPassword() {
     return password;
   }
+
+  public boolean isSSLEnabled() {
+    return sslEnabled;
+  }
 }
diff --git a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
index 7e3e423..3b4125e 100644
--- a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
+++ b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
@@ -19,3 +19,5 @@ db_user.description=The username for the PostgreSQL Server
 db_password.title=Password
 db_password.description=The password for the PostgreSQL Server
 
+ssl_mode.title=SSL Mode
+ssl_mode.description=Should the connection be secured via SSL?