You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/15 08:39:09 UTC

[incubator-streampipes-extensions] 03/05: postgres sink extension with new options

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit aa62c4d582052cd71a10cb043872dcddb957331d
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:37:43 2020 +0200

    postgres sink extension with new options
---
 .../sinks/databases/jvm/postgresql/PostgreSql.java |  4 +++-
 .../jvm/postgresql/PostgreSqlController.java       | 24 +++++++++++++++++++++-
 .../jvm/postgresql/PostgreSqlParameters.java       | 10 ++++++++-
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..2f0b401 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -47,7 +47,9 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
             "^[a-zA-Z_][a-zA-Z0-9_]*$",
             "org.postgresql.Driver",
             "postgresql",
-            LOG);
+            LOG,
+            parameters.getSchemaName(),
+            parameters.isToDropTable());
   }
 
   @Override
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..356bab6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -38,7 +39,12 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   private static final String DATABASE_NAME_KEY = "db_name";
   private static final String DATABASE_TABLE_KEY = "db_table";
   private static final String DATABASE_USER_KEY = "db_user";
+  private static final String DATABASE_SCHEMA_KEY = "db_schema";
   private static final String DATABASE_PASSWORD_KEY = "db_password";
+  private static final String DATABASE_REPLACETABLE_KEY = "db_replaceTable";
+
+  private static final String YES = "Yes";
+  private static final String NO = "No";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -54,7 +60,11 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_SCHEMA_KEY))
             .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+            .requiredSingleValueSelection(
+                Labels.withId(DATABASE_REPLACETABLE_KEY),
+                Options.from(YES, NO))
             .build();
   }
 
@@ -62,12 +72,22 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   public ConfiguredEventSink<PostgreSqlParameters> onInvocation(DataSinkInvocation graph,
                                                                 DataSinkParameterExtractor extractor) {
 
+
+
     String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class);
     Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
     String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
     String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
     String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
     String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+    String schemaName = extractor.singleValueParameter(DATABASE_SCHEMA_KEY, String.class);
+    String valueOption = extractor.selectedSingleValue(DATABASE_REPLACETABLE_KEY, String.class);
+
+
+    boolean IsToDropTable = false;
+    if (valueOption.equals(YES)) {
+      IsToDropTable = true;
+    }
 
     PostgreSqlParameters params = new PostgreSqlParameters(graph,
             hostname,
@@ -75,7 +95,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             dbName,
             tableName,
             user,
-            password);
+            password,
+            schemaName,
+            IsToDropTable);
 
     return new ConfiguredEventSink<>(params, PostgreSql::new);
   }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..2e50fdd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,10 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   private String tableName;
   private String user;
   private String password;
+  private String schemaName;
+  private boolean isToDropTable;
 
-  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, String schemaName, boolean dropTable) {
     super(graph);
     this.PostgreSqlHost = PostgreSqlHost;
     this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +40,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
     this.tableName = tableName;
     this.user = user;
     this.password = password;
+    this.schemaName = schemaName;
+    this.isToDropTable = dropTable;
   }
 
   public String getPostgreSqlHost() {
@@ -63,4 +67,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   public String getPassword() {
     return password;
   }
+
+  public String getSchemaName() { return schemaName; }
+
+  public boolean isToDropTable() { return isToDropTable; }
 }