You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/21 10:32:09 UTC

[incubator-streampipes-extensions] branch feature/postgis-sink updated: added PostgresJDBC-Client and rechanges IoT-DB

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/feature/postgis-sink by this push:
     new 4c629c1  added PostgresJDBC-Client and rechanges IoT-DB
4c629c1 is described below

commit 4c629c10fd0b9f86923a5e90ecf00ee04b65b090
Author: micklich <fl...@disy.net>
AuthorDate: Thu May 21 12:31:53 2020 +0200

    added PostgresJDBC-Client and rechanges IoT-DB
---
 .../sinks/databases/jvm/iotdb/IotDb.java           |   4 +-
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 131 ++-------------------
 .../sinks/databases/jvm/postgresql/PostgreSql.java |   3 +-
 .../PostgresJdbcClient.java}                       |  46 ++++----
 4 files changed, 36 insertions(+), 148 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 4fefa22..177270a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -56,9 +56,7 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
             ".*",
             "org.apache.iotdb.jdbc.IoTDBDriver",
             "iotdb",
-            LOG,
-            "public",
-            false);
+            LOG);
 
   }
 
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 50a4e8a..ff3f539 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -36,13 +36,10 @@ public class JdbcClient {
     private String allowedRegEx;
 
     protected String tableName;
-    protected String schemaName;
     protected String user;
     protected String password;
 
     protected boolean tableExists = false;
-    protected boolean schemaExists = false;
-    protected boolean isToDropTable = false;
 
     protected Logger logger;
 
@@ -64,7 +61,7 @@ public class JdbcClient {
      * If no matching type is found, it is interpreted as a String (VARCHAR(255))
      */
     protected enum SqlAttribute {
-        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
+        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
         private final String sqlName;
 
         SqlAttribute(String s) {
@@ -183,17 +180,13 @@ public class JdbcClient {
                                   String allowedRegEx,
                                   String driver,
                                   String urlName,
-                                  Logger logger,
-                                  String schemaName,
-                                  boolean isToDropTable) throws SpRuntimeException {
+                                  Logger logger) throws SpRuntimeException {
         this.tableName = tableName;
         this.user = user;
         this.password = password;
         this.allowedRegEx = allowedRegEx;
         this.logger = logger;
         this.eventProperties = eventProperties;
-        this.schemaName = schemaName;
-        this.isToDropTable = isToDropTable;
         try {
             Class.forName(driver);
         } catch (ClassNotFoundException e) {
@@ -216,19 +209,9 @@ public class JdbcClient {
         try {
             c = DriverManager.getConnection(url, user, password);
             ensureDatabaseExists(url, databaseName);
-            ensureSchemaExists(url,databaseName);
             ensureTableExists(url, databaseName);
         } catch (SQLException e) {
-          // host or port is wrong -- Class 08  Connection Exception
-          if (e.getSQLState().substring(0, 2).equals("08")) {
-            throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
-          }
-          // username or password is wrong -- Class 28  Invalid Authorization Specification
-          else if(e.getSQLState().substring(0, 2).equals("28")) {
-            throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
-          } else {
             throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
-          }
         }
     }
 
@@ -255,43 +238,6 @@ public class JdbcClient {
 		closeAll();
 	}
 
-  /**
-   * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
-   * with the given database name exists on the server, specified by the url.
-   *
-   * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
-   * @param databaseName The database in which the table should exist
-   *
-   * @throws SpRuntimeException If the table does not exist and could not be created
-   */
-  protected void ensureSchemaExists(String url, String databaseName) throws SpRuntimeException {
-    try {
-      // Database should exist by now so we can establish a connection
-      c = DriverManager.getConnection(url + databaseName, user, password);
-      st = c.createStatement();
-      ResultSet rs = c.getMetaData().getSchemas();
-
-      boolean isItExisting = false;
-      while (rs.next()) {
-        String schema = rs.getString("TABLE_SCHEM");
-        if (schema.toLowerCase().equals(schemaName.toLowerCase())){
-          isItExisting = true;
-        }
-      }
-
-      if (!isItExisting) {
-        createSchema(); }
-
-      schemaExists = true;
-      rs.close();
-    } catch (SQLException e) {
-      closeAll();
-      throw new SpRuntimeException(e.getMessage());
-    }
-  }
-
-
-
 	/**
 	 * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
 	 * with the given database name exists on the server, specified by the url.
@@ -306,18 +252,12 @@ public class JdbcClient {
 			c = DriverManager.getConnection(url + databaseName, user, password);
 			st = c.createStatement();
 			ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
-      while (rs.next()) {
-        // same table names can exists in different schmemas
-        if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())){
-          if (isToDropTable){
-            createTable();
-          }
-          validateTable();
-        } else {
-          createTable();
-        }
-      }
-      tableExists = true;
+			if (rs.next()) {
+				validateTable();
+			} else {
+				createTable();
+			}
+			tableExists = true;
 			rs.close();
 		} catch (SQLException e) {
 			closeAll();
@@ -357,13 +297,6 @@ public class JdbcClient {
 		if (event == null) {
 			throw new SpRuntimeException("event is null");
 		}
-
-    if (!schemaExists) {
-      // Creates the schema
-      createSchema();
-      schemaExists = true;
-    }
-
 		if (!tableExists) {
 			// Creates the table
 			createTable();
@@ -443,8 +376,6 @@ public class JdbcClient {
         StringBuilder statement1 = new StringBuilder("INSERT INTO ");
         StringBuilder statement2 = new StringBuilder("VALUES ( ");
         checkRegEx(tableName, "Tablename");
-        checkRegEx(schemaName, "Tablename");
-        statement1.append(schemaName).append(".");
         statement1.append(tableName).append(" ( ");
 
         // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
@@ -491,25 +422,6 @@ public class JdbcClient {
         return index;
     }
 
-  /**
-   * Creates a schema with the name {@link JdbcClient#schemaName}
-   *
-   * @throws SpRuntimeException If the {@link JdbcClient#schemaName}  is not allowed, if executeUpdate throws an SQLException
-   */
-  protected void createSchema() throws SpRuntimeException {
-      checkConnected();
-      checkRegEx(tableName, "Tablename");
-
-      StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
-      statement.append(schemaName).append(";");
-      try {
-        st.executeUpdate(statement.toString());
-      } catch (SQLException e) {
-        throw new SpRuntimeException(e.getMessage());
-      }
-    }
-
-
     /**
      * Creates a table with the name {@link JdbcClient#tableName} and the
      * properties {@link JdbcClient#eventProperties}. Calls
@@ -524,35 +436,14 @@ public class JdbcClient {
         checkConnected();
         checkRegEx(tableName, "Tablename");
 
-        if (isToDropTable){
-          StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
-          statement.append(schemaName);
-          statement.append(".");
-          statement.append(tableName);
-          statement.append(";");
-
-          try {
-            st.executeUpdate(statement.toString());
-          } catch (SQLException e) {
-            throw new SpRuntimeException(e.getMessage());
-          }
-        }
-
-        StringBuilder statement = new StringBuilder("CREATE TABLE ");
-        statement.append(schemaName);
-        statement.append(".");
-        statement.append(tableName).append(" ( ");
+        StringBuilder statement = new StringBuilder("CREATE TABLE \"");
+        statement.append(tableName).append("\" ( ");
         statement.append(extractEventProperties(eventProperties)).append(" );");
 
         try {
             st.executeUpdate(statement.toString());
         } catch (SQLException e) {
-          e.getErrorCode();
-          if (e.getSQLState().equals("42P07")) {
-            throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
-          } else {
-            throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
-          }
+            throw new SpRuntimeException(e.getMessage());
         }
     }
 
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 2f0b401..c8e52aa 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -21,11 +21,10 @@ package org.apache.streampipes.sinks.databases.jvm.postgresql;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
-public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParameters> {
+public class PostgreSql extends PostgresJdbcClient implements EventSink<PostgreSqlParameters> {
 
   private static Logger LOG;
 
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
similarity index 93%
copy from streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
copy to streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index 50a4e8a..fccc723 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
+package org.apache.streampipes.sinks.databases.jvm.postgresql;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 
-public class JdbcClient {
+public class PostgresJdbcClient {
     private String allowedRegEx;
 
     protected String tableName;
@@ -170,7 +170,7 @@ public class JdbcClient {
     }
 
 
-    public JdbcClient() {
+    public PostgresJdbcClient() {
     }
 
     protected void initializeJdbc(List<EventProperty> eventProperties,
@@ -205,8 +205,8 @@ public class JdbcClient {
 
 
     /**
-     * Connects to the HadoopFileSystem Server and initilizes {@link JdbcClient#c} and
-     * {@link JdbcClient#st}
+     * Connects to the HadoopFileSystem Server and initilizes {@link PostgresJdbcClient#c} and
+     * {@link PostgresJdbcClient#st}
      *
      * @throws SpRuntimeException When the connection could not be established (because of a
      *                            wrong identification, missing database etc.)
@@ -256,7 +256,7 @@ public class JdbcClient {
 	}
 
   /**
-   * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
+   * If this method returns successfully a schema with the name in {@link PostgresJdbcClient#schemaName} exists in the database
    * with the given database name exists on the server, specified by the url.
    *
    * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
@@ -293,7 +293,7 @@ public class JdbcClient {
 
 
 	/**
-	 * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
+	 * If this method returns successfully a table with the name in {@link PostgresJdbcClient#tableName} exists in the database
 	 * with the given database name exists on the server, specified by the url.
 	 *
 	 * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
@@ -327,7 +327,7 @@ public class JdbcClient {
 
     /**
      * Clears, fills and executes the saved prepared statement {@code ps} with the data found in
-     * event. To fill in the values it calls {@link JdbcClient#fillPreparedStatement(Map)}.
+     * event. To fill in the values it calls {@link PostgresJdbcClient#fillPreparedStatement(Map)}.
      *
      * @param event Data to be saved in the SQL table
      * @throws SQLException       When the statement cannot be executed
@@ -397,9 +397,9 @@ public class JdbcClient {
     }
 
     /**
-     * Fills a prepared statement with the actual values base on {@link JdbcClient#parameters}. If
-     * {@link JdbcClient#parameters} is empty or not complete (which should only happen once in the
-     * begining), it calls {@link JdbcClient#generatePreparedStatement(Map)} to generate a new one.
+     * Fills a prepared statement with the actual values base on {@link PostgresJdbcClient#parameters}. If
+     * {@link PostgresJdbcClient#parameters} is empty or not complete (which should only happen once in the
+     * begining), it calls {@link PostgresJdbcClient#generatePreparedStatement(Map)} to generate a new one.
      *
      * @param event
      * @param pre
@@ -427,7 +427,7 @@ public class JdbcClient {
     }
 
     /**
-     * Initializes the variables {@link JdbcClient#parameters} and {@link JdbcClient#ps}
+     * Initializes the variables {@link PostgresJdbcClient#parameters} and {@link PostgresJdbcClient#ps}
      * according to the parameter event.
      *
      * @param event The event which is getting analyzed
@@ -492,9 +492,9 @@ public class JdbcClient {
     }
 
   /**
-   * Creates a schema with the name {@link JdbcClient#schemaName}
+   * Creates a schema with the name {@link PostgresJdbcClient#schemaName}
    *
-   * @throws SpRuntimeException If the {@link JdbcClient#schemaName}  is not allowed, if executeUpdate throws an SQLException
+   * @throws SpRuntimeException If the {@link PostgresJdbcClient#schemaName}  is not allowed, if executeUpdate throws an SQLException
    */
   protected void createSchema() throws SpRuntimeException {
       checkConnected();
@@ -511,13 +511,13 @@ public class JdbcClient {
 
 
     /**
-     * Creates a table with the name {@link JdbcClient#tableName} and the
-     * properties {@link JdbcClient#eventProperties}. Calls
-     * {@link JdbcClient#extractEventProperties(List)} internally with the
-     * {@link JdbcClient#eventProperties} to extract all possible columns.
+     * Creates a table with the name {@link PostgresJdbcClient#tableName} and the
+     * properties {@link PostgresJdbcClient#eventProperties}. Calls
+     * {@link PostgresJdbcClient#extractEventProperties(List)} internally with the
+     * {@link PostgresJdbcClient#eventProperties} to extract all possible columns.
      *
-     * @throws SpRuntimeException If the {@link JdbcClient#tableName}  is not allowed, if
-     *                            executeUpdate throws an SQLException or if {@link JdbcClient#extractEventProperties(List)}
+     * @throws SpRuntimeException If the {@link PostgresJdbcClient#tableName}  is not allowed, if
+     *                            executeUpdate throws an SQLException or if {@link PostgresJdbcClient#extractEventProperties(List)}
      *                            throws an exception
      */
 	protected void createTable() throws SpRuntimeException {
@@ -558,11 +558,11 @@ public class JdbcClient {
 
     /**
      * Creates a SQL-Query with the given Properties (SQL-Injection safe). Calls
-     * {@link JdbcClient#extractEventProperties(List, String)} with an empty string
+     * {@link PostgresJdbcClient#extractEventProperties(List, String)} with an empty string
      *
      * @param properties The list of properties which should be included in the query
      * @return A StringBuilder with the query which needs to be executed in order to create the table
-     * @throws SpRuntimeException See {@link JdbcClient#extractEventProperties(List, String)} for details
+     * @throws SpRuntimeException See {@link PostgresJdbcClient#extractEventProperties(List, String)} for details
      */
     private StringBuilder extractEventProperties(List<EventProperty> properties)
             throws SpRuntimeException {
@@ -621,7 +621,7 @@ public class JdbcClient {
      *
      * @param input String which is getting matched with the regEx
      * @param regExIdentifier Information about the use of the input. Gets included in the exception message
-     * @throws SpRuntimeException If {@code input} does not match with {@link JdbcClient#allowedRegEx}
+     * @throws SpRuntimeException If {@code input} does not match with {@link PostgresJdbcClient#allowedRegEx}
      *                            or if the length of {@code input} is 0
      */
     protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {