You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/17 20:27:57 UTC

[incubator-streampipes-extensions] branch feature/jdbc_rewrite updated: change connect with fallback option, added validation and privileges

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/feature/jdbc_rewrite by this push:
     new 5cd20c5  change connect with fallback option, added validation  and privileges
5cd20c5 is described below

commit 5cd20c5da4543ac8ab4dd285ea89138e4c54344c
Author: micklich <fl...@disy.net>
AuthorDate: Wed Jun 17 22:27:34 2020 +0200

    change connect with fallback option, added validation  and privileges
---
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 80 ++++++++++++++++++++--
 .../jvm/postgresql/PostgresJdbcClient.java         |  1 -
 2 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 4c7d325..b56bb20 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -85,7 +85,7 @@ public class JdbcClient {
     this.password = password;
     this.allowedRegEx = allowedRegEx;
     this.logger = logger;
-    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName;
     try {
       Class.forName(driver);
     } catch (ClassNotFoundException e) {
@@ -93,7 +93,6 @@ public class JdbcClient {
     }
 
     connect();
-    ensureDatabaseExists();
     ensureTableExists();
   }
 
@@ -115,6 +114,14 @@ public class JdbcClient {
       // username or password is wrong -- Class 28  Invalid Authorization Specification
       else if (e.getSQLState().substring(0, 2).equals("28")) {
         throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+      } else if (e.getSQLState().substring(0, 2).equals("3D")){
+        try {
+          c = DriverManager.getConnection(url.replaceAll(databaseName, ""), user, password);
+          ensureDatabaseExists();
+        } catch (SQLException e2) {
+          closeAll();
+          throw new SpRuntimeException("Neither chosen database name nor username database exists to connect to. Please check settings  : \n" + e.getMessage());
+        }
       } else {
         throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
       }
@@ -135,7 +142,12 @@ public class JdbcClient {
       st.executeUpdate("CREATE DATABASE " + databaseName + ";");
       logger.info("Created new database '" + databaseName + "'");
     } catch (SQLException e1) {
-      if (!e1.getSQLState().substring(0, 2).equals("42")) {
+      if (e1.getSQLState().equals("42501")){
+        // insufficient_privilege
+        closeAll();
+        throw new SpRuntimeException("Chosen user has no rights to create a database: " + e1.getMessage());
+      } else {
+        closeAll();
         throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
       }
     }
@@ -151,15 +163,20 @@ public class JdbcClient {
   protected void ensureTableExists() throws SpRuntimeException {
     try {
       // Database should exist by now so we can establish a connection
-      c = DriverManager.getConnection(url + databaseName, user, password);
+      c = DriverManager.getConnection(url, user, password);
       st = c.createStatement();
       ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
       if (rs.next()) {
-        validateTable();
+        //validateTable();
       } else {
         createTable();
       }
       tableExists = true;
+
+      if (!userHasPriviligesOnTable()){
+        throw new SpRuntimeException("User \"" + user + "\" has no priviliges to write into table " + tableName);
+      }
+
       rs.close();
     } catch (SQLException e) {
       closeAll();
@@ -424,9 +441,29 @@ public class JdbcClient {
     }
   }
 
+
+
   protected void validateTable() throws SpRuntimeException {
     //TODO: Add validation of an existing table
+    HashMap<String, Integer> columnsMap = new HashMap<>();
+
+    try {
+      ResultSet column = c.getMetaData().getColumns(null, null, tableName, null);
+      while (column.next()) {
+        String columnName = column.getString("COLUMN_NAME");
+        Integer datatype = Integer.parseInt(column.getString("DATA_TYPE"));
+        columnsMap.put(columnName, datatype);
+      }
+    } catch (SQLException throwables) {
+      throwables.printStackTrace();
+    }
+    //todo
+    // create hashmap from event properties with like HashMap<String, Integer> eventMap = new HashMap<>();
+    HashMap<String, Integer> eventMap = new HashMap<>();
+
+    //eventProperties;
     if (false) {
+      //todo later (!columnsMap.equals(eventMap))
       throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
     }
   }
@@ -473,4 +510,37 @@ public class JdbcClient {
       throw new SpRuntimeException("Connection is not established.");
     }
   }
+
+
+  private boolean userHasPriviligesOnTable() {
+    //TODO test with other dbÄs and choose type to allow / only check insert or also update and other terms?
+    boolean output = false;
+    try {
+      c = DriverManager.getConnection(url, user, password);
+      st = c.createStatement();
+      ResultSet rs = c.getMetaData().getTablePrivileges(null, null, tableName);
+      ResultSetMetaData rsmd = rs.getMetaData();
+
+      //todo pic correct colum to check wwith
+      int cols = rsmd.getColumnCount();
+
+      if (!rs.next()){
+        // table does not exist, so user has rights
+        output = true;
+      } else {
+        while (rs.next()) {
+          for (int i = 1; i <= cols; i++) {
+            //todo save readout and compare rights in insert or update yes
+//            System.out.println(rs.getString(i));
+          }
+        }
+      }
+      rs.close();
+    } catch (SQLException e) {
+      closeAll();
+      //nothing should go wrong in c and st, rs is checked
+      e.printStackTrace();
+    }
+    return output;
+  }
 }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index fcbcbe7..1dd1dec 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -75,7 +75,6 @@ public class PostgresJdbcClient extends JdbcClient {
       throw new SpRuntimeException("Driver '" + driver + "' not found.");
     }
     connect();
-    ensureDatabaseExists();
     ensureSchemaExists();
     ensureTableExists();
   }