You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/15 08:39:09 UTC
[incubator-streampipes-extensions] 03/05: postgres sink extension
with new options
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit aa62c4d582052cd71a10cb043872dcddb957331d
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:37:43 2020 +0200
postgres sink extension with new options
---
.../sinks/databases/jvm/postgresql/PostgreSql.java | 4 +++-
.../jvm/postgresql/PostgreSqlController.java | 24 +++++++++++++++++++++-
.../jvm/postgresql/PostgreSqlParameters.java | 10 ++++++++-
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..2f0b401 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -47,7 +47,9 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
"^[a-zA-Z_][a-zA-Z0-9_]*$",
"org.postgresql.Driver",
"postgresql",
- LOG);
+ LOG,
+ parameters.getSchemaName(),
+ parameters.isToDropTable());
}
@Override
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..356bab6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -38,7 +39,12 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
private static final String DATABASE_NAME_KEY = "db_name";
private static final String DATABASE_TABLE_KEY = "db_table";
private static final String DATABASE_USER_KEY = "db_user";
+ private static final String DATABASE_SCHEMA_KEY = "db_schema";
private static final String DATABASE_PASSWORD_KEY = "db_password";
+ private static final String DATABASE_REPLACETABLE_KEY = "db_replaceTable";
+
+ private static final String YES = "Yes";
+ private static final String NO = "No";
@Override
public DataSinkDescription declareModel() {
@@ -54,7 +60,11 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
.requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
.requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
.requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_SCHEMA_KEY))
.requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+ .requiredSingleValueSelection(
+ Labels.withId(DATABASE_REPLACETABLE_KEY),
+ Options.from(YES, NO))
.build();
}
@@ -62,12 +72,22 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
public ConfiguredEventSink<PostgreSqlParameters> onInvocation(DataSinkInvocation graph,
DataSinkParameterExtractor extractor) {
+
+
String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class);
Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+ String schemaName = extractor.singleValueParameter(DATABASE_SCHEMA_KEY, String.class);
+ String valueOption = extractor.selectedSingleValue(DATABASE_REPLACETABLE_KEY, String.class);
+
+
+ boolean IsToDropTable = false;
+ if (valueOption.equals(YES)) {
+ IsToDropTable = true;
+ }
PostgreSqlParameters params = new PostgreSqlParameters(graph,
hostname,
@@ -75,7 +95,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
dbName,
tableName,
user,
- password);
+ password,
+ schemaName,
+ IsToDropTable);
return new ConfiguredEventSink<>(params, PostgreSql::new);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..2e50fdd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,10 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
private String tableName;
private String user;
private String password;
+ private String schemaName;
+ private boolean isToDropTable;
- public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+ public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, String schemaName, boolean dropTable) {
super(graph);
this.PostgreSqlHost = PostgreSqlHost;
this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +40,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
this.tableName = tableName;
this.user = user;
this.password = password;
+ this.schemaName = schemaName;
+ this.isToDropTable = dropTable;
}
public String getPostgreSqlHost() {
@@ -63,4 +67,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
public String getPassword() {
return password;
}
+
+ public String getSchemaName() { return schemaName; }
+
+ public boolean isToDropTable() { return isToDropTable; }
}