You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/15 08:39:06 UTC

[incubator-streampipes-extensions] branch feature/postgis-sink created (now 58aebd1)

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a change to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


      at 58aebd1  extendet iotdb with standard parameters for new options (need testing)

This branch includes the following new commits:

     new 21d885d  env file
     new ac4d636  extended db client
     new aa62c4d  postgres sink extension with new options
     new e268347  completed documentation
     new 58aebd1  extendet iotdb with standard parameters for new options (need testing)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampipes-extensions] 04/05: completed documentation

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit e268347185ac47225f8dafd3b38ebb1dd8db22b4
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:38:03 2020 +0200

    completed documentation
---
 .../documentation.md                                              | 8 +++++++-
 .../strings.en                                                    | 5 +++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/documentation.md b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/documentation.md
index b219701..10d920c 100644
--- a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/documentation.md
+++ b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/documentation.md
@@ -18,7 +18,7 @@
 
 ## PostgreSQL
 
-<p align="center"> 
+<p align="center">
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
 </p>
 
@@ -62,6 +62,12 @@ The username for the PostgreSQL Server.
 
 The password for the PostgreSQL Server.
 
+### Schema name
+The name of the schema where events will be stored (will be created if it does not exist)
+
+### Drop Table
+Deletes the Tables if it already exists and creates a new new table with the same name
+
 ## Output
 
 (not applicable for data sinks)
diff --git a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
index 7e3e423..5b6860c 100644
--- a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
+++ b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.postgresql/strings.en
@@ -19,3 +19,8 @@ db_user.description=The username for the PostgreSQL Server
 db_password.title=Password
 db_password.description=The password for the PostgreSQL Server
 
+db_schema.title=Schema Name
+db_schema.description=The name of the schema where events will be stored (will be created if it does not exist)
+
+db_replaceTable.title=Replace Table if Exists
+db_replaceTable.description=Drops the table if exists and create a new table
\ No newline at end of file


[incubator-streampipes-extensions] 01/05: env file

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 21d885d3a5ad41b561db05d1da9aaf2734442df7
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:36:21 2020 +0200

    env file
---
 streampipes-sinks-databases-jvm/development/env | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/development/env b/streampipes-sinks-databases-jvm/development/env
index 6a41c19..ea12c66 100644
--- a/streampipes-sinks-databases-jvm/development/env
+++ b/streampipes-sinks-databases-jvm/development/env
@@ -14,6 +14,6 @@
 # limitations under the License.
 
 # Those parameters are used by IntelliJ to set the default consul parameters for development
-SP_PORT=7015
-SP_HOST=host.docker.internal
+SP_PORT=8006
+SP_HOST=172.17.0.1
 SP_DEBUG=true


[incubator-streampipes-extensions] 02/05: extended db client

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit ac4d6360aa6ca9baa04a9f74b5f483120d157108
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:37:09 2020 +0200

    extended db client
