You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/15 08:39:08 UTC

[incubator-streampipes-extensions] 02/05: extended db client

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit ac4d6360aa6ca9baa04a9f74b5f483120d157108
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:37:09 2020 +0200

    extended db client
---
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 131 +++++++++++++++++++--
 1 file changed, 120 insertions(+), 11 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ff3f539..50a4e8a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -36,10 +36,13 @@ public class JdbcClient {
     private String allowedRegEx;
 
     protected String tableName;
+    protected String schemaName;
     protected String user;
     protected String password;
 
     protected boolean tableExists = false;
+    protected boolean schemaExists = false;
+    protected boolean isToDropTable = false;
 
     protected Logger logger;
 
@@ -61,7 +64,7 @@ public class JdbcClient {
      * If no matching type is found, it is interpreted as a String (VARCHAR(255))
      */
     protected enum SqlAttribute {
-        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
+        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
         private final String sqlName;
 
         SqlAttribute(String s) {
@@ -180,13 +183,17 @@ public class JdbcClient {
                                   String allowedRegEx,
                                   String driver,
                                   String urlName,
-                                  Logger logger) throws SpRuntimeException {
+                                  Logger logger,
+                                  String schemaName,
+                                  boolean isToDropTable) throws SpRuntimeException {
         this.tableName = tableName;
         this.user = user;
         this.password = password;
         this.allowedRegEx = allowedRegEx;
         this.logger = logger;
         this.eventProperties = eventProperties;
+        this.schemaName = schemaName;
+        this.isToDropTable = isToDropTable;
         try {
             Class.forName(driver);
         } catch (ClassNotFoundException e) {
@@ -209,9 +216,19 @@ public class JdbcClient {
         try {
             c = DriverManager.getConnection(url, user, password);
             ensureDatabaseExists(url, databaseName);
+            ensureSchemaExists(url,databaseName);
             ensureTableExists(url, databaseName);
         } catch (SQLException e) {
+          // host or port is wrong -- Class 08  Connection Exception
+          if (e.getSQLState().substring(0, 2).equals("08")) {
+            throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
+          }
+          // username or password is wrong -- Class 28  Invalid Authorization Specification
+          else if(e.getSQLState().substring(0, 2).equals("28")) {
+            throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+          } else {
             throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+          }
         }
     }
 
@@ -238,6 +255,43 @@ public class JdbcClient {
 		closeAll();
 	}
 
+  /**
+   * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
+   * with the given database name exists on the server, specified by the url.
+   *
+   * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
+   * @param databaseName The database in which the table should exist
+   *
+   * @throws SpRuntimeException If the table does not exist and could not be created
+   */
+  protected void ensureSchemaExists(String url, String databaseName) throws SpRuntimeException {
+    try {
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+      ResultSet rs = c.getMetaData().getSchemas();
+
+      boolean isItExisting = false;
+      while (rs.next()) {
+        String schema = rs.getString("TABLE_SCHEM");
+        if (schema.toLowerCase().equals(schemaName.toLowerCase())){
+          isItExisting = true;
+        }
+      }
+
+      if (!isItExisting) {
+        createSchema(); }
+
+      schemaExists = true;
+      rs.close();
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+
 	/**
 	 * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
 	 * with the given database name exists on the server, specified by the url.
@@ -252,12 +306,18 @@ public class JdbcClient {
 			c = DriverManager.getConnection(url + databaseName, user, password);
 			st = c.createStatement();
 			ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
-			if (rs.next()) {
-				validateTable();
-			} else {
-				createTable();
-			}
-			tableExists = true;
+      while (rs.next()) {
+        // same table names can exists in different schmemas
+        if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())){
+          if (isToDropTable){
+            createTable();
+          }
+          validateTable();
+        } else {
+          createTable();
+        }
+      }
+      tableExists = true;
 			rs.close();
 		} catch (SQLException e) {
 			closeAll();
@@ -297,6 +357,13 @@ public class JdbcClient {
 		if (event == null) {
 			throw new SpRuntimeException("event is null");
 		}
+
+    if (!schemaExists) {
+      // Creates the schema
+      createSchema();
+      schemaExists = true;
+    }
+
 		if (!tableExists) {
 			// Creates the table
 			createTable();
@@ -376,6 +443,8 @@ public class JdbcClient {
         StringBuilder statement1 = new StringBuilder("INSERT INTO ");
         StringBuilder statement2 = new StringBuilder("VALUES ( ");
         checkRegEx(tableName, "Tablename");
+        checkRegEx(schemaName, "Tablename");
+        statement1.append(schemaName).append(".");
         statement1.append(tableName).append(" ( ");
 
         // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
@@ -422,6 +491,25 @@ public class JdbcClient {
         return index;
     }
 
+  /**
+   * Creates a schema with the name {@link JdbcClient#schemaName}
+   *
+   * @throws SpRuntimeException If the {@link JdbcClient#schemaName}  is not allowed, if executeUpdate throws an SQLException
+   */
+  protected void createSchema() throws SpRuntimeException {
+      checkConnected();
+      checkRegEx(tableName, "Tablename");
+
+      StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
+      statement.append(schemaName).append(";");
+      try {
+        st.executeUpdate(statement.toString());
+      } catch (SQLException e) {
+        throw new SpRuntimeException(e.getMessage());
+      }
+    }
+
+
     /**
      * Creates a table with the name {@link JdbcClient#tableName} and the
      * properties {@link JdbcClient#eventProperties}. Calls
@@ -436,14 +524,35 @@ public class JdbcClient {
         checkConnected();
         checkRegEx(tableName, "Tablename");
 
-        StringBuilder statement = new StringBuilder("CREATE TABLE \"");
-        statement.append(tableName).append("\" ( ");
+        if (isToDropTable){
+          StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
+          statement.append(schemaName);
+          statement.append(".");
+          statement.append(tableName);
+          statement.append(";");
+
+          try {
+            st.executeUpdate(statement.toString());
+          } catch (SQLException e) {
+            throw new SpRuntimeException(e.getMessage());
+          }
+        }
+
+        StringBuilder statement = new StringBuilder("CREATE TABLE ");
+        statement.append(schemaName);
+        statement.append(".");
+        statement.append(tableName).append(" ( ");
         statement.append(extractEventProperties(eventProperties)).append(" );");
 
         try {
             st.executeUpdate(statement.toString());
         } catch (SQLException e) {
-            throw new SpRuntimeException(e.getMessage());
+          e.getErrorCode();
+          if (e.getSQLState().equals("42P07")) {
+            throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
+          } else {
+            throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+          }
         }
     }