You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/21 10:32:09 UTC
[incubator-streampipes-extensions] branch feature/postgis-sink
updated: added PostgresJBDC-Client and rechanges IoT-DB
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/feature/postgis-sink by this push:
new 4c629c1 added PostgresJBDC-Client and rechanges IoT-DB
4c629c1 is described below
commit 4c629c10fd0b9f86923a5e90ecf00ee04b65b090
Author: micklich <fl...@disy.net>
AuthorDate: Thu May 21 12:31:53 2020 +0200
added PostgresJBDC-Client and rechanges IoT-DB
---
.../sinks/databases/jvm/iotdb/IotDb.java | 4 +-
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 131 ++-------------------
.../sinks/databases/jvm/postgresql/PostgreSql.java | 3 +-
.../PostgresJdbcClient.java} | 46 ++++----
4 files changed, 36 insertions(+), 148 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 4fefa22..177270a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -56,9 +56,7 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
".*",
"org.apache.iotdb.jdbc.IoTDBDriver",
"iotdb",
- LOG,
- "public",
- false);
+ LOG);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 50a4e8a..ff3f539 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -36,13 +36,10 @@ public class JdbcClient {
private String allowedRegEx;
protected String tableName;
- protected String schemaName;
protected String user;
protected String password;
protected boolean tableExists = false;
- protected boolean schemaExists = false;
- protected boolean isToDropTable = false;
protected Logger logger;
@@ -64,7 +61,7 @@ public class JdbcClient {
* If no matching type is found, it is interpreted as a String (VARCHAR(255))
*/
protected enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
+ INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
private final String sqlName;
SqlAttribute(String s) {
@@ -183,17 +180,13 @@ public class JdbcClient {
String allowedRegEx,
String driver,
String urlName,
- Logger logger,
- String schemaName,
- boolean isToDropTable) throws SpRuntimeException {
+ Logger logger) throws SpRuntimeException {
this.tableName = tableName;
this.user = user;
this.password = password;
this.allowedRegEx = allowedRegEx;
this.logger = logger;
this.eventProperties = eventProperties;
- this.schemaName = schemaName;
- this.isToDropTable = isToDropTable;
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
@@ -216,19 +209,9 @@ public class JdbcClient {
try {
c = DriverManager.getConnection(url, user, password);
ensureDatabaseExists(url, databaseName);
- ensureSchemaExists(url,databaseName);
ensureTableExists(url, databaseName);
} catch (SQLException e) {
- // host or port is wrong -- Class 08 Connection Exception
- if (e.getSQLState().substring(0, 2).equals("08")) {
- throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
- }
- // username or password is wrong -- Class 28 Invalid Authorization Specification
- else if(e.getSQLState().substring(0, 2).equals("28")) {
- throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
- } else {
throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
- }
}
}
@@ -255,43 +238,6 @@ public class JdbcClient {
closeAll();
}
- /**
- * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
- * with the given database name exists on the server, specified by the url.
- *
- * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
- * @param databaseName The database in which the table should exist
- *
- * @throws SpRuntimeException If the table does not exist and could not be created
- */
- protected void ensureSchemaExists(String url, String databaseName) throws SpRuntimeException {
- try {
- // Database should exist by now so we can establish a connection
- c = DriverManager.getConnection(url + databaseName, user, password);
- st = c.createStatement();
- ResultSet rs = c.getMetaData().getSchemas();
-
- boolean isItExisting = false;
- while (rs.next()) {
- String schema = rs.getString("TABLE_SCHEM");
- if (schema.toLowerCase().equals(schemaName.toLowerCase())){
- isItExisting = true;
- }
- }
-
- if (!isItExisting) {
- createSchema(); }
-
- schemaExists = true;
- rs.close();
- } catch (SQLException e) {
- closeAll();
- throw new SpRuntimeException(e.getMessage());
- }
- }
-
-
-
/**
* If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
* with the given database name exists on the server, specified by the url.
@@ -306,18 +252,12 @@ public class JdbcClient {
c = DriverManager.getConnection(url + databaseName, user, password);
st = c.createStatement();
ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
- while (rs.next()) {
- // same table names can exists in different schmemas
- if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())){
- if (isToDropTable){
- createTable();
- }
- validateTable();
- } else {
- createTable();
- }
- }
- tableExists = true;
+ if (rs.next()) {
+ validateTable();
+ } else {
+ createTable();
+ }
+ tableExists = true;
rs.close();
} catch (SQLException e) {
closeAll();
@@ -357,13 +297,6 @@ public class JdbcClient {
if (event == null) {
throw new SpRuntimeException("event is null");
}
-
- if (!schemaExists) {
- // Creates the schema
- createSchema();
- schemaExists = true;
- }
-
if (!tableExists) {
// Creates the table
createTable();
@@ -443,8 +376,6 @@ public class JdbcClient {
StringBuilder statement1 = new StringBuilder("INSERT INTO ");
StringBuilder statement2 = new StringBuilder("VALUES ( ");
checkRegEx(tableName, "Tablename");
- checkRegEx(schemaName, "Tablename");
- statement1.append(schemaName).append(".");
statement1.append(tableName).append(" ( ");
// Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
@@ -491,25 +422,6 @@ public class JdbcClient {
return index;
}
- /**
- * Creates a schema with the name {@link JdbcClient#schemaName}
- *
- * @throws SpRuntimeException If the {@link JdbcClient#schemaName} is not allowed, if executeUpdate throws an SQLException
- */
- protected void createSchema() throws SpRuntimeException {
- checkConnected();
- checkRegEx(tableName, "Tablename");
-
- StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
- statement.append(schemaName).append(";");
- try {
- st.executeUpdate(statement.toString());
- } catch (SQLException e) {
- throw new SpRuntimeException(e.getMessage());
- }
- }
-
-
/**
* Creates a table with the name {@link JdbcClient#tableName} and the
* properties {@link JdbcClient#eventProperties}. Calls
@@ -524,35 +436,14 @@ public class JdbcClient {
checkConnected();
checkRegEx(tableName, "Tablename");
- if (isToDropTable){
- StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
- statement.append(schemaName);
- statement.append(".");
- statement.append(tableName);
- statement.append(";");
-
- try {
- st.executeUpdate(statement.toString());
- } catch (SQLException e) {
- throw new SpRuntimeException(e.getMessage());
- }
- }
-
- StringBuilder statement = new StringBuilder("CREATE TABLE ");
- statement.append(schemaName);
- statement.append(".");
- statement.append(tableName).append(" ( ");
+ StringBuilder statement = new StringBuilder("CREATE TABLE \"");
+ statement.append(tableName).append("\" ( ");
statement.append(extractEventProperties(eventProperties)).append(" );");
try {
st.executeUpdate(statement.toString());
} catch (SQLException e) {
- e.getErrorCode();
- if (e.getSQLState().equals("42P07")) {
- throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
- } else {
- throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
- }
+ throw new SpRuntimeException(e.getMessage());
}
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 2f0b401..c8e52aa 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -21,11 +21,10 @@ package org.apache.streampipes.sinks.databases.jvm.postgresql;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
-public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParameters> {
+public class PostgreSql extends PostgresJdbcClient implements EventSink<PostgreSqlParameters> {
private static Logger LOG;
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
similarity index 93%
copy from streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
copy to streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index 50a4e8a..fccc723 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
+package org.apache.streampipes.sinks.databases.jvm.postgresql;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map;
-public class JdbcClient {
+public class PostgresJdbcClient {
private String allowedRegEx;
protected String tableName;
@@ -170,7 +170,7 @@ public class JdbcClient {
}
- public JdbcClient() {
+ public PostgresJdbcClient() {
}
protected void initializeJdbc(List<EventProperty> eventProperties,
@@ -205,8 +205,8 @@ public class JdbcClient {
/**
- * Connects to the HadoopFileSystem Server and initilizes {@link JdbcClient#c} and
- * {@link JdbcClient#st}
+ * Connects to the HadoopFileSystem Server and initilizes {@link PostgresJdbcClient#c} and
+ * {@link PostgresJdbcClient#st}
*
* @throws SpRuntimeException When the connection could not be established (because of a
* wrong identification, missing database etc.)
@@ -256,7 +256,7 @@ public class JdbcClient {
}
/**
- * If this method returns successfully a schema with the name in {@link JdbcClient#schemaName} exists in the database
+ * If this method returns successfully a schema with the name in {@link PostgresJdbcClient#schemaName} exists in the database
* with the given database name exists on the server, specified by the url.
*
* @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
@@ -293,7 +293,7 @@ public class JdbcClient {
/**
- * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
+ * If this method returns successfully a table with the name in {@link PostgresJdbcClient#tableName} exists in the database
* with the given database name exists on the server, specified by the url.
*
* @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
@@ -327,7 +327,7 @@ public class JdbcClient {
/**
* Clears, fills and executes the saved prepared statement {@code ps} with the data found in
- * event. To fill in the values it calls {@link JdbcClient#fillPreparedStatement(Map)}.
+ * event. To fill in the values it calls {@link PostgresJdbcClient#fillPreparedStatement(Map)}.
*
* @param event Data to be saved in the SQL table
* @throws SQLException When the statement cannot be executed
@@ -397,9 +397,9 @@ public class JdbcClient {
}
/**
- * Fills a prepared statement with the actual values base on {@link JdbcClient#parameters}. If
- * {@link JdbcClient#parameters} is empty or not complete (which should only happen once in the
- * begining), it calls {@link JdbcClient#generatePreparedStatement(Map)} to generate a new one.
+ * Fills a prepared statement with the actual values base on {@link PostgresJdbcClient#parameters}. If
+ * {@link PostgresJdbcClient#parameters} is empty or not complete (which should only happen once in the
+ * begining), it calls {@link PostgresJdbcClient#generatePreparedStatement(Map)} to generate a new one.
*
* @param event
* @param pre
@@ -427,7 +427,7 @@ public class JdbcClient {
}
/**
- * Initializes the variables {@link JdbcClient#parameters} and {@link JdbcClient#ps}
+ * Initializes the variables {@link PostgresJdbcClient#parameters} and {@link PostgresJdbcClient#ps}
* according to the parameter event.
*
* @param event The event which is getting analyzed
@@ -492,9 +492,9 @@ public class JdbcClient {
}
/**
- * Creates a schema with the name {@link JdbcClient#schemaName}
+ * Creates a schema with the name {@link PostgresJdbcClient#schemaName}
*
- * @throws SpRuntimeException If the {@link JdbcClient#schemaName} is not allowed, if executeUpdate throws an SQLException
+ * @throws SpRuntimeException If the {@link PostgresJdbcClient#schemaName} is not allowed, if executeUpdate throws an SQLException
*/
protected void createSchema() throws SpRuntimeException {
checkConnected();
@@ -511,13 +511,13 @@ public class JdbcClient {
/**
- * Creates a table with the name {@link JdbcClient#tableName} and the
- * properties {@link JdbcClient#eventProperties}. Calls
- * {@link JdbcClient#extractEventProperties(List)} internally with the
- * {@link JdbcClient#eventProperties} to extract all possible columns.
+ * Creates a table with the name {@link PostgresJdbcClient#tableName} and the
+ * properties {@link PostgresJdbcClient#eventProperties}. Calls
+ * {@link PostgresJdbcClient#extractEventProperties(List)} internally with the
+ * {@link PostgresJdbcClient#eventProperties} to extract all possible columns.
*
- * @throws SpRuntimeException If the {@link JdbcClient#tableName} is not allowed, if
- * executeUpdate throws an SQLException or if {@link JdbcClient#extractEventProperties(List)}
+ * @throws SpRuntimeException If the {@link PostgresJdbcClient#tableName} is not allowed, if
+ * executeUpdate throws an SQLException or if {@link PostgresJdbcClient#extractEventProperties(List)}
* throws an exception
*/
protected void createTable() throws SpRuntimeException {
@@ -558,11 +558,11 @@ public class JdbcClient {
/**
* Creates a SQL-Query with the given Properties (SQL-Injection safe). Calls
- * {@link JdbcClient#extractEventProperties(List, String)} with an empty string
+ * {@link PostgresJdbcClient#extractEventProperties(List, String)} with an empty string
*
* @param properties The list of properties which should be included in the query
* @return A StringBuilder with the query which needs to be executed in order to create the table
- * @throws SpRuntimeException See {@link JdbcClient#extractEventProperties(List, String)} for details
+ * @throws SpRuntimeException See {@link PostgresJdbcClient#extractEventProperties(List, String)} for details
*/
private StringBuilder extractEventProperties(List<EventProperty> properties)
throws SpRuntimeException {
@@ -621,7 +621,7 @@ public class JdbcClient {
*
* @param input String which is getting matched with the regEx
* @param regExIdentifier Information about the use of the input. Gets included in the exception message
- * @throws SpRuntimeException If {@code input} does not match with {@link JdbcClient#allowedRegEx}
+ * @throws SpRuntimeException If {@code input} does not match with {@link PostgresJdbcClient#allowedRegEx}
* or if the length of {@code input} is 0
*/
protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {