You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/17 20:27:57 UTC
[incubator-streampipes-extensions] branch feature/jdbc_rewrite
updated: change connect with fallback option,
added validation and priviliges
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/feature/jdbc_rewrite by this push:
new 5cd20c5 change connect with fallback option, added validation and priviliges
5cd20c5 is described below
commit 5cd20c5da4543ac8ab4dd285ea89138e4c54344c
Author: micklich <fl...@disy.net>
AuthorDate: Wed Jun 17 22:27:34 2020 +0200
change connect with fallback option, added validation and priviliges
---
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 80 ++++++++++++++++++++--
.../jvm/postgresql/PostgresJdbcClient.java | 1 -
2 files changed, 75 insertions(+), 6 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 4c7d325..b56bb20 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -85,7 +85,7 @@ public class JdbcClient {
this.password = password;
this.allowedRegEx = allowedRegEx;
this.logger = logger;
- this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+ this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName;
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
@@ -93,7 +93,6 @@ public class JdbcClient {
}
connect();
- ensureDatabaseExists();
ensureTableExists();
}
@@ -115,6 +114,14 @@ public class JdbcClient {
// username or password is wrong -- Class 28 Invalid Authorization Specification
else if (e.getSQLState().substring(0, 2).equals("28")) {
throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+ } else if (e.getSQLState().substring(0, 2).equals("3D")){
+ try {
+ c = DriverManager.getConnection(url.replaceAll(databaseName, ""), user, password);
+ ensureDatabaseExists();
+ } catch (SQLException e2) {
+ closeAll();
+ throw new SpRuntimeException("Neither chosen database name nor username database exists to connect to. Please check settings : \n" + e.getMessage());
+ }
} else {
throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
}
@@ -135,7 +142,12 @@ public class JdbcClient {
st.executeUpdate("CREATE DATABASE " + databaseName + ";");
logger.info("Created new database '" + databaseName + "'");
} catch (SQLException e1) {
- if (!e1.getSQLState().substring(0, 2).equals("42")) {
+ if (e1.getSQLState().equals("42501")){
+ // insufficient_privilege
+ closeAll();
+ throw new SpRuntimeException("Chosen user has no rights to create a database: " + e1.getMessage());
+ } else {
+ closeAll();
throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
}
}
@@ -151,15 +163,20 @@ public class JdbcClient {
protected void ensureTableExists() throws SpRuntimeException {
try {
// Database should exist by now so we can establish a connection
- c = DriverManager.getConnection(url + databaseName, user, password);
+ c = DriverManager.getConnection(url, user, password);
st = c.createStatement();
ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
if (rs.next()) {
- validateTable();
+ //validateTable();
} else {
createTable();
}
tableExists = true;
+
+ if (!userHasPriviligesOnTable()){
+ throw new SpRuntimeException("User \"" + user + "\" has no priviliges to write into table " + tableName);
+ }
+
rs.close();
} catch (SQLException e) {
closeAll();
@@ -424,9 +441,29 @@ public class JdbcClient {
}
}
+
+
protected void validateTable() throws SpRuntimeException {
//TODO: Add validation of an existing table
+ HashMap<String, Integer> columnsMap = new HashMap<>();
+
+ try {
+ ResultSet column = c.getMetaData().getColumns(null, null, tableName, null);
+ while (column.next()) {
+ String columnName = column.getString("COLUMN_NAME");
+ Integer datatype = Integer.parseInt(column.getString("DATA_TYPE"));
+ columnsMap.put(columnName, datatype);
+ }
+ } catch (SQLException throwables) {
+ throwables.printStackTrace();
+ }
+ //todo
+ // create hashmap from event properties with like HashMap<String, Integer> eventMap = new HashMap<>();
+ HashMap<String, Integer> eventMap = new HashMap<>();
+
+ //eventProperties;
if (false) {
+ //todo later (!columnsMap.equals(eventMap))
throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
}
}
@@ -473,4 +510,37 @@ public class JdbcClient {
throw new SpRuntimeException("Connection is not established.");
}
}
+
+
+ private boolean userHasPriviligesOnTable() {
+ //TODO test with other dbÄs and choose type to allow / only check insert or also update and other terms?
+ boolean output = false;
+ try {
+ c = DriverManager.getConnection(url, user, password);
+ st = c.createStatement();
+ ResultSet rs = c.getMetaData().getTablePrivileges(null, null, tableName);
+ ResultSetMetaData rsmd = rs.getMetaData();
+
+ //todo pic correct colum to check wwith
+ int cols = rsmd.getColumnCount();
+
+ if (!rs.next()){
+ // table does not exist, so user has rights
+ output = true;
+ } else {
+ while (rs.next()) {
+ for (int i = 1; i <= cols; i++) {
+ //todo save readout and compare rights in insert or update yes
+// System.out.println(rs.getString(i));
+ }
+ }
+ }
+ rs.close();
+ } catch (SQLException e) {
+ closeAll();
+ //nothing should go wrong in c and st, rs is checked
+ e.printStackTrace();
+ }
+ return output;
+ }
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index fcbcbe7..1dd1dec 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -75,7 +75,6 @@ public class PostgresJdbcClient extends JdbcClient {
throw new SpRuntimeException("Driver '" + driver + "' not found.");
}
connect();
- ensureDatabaseExists();
ensureSchemaExists();
ensureTableExists();
}