You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/15 08:39:08 UTC
[incubator-streampipes-extensions] 02/05: extended db client
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit ac4d6360aa6ca9baa04a9f74b5f483120d157108
Author: micklich <fl...@disy.net>
AuthorDate: Fri May 15 10:37:09 2020 +0200
extended db client
---
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 131 +++++++++++++++++++--
1 file changed, 120 insertions(+), 11 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ff3f539..50a4e8a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -36,10 +36,13 @@ public class JdbcClient {
private String allowedRegEx;
protected String tableName;
+ protected String schemaName;
protected String user;
protected String password;
protected boolean tableExists = false;
+ protected boolean schemaExists = false;
+ protected boolean isToDropTable = false;
protected Logger logger;
@@ -61,7 +64,7 @@ public class JdbcClient {
* If no matching type is found, it is interpreted as a String (VARCHAR(255))
*/
protected enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
+ INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
private final String sqlName;
SqlAttribute(String s) {
@@ -180,13 +183,17 @@ public class JdbcClient {
String allowedRegEx,
String driver,
String urlName,
- Logger logger) throws SpRuntimeException {
+ Logger logger,
+ String schemaName,
+ boolean isToDropTable) throws SpRuntimeException {
this.tableName = tableName;
this.user = user;
this.password = password;
this.allowedRegEx = allowedRegEx;
this.logger = logger;
this.eventProperties = eventProperties;
+ this.schemaName = schemaName;
+ this.isToDropTable = isToDropTable;
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
@@ -209,9 +216,19 @@ public class JdbcClient {
try {
c = DriverManager.getConnection(url, user, password);
ensureDatabaseExists(url, databaseName);
+ ensureSchemaExists(url,databaseName);
ensureTableExists(url, databaseName);
} catch (SQLException e) {
+ // host or port is wrong -- Class 08 Connection Exception
+ if (e.getSQLState().substring(0, 2).equals("08")) {
+ throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
+ }
+ // username or password is wrong -- Class 28 Invalid Authorization Specification
+ else if(e.getSQLState().substring(0, 2).equals("28")) {
+ throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+ } else {
throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+ }
}
}
@@ -238,6 +255,43 @@ public class JdbcClient {
closeAll();
}
+ /**
+ * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
+ * with the given database name exists on the server, specified by the url.
+ *
+ * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
+ * @param databaseName The database in which the table should exist
+ *
+ * @throws SpRuntimeException If the table does not exist and could not be created
+ */
+ protected void ensureSchemaExists(String url, String databaseName) throws SpRuntimeException {
+ try {
+ // Database should exist by now so we can establish a connection
+ c = DriverManager.getConnection(url + databaseName, user, password);
+ st = c.createStatement();
+ ResultSet rs = c.getMetaData().getSchemas();
+
+ boolean isItExisting = false;
+ while (rs.next()) {
+ String schema = rs.getString("TABLE_SCHEM");
+ if (schema.toLowerCase().equals(schemaName.toLowerCase())){
+ isItExisting = true;
+ }
+ }
+
+ if (!isItExisting) {
+ createSchema(); }
+
+ schemaExists = true;
+ rs.close();
+ } catch (SQLException e) {
+ closeAll();
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+
+
/**
* If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
* with the given database name exists on the server, specified by the url.
@@ -252,12 +306,18 @@ public class JdbcClient {
c = DriverManager.getConnection(url + databaseName, user, password);
st = c.createStatement();
ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
- if (rs.next()) {
- validateTable();
- } else {
- createTable();
- }
- tableExists = true;
+ while (rs.next()) {
+ // same table names can exists in different schmemas
+ if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())){
+ if (isToDropTable){
+ createTable();
+ }
+ validateTable();
+ } else {
+ createTable();
+ }
+ }
+ tableExists = true;
rs.close();
} catch (SQLException e) {
closeAll();
@@ -297,6 +357,13 @@ public class JdbcClient {
if (event == null) {
throw new SpRuntimeException("event is null");
}
+
+ if (!schemaExists) {
+ // Creates the schema
+ createSchema();
+ schemaExists = true;
+ }
+
if (!tableExists) {
// Creates the table
createTable();
@@ -376,6 +443,8 @@ public class JdbcClient {
StringBuilder statement1 = new StringBuilder("INSERT INTO ");
StringBuilder statement2 = new StringBuilder("VALUES ( ");
checkRegEx(tableName, "Tablename");
+ checkRegEx(schemaName, "Tablename");
+ statement1.append(schemaName).append(".");
statement1.append(tableName).append(" ( ");
// Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
@@ -422,6 +491,25 @@ public class JdbcClient {
return index;
}
+ /**
+ * Creates a schema with the name {@link JdbcClient#schemaName}
+ *
+ * @throws SpRuntimeException If the {@link JdbcClient#schemaName} is not allowed, if executeUpdate throws an SQLException
+ */
+ protected void createSchema() throws SpRuntimeException {
+ checkConnected();
+ checkRegEx(tableName, "Tablename");
+
+ StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
+ statement.append(schemaName).append(";");
+ try {
+ st.executeUpdate(statement.toString());
+ } catch (SQLException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+
/**
* Creates a table with the name {@link JdbcClient#tableName} and the
* properties {@link JdbcClient#eventProperties}. Calls
@@ -436,14 +524,35 @@ public class JdbcClient {
checkConnected();
checkRegEx(tableName, "Tablename");
- StringBuilder statement = new StringBuilder("CREATE TABLE \"");
- statement.append(tableName).append("\" ( ");
+ if (isToDropTable){
+ StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
+ statement.append(schemaName);
+ statement.append(".");
+ statement.append(tableName);
+ statement.append(";");
+
+ try {
+ st.executeUpdate(statement.toString());
+ } catch (SQLException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+ StringBuilder statement = new StringBuilder("CREATE TABLE ");
+ statement.append(schemaName);
+ statement.append(".");
+ statement.append(tableName).append(" ( ");
statement.append(extractEventProperties(eventProperties)).append(" );");
try {
st.executeUpdate(statement.toString());
} catch (SQLException e) {
- throw new SpRuntimeException(e.getMessage());
+ e.getErrorCode();
+ if (e.getSQLState().equals("42P07")) {
+ throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
+ } else {
+ throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+ }
}
}