---
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 131 +++++++++++++++++++--
 1 file changed, 120 insertions(+), 11 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ff3f539..50a4e8a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -36,10 +36,13 @@ public class JdbcClient {
     private String allowedRegEx;
 
     protected String tableName;
+    protected String schemaName;
     protected String user;
     protected String password;
 
     protected boolean tableExists = false;
+    protected boolean schemaExists = false;
+    protected boolean isToDropTable = false;
 
     protected Logger logger;
 
@@ -61,7 +64,7 @@ public class JdbcClient {
      * If no matching type is found, it is interpreted as a String (VARCHAR(255))
      */
     protected enum SqlAttribute {
-        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
+        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
         private final String sqlName;
 
         SqlAttribute(String s) {
@@ -180,13 +183,17 @@ public class JdbcClient {
                                   String allowedRegEx,
                                   String driver,
                                   String urlName,
-                                  Logger logger) throws SpRuntimeException {
+                                  Logger logger,
+                                  String schemaName,
+                                  boolean isToDropTable) throws SpRuntimeException {
         this.tableName = tableName;
         this.user = user;
         this.password = password;
         this.allowedRegEx = allowedRegEx;
         this.logger = logger;
         this.eventProperties = eventProperties;
+        this.schemaName = schemaName;
+        this.isToDropTable = isToDropTable;
         try {
             Class.forName(driver);
         } catch (ClassNotFoundException e) {
@@ -209,9 +216,19 @@ public class JdbcClient {
         try {
             c = DriverManager.getConnection(url, user, password);
             ensureDatabaseExists(url, databaseName);
+            ensureSchemaExists(url,databaseName);
             ensureTableExists(url, databaseName);
         } catch (SQLException e) {
+          // host or port is wrong -- Class 08  Connection Exception
+          if (e.getSQLState().substring(0, 2).equals("08")) {
+            throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
+          }
+          // username or password is wrong -- Class 28  Invalid Authorization Specification
+          else if(e.getSQLState().substring(0, 2).equals("28")) {
+            throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+          } else {
             throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+          }
         }
     }
 
@@ -238,6 +255,43 @@ public class JdbcClient {
 		closeAll();
 	}
 
+  /**
+   * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
+   * with the given database name exists on the server, specified by the url.
+   *
+   * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
+   * @param databaseName The database in which the table should exist
+   *
+   * @throws SpRuntimeException If the table does not exist and could not be created
+   */
+  protected void ensureSchemaExists(String url, String databaseName) throws SpRuntimeException {
+    try {
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+      ResultSet rs = c.getMetaData().getSchemas();
+
+      boolean isItExisting = false;
+      while (rs.next()) {
+        String schema = rs.getString("TABLE_SCHEM");
+        if (schema.toLowerCase().equals(schemaName.toLowerCase())){
+          isItExisting = true;
+        }
+      }
+
+      if (!isItExisting) {
+        createSchema(); }
+
+      schemaExists = true;
+      rs.close();
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+
 	/**
 	 * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
 	 * with the given database name exists on the server, specified by the url.
@@ -252,12 +306,18 @@ public class JdbcClient {
 			c = DriverManager.getConnection(url + databaseName, user, password);
 			st = c.createStatement();
 			ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
-			if (rs.next()) {
-				validateTable();
-			} else {
-				createTable();
-			}
-			tableExists = true;
+      while (rs.next()) {
+        // same table names can exists in different schmemas
+        if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())){
+          if (isToDropTable){
+            createTable();
+          }
+          validateTable();
+        } else {
+          createTable();
+        }
+      }
+      tableExists = true;
 			rs.close();
 		} catch (SQLException e) {
 			closeAll();
@@ -297,6 +357,13 @@ public class JdbcClient {
 		if (event == null) {
 			throw new SpRuntimeException("event is null");
 		}
+
+    if (!schemaExists) {
+      // Creates the schema
+      createSchema();
+      schemaExists = true;
+    }
+
 		if (!tableExists) {
 			// Creates the table
 			createTable();
@@ -376,6 +443,8 @@ public class JdbcClient {
         StringBuilder statement1 = new StringBuilder("INSERT INTO ");
         StringBuilder statement2 = new StringBuilder("VALUES ( ");
         checkRegEx(tableName, "Tablename");
+        checkRegEx(schemaName, "Tablename");
+        statement1.append(schemaName).append(".");
         statement1.append(tableName).append(" ( ");
 
         // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
@@ -422,6 +491,25 @@ public class JdbcClient {
         return index;
     }
 
+  /**
+   * Creates a schema with the name {@link JdbcClient#schemaName}
+   *
+   * @throws SpRuntimeException If the {@link JdbcClient#schemaName}  is not allowed, if executeUpdate throws an SQLException
+   */
+  protected void createSchema() throws SpRuntimeException {
+      checkConnected();
+      checkRegEx(tableName, "Tablename");
+
+      StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
+      statement.append(schemaName).append(";");
+      try {
+        st.executeUpdate(statement.toString());
+      } catch (SQLException e) {
+        throw new SpRuntimeException(e.getMessage());
+      }
+    }
+
+
     /**
      * Creates a table with the name {@link JdbcClient#tableName} and the
      * properties {@link JdbcClient#eventProperties}. Calls
@@ -436,14 +524,35 @@ public class JdbcClient {
         checkConnected();
         checkRegEx(tableName, "Tablename");
 
-        StringBuilder statement = new StringBuilder("CREATE TABLE \"");
-        statement.append(tableName).append("\" ( ");
+        if (isToDropTable){
+          StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
+          statement.append(schemaName);
+          statement.append(".");
+          statement.append(tableName);
+          statement.append(";");
+
+          try {
+            st.executeUpdate(statement.toString());
+          } catch (SQLException e) {
+            throw new SpRuntimeException(e.getMessage());
+          }
+        }
+
+        StringBuilder statement = new StringBuilder("CREATE TABLE ");
+        statement.append(schemaName);
+        statement.append(".");
+        statement.append(tableName).append(" ( ");
         statement.append(extractEventProperties(eventProperties)).append(" );");
 
         try {
             st.executeUpdate(statement.toString());
         } catch (SQLException e) {
-            throw new SpRuntimeException(e.getMessage());
+          e.getErrorCode();
+          if (e.getSQLState().equals("42P07")) {
+            throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
+          } else {
+            throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+          }
         }
     }
 


[incubator-streampipes-extensions] 03/05: postgres sink extension with new options

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit aa62c4d582052cd71a10cb043872dcddb957331d
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:37:43 2020 +0200

    postgres sink extension with new options
---
 .../sinks/databases/jvm/postgresql/PostgreSql.java |  4 +++-
 .../jvm/postgresql/PostgreSqlController.java       | 24 +++++++++++++++++++++-
 .../jvm/postgresql/PostgreSqlParameters.java       | 10 ++++++++-
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..2f0b401 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -47,7 +47,9 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
             "^[a-zA-Z_][a-zA-Z0-9_]*$",
             "org.postgresql.Driver",
             "postgresql",
-            LOG);
+            LOG,
+            parameters.getSchemaName(),
+            parameters.isToDropTable());
   }
 
   @Override
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..356bab6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -38,7 +39,12 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   private static final String DATABASE_NAME_KEY = "db_name";
   private static final String DATABASE_TABLE_KEY = "db_table";
   private static final String DATABASE_USER_KEY = "db_user";
+  private static final String DATABASE_SCHEMA_KEY = "db_schema";
   private static final String DATABASE_PASSWORD_KEY = "db_password";
+  private static final String DATABASE_REPLACETABLE_KEY = "db_replaceTable";
+
+  private static final String YES = "Yes";
+  private static final String NO = "No";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -54,7 +60,11 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_SCHEMA_KEY))
             .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+            .requiredSingleValueSelection(
+                Labels.withId(DATABASE_REPLACETABLE_KEY),
+                Options.from(YES, NO))
             .build();
   }
 
@@ -62,12 +72,22 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   public ConfiguredEventSink<PostgreSqlParameters> onInvocation(DataSinkInvocation graph,
                                                                 DataSinkParameterExtractor extractor) {
 
+
+
     String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class);
     Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
     String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
     String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
     String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
     String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+    String schemaName = extractor.singleValueParameter(DATABASE_SCHEMA_KEY, String.class);
+    String valueOption = extractor.selectedSingleValue(DATABASE_REPLACETABLE_KEY, String.class);
+
+
+    boolean IsToDropTable = false;
+    if (valueOption.equals(YES)) {
+      IsToDropTable = true;
+    }
 
     PostgreSqlParameters params = new PostgreSqlParameters(graph,
             hostname,
@@ -75,7 +95,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             dbName,
             tableName,
             user,
-            password);
+            password,
+            schemaName,
+            IsToDropTable);
 
     return new ConfiguredEventSink<>(params, PostgreSql::new);
   }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..2e50fdd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,10 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   private String tableName;
   private String user;
   private String password;
+  private String schemaName;
+  private boolean isToDropTable;
 
-  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, String schemaName, boolean dropTable) {
     super(graph);
     this.PostgreSqlHost = PostgreSqlHost;
     this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +40,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
     this.tableName = tableName;
     this.user = user;
     this.password = password;
+    this.schemaName = schemaName;
+    this.isToDropTable = dropTable;
   }
 
   public String getPostgreSqlHost() {
@@ -63,4 +67,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   public String getPassword() {
     return password;
   }
+
+  public String getSchemaName() { return schemaName; }
+
+  public boolean isToDropTable() { return isToDropTable; }
 }


[incubator-streampipes-extensions] 05/05: extendet iotdb with standard parameters for new options (need testing)

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 58aebd146323fa8c5d023916bab4cc1e266648cf
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:38:43 2020 +0200

    extendet iotdb with standard parameters for new options (need testing)
---
 .../java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 177270a..4fefa22 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -56,7 +56,9 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
             ".*",
             "org.apache.iotdb.jdbc.IoTDBDriver",
             "iotdb",
-            LOG);
+            LOG,
+            "public",
+            false);
 
   }