You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/06 15:53:19 UTC
[incubator-streampipes-extensions] 01/01: jdbc client for extended,
added Postgresql sink and PostgresjdbcClient
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 9e279c78b88daa908704cd62f6c5d913635c15e5
Author: micklich <fl...@disy.net>
AuthorDate: Sat Jun 6 17:52:56 2020 +0200
jdbc client for extended, added Postgresql sink and PostgresjdbcClient
---
notes.txt | 117 +++
.../sinks/databases/jvm/iotdb/IotDb.java | 18 +-
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 929 +++++++++------------
.../databases/jvm/jdbcclient/Parameterinfo.java | 33 +
.../databases/jvm/jdbcclient/SqlAttribute.java | 124 +++
.../sinks/databases/jvm/mysql/Mysql.java | 8 +-
.../sinks/databases/jvm/postgresql/PostgreSql.java | 7 +-
.../jvm/postgresql/PostgreSqlController.java | 24 +-
.../jvm/postgresql/PostgreSqlParameters.java | 10 +-
.../jvm/postgresql/PostgresJdbcClient.java | 367 ++++++++
10 files changed, 1100 insertions(+), 537 deletions(-)
diff --git a/notes.txt b/notes.txt
new file mode 100644
index 0000000..8d29bc5
--- /dev/null
+++ b/notes.txt
@@ -0,0 +1,117 @@
+ //========== Testing
+
+ Double lat_ka = 49.00689;
+ Double lng_ka = 8.403653;
+
+ Double lat_NY = 40.730610;
+ Double lng_NY = -73.935242;
+
+ SpLengthCalculator tester1 = new SpLengthCalculator(10);
+
+ tester1.calcGeodesicDistance(lat_ka, lng_ka, lat_NY, lng_NY);
+ tester1.convertUnit(SpLengthCalculator.ValidLengthUnits.KM);
+
+
+
+ double result_ka_ny = tester1.getLengthValue();
+
+ Double lat_ra = 48.8591174;
+ Double lng_ra = 8.2059096;
+
+ SpLengthCalculator tester2 = new SpLengthCalculator(10);
+
+ tester2.calcGeodesicDistance(lat_ka, lng_ka, lat_ra, lng_ra);
+ tester2.convertUnit(SpLengthCalculator.ValidLengthUnits.KM);
+
+ double result_ka_ra = tester2.getLengthValue();
+
+
+
+
+
+
+
+
+// private boolean createInfoGeofenceTable(String url, String databaseName) throws SpRuntimeException {
+// boolean returnValue = false;
+//
+// try {
+// // Database should exist by now so we can establish a connection
+// c = DriverManager.getConnection(url + databaseName, user, password);
+// st = c.createStatement();
+// ResultSet rs = c.getMetaData().getTables(null, null, geofenceInfoTable, null);
+// while (rs.next()) {
+// // same table names can exists in different schmemas
+// if (!rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+//
+// StringBuilder statement = new StringBuilder("CREATE TABLE ");
+// statement.append(schemaName);
+// statement.append(".");
+// statement.append(geofenceInfoTable).append(" ( ");
+// statement.append("name TEXT NOT NULL UNIQUE,");
+// statement.append("geom GEOMETRY,");
+// statement.append("FOREIGN KEY (name) REFERENCES )");
+// statement.append(schema + "." + geofenceMainTable);
+// statement.append("(name) ON UPDATE CASCADE ON DELETE CASCADE");
+// statement.append(" );");
+//
+// try {
+// st.executeUpdate(statement.toString());
+// returnValue = true;
+// } catch (SQLException e) {
+// e.getErrorCode();
+// if (e.getSQLState().equals("42P07")) {
+// throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+// }
+// }
+// }
+// }
+// } catch (SQLException e) {
+// closeAll();
+// throw new SpRuntimeException(e.getMessage());
+// }
+// return returnValue;
+// }
+
+
+// @Override
+// protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
+// try {
+// // Database should exist by now so we can establish a connection
+// c = DriverManager.getConnection(url + databaseName, user, password);
+// st = c.createStatement();
+// ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+// while (rs.next()) {
+// // same table names can exists in different schmemas
+// if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+// if (isToDropTable) {
+// createTable();
+// }
+// validateTable();
+// } else {
+// createTable();
+// }
+// }
+// tableExists = true;
+// rs.close();
+// } catch (SQLException e) {
+// closeAll();
+// throw new SpRuntimeException(e.getMessage());
+// }
+// }
+
+
+
+
+
+
+
+INSERT INTO geofence.geofences (created_at, geofencename) VALUES ('2020-06-01T11:09:09.456' , 'blubber');
+
+
+CREATE TABLE geofence.geofences ( id SERIAL PRIMARY KEY,created_at TIMESTAMP,updated_at TIMESTAMP,geofencename TEXT NOT NULL UNIQUEgeom GEOMETRY);
+
+
+UPDATE geofence.geofences SET updated_at = 2020-06-01T11:46:14.038,geom = ST_GeomFromText('POINT (11.7689 -21.3234)' ,4326)WHERE name = blubber;
+
+UPDATE geofence.geofences SET updated_at = '2020-06-01T11:58:21.276',geom = ST_GeomFromText('POINT (39.3197 15.4693)' ,4326) WHERE geofencename = blubber;
\ No newline at end of file
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 177270a..a21e672 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -24,6 +24,8 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.Parameterinfo;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.SqlAttribute;
import org.apache.streampipes.vocabulary.XSD;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -110,27 +112,15 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
}
}
- @Override
- protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
- checkRegEx(tableName, "Storage Group name");
- try {
- Statement statement = c.createStatement();
- statement.execute("SET STORAGE GROUP TO " + tableName);
- } catch (SQLException e) {
- // Storage group already exists
- //TODO: Catch other exceptions
- }
- }
+
/**
* Needs to be reimplemented since the IoTDB JDBC implementation does not support the methods used in the
* JDBC-Client class
- *
- * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
* @throws SpRuntimeException
*/
@Override
- protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
+ protected void ensureTableExists() throws SpRuntimeException {
int index = 1;
parameters.put("timestamp", new Parameterinfo(index++, SqlAttribute.LONG));
for (EventProperty eventProperty : eventProperties) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 71a357f..ee67bac 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -33,543 +33,446 @@ import java.util.Map;
public class JdbcClient {
- private String allowedRegEx;
-
- protected String tableName;
- protected String user;
- protected String password;
-
- protected boolean tableExists = false;
-
- protected Logger logger;
-
- protected Connection c = null;
- protected Statement st = null;
- protected PreparedStatement ps = null;
-
- /**
- * The list of properties extracted from the graph
- */
- protected List<EventProperty> eventProperties;
- /**
- * The parameters in the prepared statement {@code ps} together with their index and data type
- */
- protected HashMap<String, Parameterinfo> parameters = new HashMap<>();
-
- /**
- * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
- * If no matching type is found, it is interpreted as a String (VARCHAR(255))
- */
- protected enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
- BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
- private final String sqlName;
-
- SqlAttribute(String s) {
- sqlName = s;
- }
-
- /**
- * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
- * interpreted as a String (VARCHAR(255))
- *
- * @param o The object which should be identified
- * @return An {@link SqlAttribute} of the identified type
- */
- public static SqlAttribute getFromObject(final Object o) {
- SqlAttribute r;
- if (o instanceof Integer) {
- r = SqlAttribute.INTEGER;
- } else if (o instanceof Long) {
- r = SqlAttribute.LONG;
- } else if (o instanceof Float) {
- r = SqlAttribute.FLOAT;
- } else if (o instanceof Double) {
- r = SqlAttribute.DOUBLE;
- } else if (o instanceof Boolean) {
- r = SqlAttribute.BOOLEAN;
- } else {
- r = SqlAttribute.STRING;
- }
- return r;
- }
-
- public static SqlAttribute getFromUri(final String s) {
- SqlAttribute r;
- if (s.equals(XSD._integer.toString())) {
- r = SqlAttribute.INTEGER;
- } else if (s.equals(XSD._long.toString())) {
- r = SqlAttribute.LONG;
- } else if (s.equals(XSD._float.toString())) {
- r = SqlAttribute.FLOAT;
- } else if (s.equals(XSD._double.toString())) {
- r = SqlAttribute.DOUBLE;
- } else if (s.equals(XSD._boolean.toString())) {
- r = SqlAttribute.BOOLEAN;
- } else {
- r = SqlAttribute.STRING;
- }
- return r;
- }
-
- /**
- * Sets the value in the prepardStatement {@code ps}
- *
- * @param p The needed info about the parameter (index and type)
- * @param value The value of the object, which should be filled in the
- * @param ps The prepared statement, which will be filled
- * @throws SpRuntimeException When the data type in {@code p} is unknown
- * @throws SQLException When the setters of the statement throw an
- * exception (e.g. {@code setInt()})
- */
- public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
- throws SQLException, SpRuntimeException {
- switch (p.type) {
- case INTEGER:
- ps.setInt(p.index, (Integer) value);
- break;
- case LONG:
- ps.setLong(p.index, (Long) value);
- break;
- case FLOAT:
- ps.setFloat(p.index, (Float) value);
- break;
- case DOUBLE:
- ps.setDouble(p.index, (Double) value);
- break;
- case BOOLEAN:
- ps.setBoolean(p.index, (Boolean) value);
- break;
- case STRING:
- ps.setString(p.index, value.toString());
- break;
- default:
- throw new SpRuntimeException("Unknown SQL datatype");
- }
- }
-
- @Override
- public String toString() {
- return sqlName;
- }
+ protected String allowedRegEx;
+ protected String url;
+
+ protected String databaseName;
+ protected String tableName;
+ protected String user;
+ protected String password;
+
+ protected boolean tableExists = false;
+
+ protected Logger logger;
+
+ protected Connection c = null;
+ protected Statement st = null;
+ protected PreparedStatement ps = null;
+
+ /**
+ * The list of properties extracted from the graph
+ */
+ protected List<EventProperty> eventProperties;
+ /**
+ * The parameters in the prepared statement {@code ps} together with their index and data type
+ */
+ protected HashMap<String, Parameterinfo> parameters = new HashMap<>();
+
+ /**
+ * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
+ * If no matching type is found, it is interpreted as a String (VARCHAR(255))
+ */
+
+
+ public JdbcClient() {
+ }
+
+ protected void initializeJdbc(List<EventProperty> eventProperties,
+ String host,
+ Integer port,
+ String databaseName,
+ String tableName,
+ String user,
+ String password,
+ String allowedRegEx,
+ String driver,
+ String urlName,
+ Logger logger) throws SpRuntimeException {
+
+ this.eventProperties = eventProperties;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.user = user;
+ this.password = password;
+ this.allowedRegEx = allowedRegEx;
+ this.logger = logger;
+
+ this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+ try {
+ Class.forName(driver);
+ } catch (ClassNotFoundException e) {
+ throw new SpRuntimeException("Driver '" + driver + "' not found.");
}
- /**
- * Contains all information needed to "fill" a prepared statement (index and the data type)
- */
- protected static class Parameterinfo {
- private int index;
- private SqlAttribute type;
-
- public Parameterinfo(final int index, final SqlAttribute type) {
- this.index = index;
- this.type = type;
- }
+ connect();
+ ensureDatabaseExists();
+ ensureTableExists();
+ }
+
+ /**
+ * Connects to the HadoopFileSystem Server and initilizes {@link JdbcClient#c} and
+ * {@link JdbcClient#st}
+ *
+ * @throws SpRuntimeException When the connection could not be established (because of a
+ * wrong identification, missing database etc.)
+ */
+ protected void connect() throws SpRuntimeException {
+ try {
+ c = DriverManager.getConnection(url, user, password);
+ } catch (SQLException e) {
+ // host or port is wrong -- Class 08 Connection Exception
+ if (e.getSQLState().substring(0, 2).equals("08")) {
+ throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
+ }
+ // username or password is wrong -- Class 28 Invalid Authorization Specification
+ else if (e.getSQLState().substring(0, 2).equals("28")) {
+ throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+ } else {
+ throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+ }
}
-
-
- public JdbcClient() {
+ }
+
+ /**
+ * If this method returns successfully a database with the given name exists on the server, specified by the url.
+ *
+ * @throws SpRuntimeException If the database does not exists and could not be created
+ */
+ protected void ensureDatabaseExists() throws SpRuntimeException {
+ checkRegEx(databaseName, "databasename");
+
+ try {
+ // Checks whether the database already exists (using catalogs has not worked with postgres)
+ st = c.createStatement();
+ st.executeUpdate("CREATE DATABASE " + databaseName + ";");
+ logger.info("Created new database '" + databaseName + "'");
+ } catch (SQLException e1) {
+ if (!e1.getSQLState().substring(0, 2).equals("42")) {
+ throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
+ }
}
-
- protected void initializeJdbc(List<EventProperty> eventProperties,
- String host,
- Integer port,
- String databaseName,
- String tableName,
- String user,
- String password,
- String allowedRegEx,
- String driver,
- String urlName,
- Logger logger) throws SpRuntimeException {
- this.tableName = tableName;
- this.user = user;
- this.password = password;
- this.allowedRegEx = allowedRegEx;
- this.logger = logger;
- this.eventProperties = eventProperties;
- try {
- Class.forName(driver);
- } catch (ClassNotFoundException e) {
- throw new SpRuntimeException("Driver '" + driver + "' not found.");
- }
-
- connect(host, port, urlName, databaseName);
+ closeAll();
+ }
+
+ /**
+ * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
+ * with the given database name exists on the server, specified by the url.
+ *
+ * @throws SpRuntimeException If the table does not exist and could not be created
+ */
+ protected void ensureTableExists() throws SpRuntimeException {
+ try {
+ // Database should exist by now so we can establish a connection
+ c = DriverManager.getConnection(url + databaseName, user, password);
+ st = c.createStatement();
+ ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+ if (rs.next()) {
+ validateTable();
+ } else {
+ createTable();
+ }
+ tableExists = true;
+ rs.close();
+ } catch (SQLException e) {
+ closeAll();
+ throw new SpRuntimeException(e.getMessage());
}
-
-
- /**
- * Connects to the HadoopFileSystem Server and initilizes {@link JdbcClient#c} and
- * {@link JdbcClient#st}
- *
- * @throws SpRuntimeException When the connection could not be established (because of a
- * wrong identification, missing database etc.)
- */
- private void connect(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
- String url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
- try {
- c = DriverManager.getConnection(url, user, password);
- ensureDatabaseExists(url, databaseName);
- ensureTableExists(url, databaseName);
- } catch (SQLException e) {
- throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
- }
+ }
+
+ /**
+ * Clears, fills and executes the saved prepared statement {@code ps} with the data found in
+ * event. To fill in the values it calls {@link JdbcClient#fillPreparedStatement(Map)}.
+ *
+ * @param event Data to be saved in the SQL table
+ * @throws SQLException When the statement cannot be executed
+ * @throws SpRuntimeException When the table name is not allowed or it is thrown
+ * by {@link SqlAttribute#setValue(Parameterinfo, Object, PreparedStatement)}
+ */
+ protected void executePreparedStatement(final Map<String, Object> event)
+ throws SQLException, SpRuntimeException {
+ checkConnected();
+ if (ps != null) {
+ ps.clearParameters();
}
-
- /**
- * If this method returns successfully a database with the given name exists on the server, specified by the url.
- *
- * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
- * @param databaseName The name of the database that should exist
- * @throws SpRuntimeException If the database does not exists and could not be created
- */
- protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
- checkRegEx(databaseName, "databasename");
-
- try {
- // Checks whether the database already exists (using catalogs has not worked with postgres)
- st = c.createStatement();
- st.executeUpdate("CREATE DATABASE " + databaseName + ";");
- logger.info("Created new database '" + databaseName + "'");
- } catch (SQLException e1) {
- if (!e1.getSQLState().substring(0, 2).equals("42")) {
- throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
- }
- }
- closeAll();
- }
-
- /**
- * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
- * with the given database name exists on the server, specified by the url.
- *
- * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
- * @param databaseName The database in which the table should exist
- * @throws SpRuntimeException If the table does not exist and could not be created
- */
- protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
- try {
- // Database should exist by now so we can establish a connection
- c = DriverManager.getConnection(url + databaseName, user, password);
- st = c.createStatement();
- ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
- if (rs.next()) {
- validateTable();
- } else {
- createTable();
- }
- tableExists = true;
- rs.close();
- } catch (SQLException e) {
- closeAll();
- throw new SpRuntimeException(e.getMessage());
- }
- }
-
- /**
- * Clears, fills and executes the saved prepared statement {@code ps} with the data found in
- * event. To fill in the values it calls {@link JdbcClient#fillPreparedStatement(Map)}.
- *
- * @param event Data to be saved in the SQL table
- * @throws SQLException When the statement cannot be executed
- * @throws SpRuntimeException When the table name is not allowed or it is thrown
- * by {@link SqlAttribute#setValue(Parameterinfo, Object, PreparedStatement)}
- */
- private void executePreparedStatement(final Map<String, Object> event)
- throws SQLException, SpRuntimeException {
- checkConnected();
- if (ps != null) {
- ps.clearParameters();
- }
- fillPreparedStatement(event);
- ps.executeUpdate();
+ fillPreparedStatement(event);
+ ps.executeUpdate();
+ }
+
+ /**
+ * Prepares a statement for the insertion of values or the
+ *
+ * @param event The event which should be saved to the Postgres table
+ * @throws SpRuntimeException When there was an error in the saving process
+ */
+ protected void save(final Event event) throws SpRuntimeException {
+ //TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
+ checkConnected();
+ Map<String, Object> eventMap = event.getRaw();
+ if (event == null) {
+ throw new SpRuntimeException("event is null");
}
-
- /**
- * Prepares a statement for the insertion of values or the
- *
- * @param event The event which should be saved to the Postgres table
- * @throws SpRuntimeException When there was an error in the saving process
- */
- protected void save(final Event event) throws SpRuntimeException {
- //TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
- checkConnected();
- Map<String, Object> eventMap = event.getRaw();
- if (event == null) {
- throw new SpRuntimeException("event is null");
- }
- if (!tableExists) {
- // Creates the table
- createTable();
- tableExists = true;
- }
- try {
- executePreparedStatement(eventMap);
- } catch (SQLException e) {
- if (e.getSQLState().substring(0, 2).equals("42")) {
- // If the table does not exists (because it got deleted or something, will cause the error
- // code "42") we will try to create a new one. Otherwise we do not handle the exception.
- logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
- tableExists = false;
- createTable();
- tableExists = true;
-
- try {
- executePreparedStatement(eventMap);
- } catch (SQLException e1) {
- throw new SpRuntimeException(e1.getMessage());
- }
- } else {
- throw new SpRuntimeException(e.getMessage());
- }
- }
- }
-
- private void fillPreparedStatement(final Map<String, Object> event)
- throws SQLException, SpRuntimeException {
- fillPreparedStatement(event, "");
+ if (!tableExists) {
+ // Creates the table
+ createTable();
+ tableExists = true;
}
+ try {
+ executePreparedStatement(eventMap);
+ } catch (SQLException e) {
+ if (e.getSQLState().substring(0, 2).equals("42")) {
+ // If the table does not exists (because it got deleted or something, will cause the error
+ // code "42") we will try to create a new one. Otherwise we do not handle the exception.
+ logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
+ tableExists = false;
+ createTable();
+ tableExists = true;
- /**
- * Fills a prepared statement with the actual values base on {@link JdbcClient#parameters}. If
- * {@link JdbcClient#parameters} is empty or not complete (which should only happen once in the
- * begining), it calls {@link JdbcClient#generatePreparedStatement(Map)} to generate a new one.
- *
- * @param event
- * @param pre
- * @throws SQLException
- * @throws SpRuntimeException
- */
- private void fillPreparedStatement(final Map<String, Object> event, String pre)
- throws SQLException, SpRuntimeException {
- // checkConnected();
- //TODO: Possible error: when the event does not contain all objects of the parameter list
- for (Map.Entry<String, Object> pair : event.entrySet()) {
- String newKey = pre + pair.getKey();
- if (pair.getValue() instanceof Map) {
- // recursively extracts nested values
- fillPreparedStatement((Map<String, Object>) pair.getValue(), newKey + "_");
- } else {
- if (!parameters.containsKey(newKey)) {
- //TODO: start the for loop all over again
- generatePreparedStatement(event);
- }
- Parameterinfo p = parameters.get(newKey);
- SqlAttribute.setValue(p, pair.getValue(), ps);
- }
+ try {
+ executePreparedStatement(eventMap);
+ } catch (SQLException e1) {
+ throw new SpRuntimeException(e1.getMessage());
}
+ } else {
+ throw new SpRuntimeException(e.getMessage());
+ }
}
-
- /**
- * Initializes the variables {@link JdbcClient#parameters} and {@link JdbcClient#ps}
- * according to the parameter event.
- *
- * @param event The event which is getting analyzed
- * @throws SpRuntimeException When the tablename is not allowed
- * @throws SQLException When the prepareStatment cannot be evaluated
- */
- private void generatePreparedStatement(final Map<String, Object> event)
- throws SQLException, SpRuntimeException {
- // input: event
- // wanted: INSERT INTO test4321 ( randomString, randomValue ) VALUES ( ?,? );
- checkConnected();
- parameters.clear();
- StringBuilder statement1 = new StringBuilder("INSERT INTO ");
- StringBuilder statement2 = new StringBuilder("VALUES ( ");
- checkRegEx(tableName, "Tablename");
- statement1.append(tableName).append(" ( ");
-
- // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
- extendPreparedStatement(event, statement1, statement2, 1);
-
- statement1.append(" ) ");
- statement2.append(" );");
- String finalStatement = statement1.append(statement2).toString();
- ps = c.prepareStatement(finalStatement);
+ }
+
+ protected void fillPreparedStatement(final Map<String, Object> event)
+ throws SQLException, SpRuntimeException {
+ fillPreparedStatement(event, "");
+ }
+
+ /**
+ * Fills a prepared statement with the actual values base on {@link JdbcClient#parameters}. If
+ * {@link JdbcClient#parameters} is empty or not complete (which should only happen once in the
+ * begining), it calls {@link JdbcClient#generatePreparedStatement(Map)} to generate a new one.
+ *
+ * @param event
+ * @param pre
+ * @throws SQLException
+ * @throws SpRuntimeException
+ */
+ protected void fillPreparedStatement(final Map<String, Object> event, String pre)
+ throws SQLException, SpRuntimeException {
+ // checkConnected();
+ //TODO: Possible error: when the event does not contain all objects of the parameter list
+ for (Map.Entry<String, Object> pair : event.entrySet()) {
+ String newKey = pre + pair.getKey();
+ if (pair.getValue() instanceof Map) {
+ // recursively extracts nested values
+ fillPreparedStatement((Map<String, Object>) pair.getValue(), newKey + "_");
+ } else {
+ if (!parameters.containsKey(newKey)) {
+ //TODO: start the for loop all over again
+ generatePreparedStatement(event);
+ }
+ Parameterinfo p = parameters.get(newKey);
+ SqlAttribute.setValue(p, pair.getValue(), ps);
+ }
}
-
- private int extendPreparedStatement(final Map<String, Object> event,
+ }
+
+ /**
+ * Initializes the variables {@link JdbcClient#parameters} and {@link JdbcClient#ps}
+ * according to the parameter event.
+ *
+ * @param event The event which is getting analyzed
+ * @throws SpRuntimeException When the tablename is not allowed
+ * @throws SQLException When the prepareStatment cannot be evaluated
+ */
+ protected void generatePreparedStatement(final Map<String, Object> event)
+ throws SQLException, SpRuntimeException {
+ // input: event
+ // wanted: INSERT INTO test4321 ( randomString, randomValue ) VALUES ( ?,? );
+ checkConnected();
+ parameters.clear();
+ StringBuilder statement1 = new StringBuilder("INSERT INTO ");
+ StringBuilder statement2 = new StringBuilder("VALUES ( ");
+ checkRegEx(tableName, "Tablename");
+ statement1.append(tableName).append(" ( ");
+
+ // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
+ extendPreparedStatement(event, statement1, statement2, 1);
+
+ statement1.append(" ) ");
+ statement2.append(" );");
+ String finalStatement = statement1.append(statement2).toString();
+ ps = c.prepareStatement(finalStatement);
+ }
+
+ protected int extendPreparedStatement(final Map<String, Object> event,
StringBuilder s1, StringBuilder s2, int index) throws SpRuntimeException {
- return extendPreparedStatement(event, s1, s2, index, "", "");
- }
-
- /**
- * @param event
- * @param s1
- * @param s2
- * @param index
- * @param preProperty
- * @param pre
- * @return
- * @throws SpRuntimeException
- */
- private int extendPreparedStatement(final Map<String, Object> event,
+ return extendPreparedStatement(event, s1, s2, index, "", "");
+ }
+
+ /**
+ * @param event
+ * @param s1
+ * @param s2
+ * @param index
+ * @param preProperty
+ * @param pre
+ * @return
+ * @throws SpRuntimeException
+ */
+ protected int extendPreparedStatement(final Map<String, Object> event,
StringBuilder s1, StringBuilder s2, int index, String preProperty, String pre)
- throws SpRuntimeException {
- checkConnected();
- for (Map.Entry<String, Object> pair : event.entrySet()) {
- if (pair.getValue() instanceof Map) {
- index = extendPreparedStatement((Map<String, Object>) pair.getValue(), s1, s2, index,
- pair.getKey() + "_", pre);
- } else {
- checkRegEx(pair.getKey(), "Columnname");
- parameters.put(pair.getKey(), new Parameterinfo(index, SqlAttribute.getFromObject(pair.getValue())));
- s1.append(pre).append("\"").append(preProperty).append(pair.getKey()).append("\"");
- s2.append(pre).append("?");
- index++;
- }
- pre = ", ";
- }
- return index;
+ throws SpRuntimeException {
+ checkConnected();
+ for (Map.Entry<String, Object> pair : event.entrySet()) {
+ if (pair.getValue() instanceof Map) {
+ index = extendPreparedStatement((Map<String, Object>) pair.getValue(), s1, s2, index,
+ pair.getKey() + "_", pre);
+ } else {
+ checkRegEx(pair.getKey(), "Columnname");
+ parameters.put(pair.getKey(), new Parameterinfo(index, SqlAttribute.getFromObject(pair.getValue())));
+ s1.append(pre).append("\"").append(preProperty).append(pair.getKey()).append("\"");
+ s2.append(pre).append("?");
+ index++;
+ }
+ pre = ", ";
}
-
- /**
- * Creates a table with the name {@link JdbcClient#tableName} and the
- * properties {@link JdbcClient#eventProperties}. Calls
- * {@link JdbcClient#extractEventProperties(List)} internally with the
- * {@link JdbcClient#eventProperties} to extract all possible columns.
- *
- * @throws SpRuntimeException If the {@link JdbcClient#tableName} is not allowed, if
- * executeUpdate throws an SQLException or if {@link JdbcClient#extractEventProperties(List)}
- * throws an exception
- */
- protected void createTable() throws SpRuntimeException {
- checkConnected();
- checkRegEx(tableName, "Tablename");
-
- StringBuilder statement = new StringBuilder("CREATE TABLE \"");
- statement.append(tableName).append("\" ( ");
- statement.append(extractEventProperties(eventProperties)).append(" );");
-
- try {
- st.executeUpdate(statement.toString());
- } catch (SQLException e) {
- throw new SpRuntimeException(e.getMessage());
+ return index;
+ }
+
+ /**
+ * Creates a table with the name {@link JdbcClient#tableName} and the
+ * properties {@link JdbcClient#eventProperties}. Calls
+ * {@link JdbcClient#extractEventProperties(List)} internally with the
+ * {@link JdbcClient#eventProperties} to extract all possible columns.
+ *
+ * @throws SpRuntimeException If the {@link JdbcClient#tableName} is not allowed, if
+ * executeUpdate throws an SQLException or if {@link JdbcClient#extractEventProperties(List)}
+ * throws an exception
+ */
+ protected void createTable() throws SpRuntimeException {
+ checkConnected();
+ checkRegEx(tableName, "Tablename");
+
+ StringBuilder statement = new StringBuilder("CREATE TABLE \"");
+ statement.append(tableName).append("\" ( ");
+ statement.append(extractEventProperties(eventProperties)).append(" );");
+
+ try {
+ st.executeUpdate(statement.toString());
+ } catch (SQLException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+ /**
+ * Creates a SQL-Query with the given Properties (SQL-Injection safe). Calls
+ * {@link JdbcClient#extractEventProperties(List, String)} with an empty string
+ *
+ * @param properties The list of properties which should be included in the query
+ * @return A StringBuilder with the query which needs to be executed in order to create the table
+ * @throws SpRuntimeException See {@link JdbcClient#extractEventProperties(List, String)} for details
+ */
+ protected StringBuilder extractEventProperties(List<EventProperty> properties)
+ throws SpRuntimeException {
+ return extractEventProperties(properties, "");
+ }
+
+ /**
+ * Creates a SQL-Query with the given Properties (SQL-Injection safe). For nested properties it
+ * recursively extracts the information. EventPropertyList are getting converted to a string (so
+ * in SQL to a VARCHAR(255)). For each type it uses {@link SqlAttribute#getFromUri(String)}
+ * internally to identify the SQL-type from the runtimeType.
+ *
+ * @param properties The list of properties which should be included in the query
+ * @param preProperty A string which gets prepended to all property runtimeNames
+ * @return A StringBuilder with the query which needs to be executed in order to create the table
+ * @throws SpRuntimeException If the runtimeName of any property is not allowed
+ */
+ protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+ throws SpRuntimeException {
+ // output: "randomString VARCHAR(255), randomValue INT"
+ StringBuilder s = new StringBuilder();
+ String pre = "";
+ for (EventProperty property : properties) {
+ // Protection against SqlInjection
+
+ checkRegEx(property.getRuntimeName(), "Column name");
+ if (property instanceof EventPropertyNested) {
+ // if it is a nested property, recursively extract the needed properties
+ StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+ preProperty + property.getRuntimeName() + "_");
+ if (tmp.length() > 0) {
+ s.append(pre).append(tmp);
+ }
+ } else {
+ // Adding the name of the property (e.g. "randomString")
+ // Or for properties in a nested structure: input1_randomValue
+ // "pre" is there for the ", " part
+ s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
+
+ // adding the type of the property (e.g. "VARCHAR(255)")
+ if (property instanceof EventPropertyPrimitive) {
+ s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+ } else {
+ // Must be an EventPropertyList then
+ s.append(SqlAttribute.getFromUri(XSD._string.toString()));
}
+ }
+ pre = ", ";
}
- /**
- * Creates a SQL-Query with the given Properties (SQL-Injection safe). Calls
- * {@link JdbcClient#extractEventProperties(List, String)} with an empty string
- *
- * @param properties The list of properties which should be included in the query
- * @return A StringBuilder with the query which needs to be executed in order to create the table
- * @throws SpRuntimeException See {@link JdbcClient#extractEventProperties(List, String)} for details
- */
- private StringBuilder extractEventProperties(List<EventProperty> properties)
- throws SpRuntimeException {
- return extractEventProperties(properties, "");
+ return s;
+ }
+
+ /**
+ * Checks if the input string is allowed (regEx match and length > 0)
+ *
+ * @param input String which is getting matched with the regEx
+ * @param regExIdentifier Information about the use of the input. Gets included in the exception message
+ * @throws SpRuntimeException If {@code input} does not match with {@link JdbcClient#allowedRegEx}
+ * or if the length of {@code input} is 0
+ */
+ protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {
+ if (!input.matches(allowedRegEx) || input.length() == 0) {
+ throw new SpRuntimeException(regExIdentifier + " '" + input
+ + "' not allowed (allowed: '" + allowedRegEx + "') with a min length of 1");
}
+ }
- /**
- * Creates a SQL-Query with the given Properties (SQL-Injection safe). For nested properties it
- * recursively extracts the information. EventPropertyList are getting converted to a string (so
- * in SQL to a VARCHAR(255)). For each type it uses {@link SqlAttribute#getFromUri(String)}
- * internally to identify the SQL-type from the runtimeType.
- *
- * @param properties The list of properties which should be included in the query
- * @param preProperty A string which gets prepended to all property runtimeNames
- * @return A StringBuilder with the query which needs to be executed in order to create the table
- * @throws SpRuntimeException If the runtimeName of any property is not allowed
- */
- private StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
- throws SpRuntimeException {
- // output: "randomString VARCHAR(255), randomValue INT"
- StringBuilder s = new StringBuilder();
- String pre = "";
- for (EventProperty property : properties) {
- // Protection against SqlInjection
-
- checkRegEx(property.getRuntimeName(), "Column name");
- if (property instanceof EventPropertyNested) {
- // if it is a nested property, recursively extract the needed properties
- StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
- preProperty + property.getRuntimeName() + "_");
- if (tmp.length() > 0) {
- s.append(pre).append(tmp);
- }
- } else {
- // Adding the name of the property (e.g. "randomString")
- // Or for properties in a nested structure: input1_randomValue
- // "pre" is there for the ", " part
- s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
-
- // adding the type of the property (e.g. "VARCHAR(255)")
- if (property instanceof EventPropertyPrimitive) {
- s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
- } else {
- // Must be an EventPropertyList then
- s.append(SqlAttribute.getFromUri(XSD._string.toString()));
- }
- }
- pre = ", ";
- }
-
- return s;
+ protected void validateTable() throws SpRuntimeException {
+ //TODO: Add validation of an existing table
+ if (false) {
+ throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
}
-
- /**
- * Checks if the input string is allowed (regEx match and length > 0)
- *
- * @param input String which is getting matched with the regEx
- * @param regExIdentifier Information about the use of the input. Gets included in the exception message
- * @throws SpRuntimeException If {@code input} does not match with {@link JdbcClient#allowedRegEx}
- * or if the length of {@code input} is 0
- */
- protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {
- if (!input.matches(allowedRegEx) || input.length() == 0) {
- throw new SpRuntimeException(regExIdentifier + " '" + input
- + "' not allowed (allowed: '" + allowedRegEx + "') with a min length of 1");
- }
+ }
+
+ /**
+ * Closes all open connections and statements of JDBC
+ */
+ protected void closeAll() {
+ boolean error = false;
+ try {
+ if (st != null) {
+ st.close();
+ st = null;
+ }
+ } catch (SQLException e) {
+ error = true;
+ logger.warn("Exception when closing the statement: " + e.getMessage());
}
-
- protected void validateTable() throws SpRuntimeException {
- //TODO: Add validation of an existing table
- if (false) {
- throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
- }
+ try {
+ if (c != null) {
+ c.close();
+ c = null;
+ }
+ } catch (SQLException e) {
+ error = true;
+ logger.warn("Exception when closing the connection: " + e.getMessage());
}
-
- /**
- * Closes all open connections and statements of JDBC
- */
- protected void closeAll() {
- boolean error = false;
- try {
- if (st != null) {
- st.close();
- st = null;
- }
- } catch (SQLException e) {
- error = true;
- logger.warn("Exception when closing the statement: " + e.getMessage());
- }
- try {
- if (c != null) {
- c.close();
- c = null;
- }
- } catch (SQLException e) {
- error = true;
- logger.warn("Exception when closing the connection: " + e.getMessage());
- }
- try {
- if (ps != null) {
- ps.close();
- ps = null;
- }
- } catch (SQLException e) {
- error = true;
- logger.warn("Exception when closing the prepared statement: " + e.getMessage());
- }
- if (!error) {
- logger.info("Shutdown all connections successfully.");
- }
+ try {
+ if (ps != null) {
+ ps.close();
+ ps = null;
+ }
+ } catch (SQLException e) {
+ error = true;
+ logger.warn("Exception when closing the prepared statement: " + e.getMessage());
+ }
+ if (!error) {
+ logger.info("Shutdown all connections successfully.");
}
+ }
- protected void checkConnected() throws SpRuntimeException {
- if (c == null) {
- throw new SpRuntimeException("Connection is not established.");
- }
+ protected void checkConnected() throws SpRuntimeException {
+ if (c == null) {
+ throw new SpRuntimeException("Connection is not established.");
}
+ }
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/Parameterinfo.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/Parameterinfo.java
new file mode 100644
index 0000000..74fc329
--- /dev/null
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/Parameterinfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
+
+/**
+ * Contains all information needed to "fill" a prepared statement (index and the data type)
+ */
+public class Parameterinfo {
+ protected int index;
+ protected SqlAttribute type;
+
+ public Parameterinfo(final int index, final SqlAttribute type) {
+ this.index = index;
+ this.type = type;
+ }
+}
+
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
new file mode 100644
index 0000000..88f0059
--- /dev/null
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+
+/**
+ * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
+ * If no matching type is found, it is interpreted as a String (VARCHAR(255))
+ */
+public enum SqlAttribute {
+ INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
+ BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
+ private final String sqlName;
+
+ SqlAttribute(String s) {
+ sqlName = s;
+ }
+
+ /**
+ * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
+ * interpreted as a String (VARCHAR(255))
+ *
+ * @param o The object which should be identified
+ * @return An {@link SqlAttribute} of the identified type
+ */
+ public static SqlAttribute getFromObject(final Object o) {
+ SqlAttribute r;
+ if (o instanceof Integer) {
+ r = SqlAttribute.INTEGER;
+ } else if (o instanceof Long) {
+ r = SqlAttribute.LONG;
+ } else if (o instanceof Float) {
+ r = SqlAttribute.FLOAT;
+ } else if (o instanceof Double) {
+ r = SqlAttribute.DOUBLE;
+ } else if (o instanceof Boolean) {
+ r = SqlAttribute.BOOLEAN;
+ } else {
+ r = SqlAttribute.STRING;
+ }
+ return r;
+ }
+
+ public static SqlAttribute getFromUri(final String s) {
+ SqlAttribute r;
+ if (s.equals(XSD._integer.toString())) {
+ r = SqlAttribute.INTEGER;
+ } else if (s.equals(XSD._long.toString())) {
+ r = SqlAttribute.LONG;
+ } else if (s.equals(XSD._float.toString())) {
+ r = SqlAttribute.FLOAT;
+ } else if (s.equals(XSD._double.toString())) {
+ r = SqlAttribute.DOUBLE;
+ } else if (s.equals(XSD._boolean.toString())) {
+ r = SqlAttribute.BOOLEAN;
+ } else {
+ r = SqlAttribute.STRING;
+ }
+ return r;
+ }
+
+ /**
+ * Sets the value in the prepardStatement {@code ps}
+ *
+ * @param p The needed info about the parameter (index and type)
+ * @param value The value of the object, which should be filled in the
+ * @param ps The prepared statement, which will be filled
+ * @throws SpRuntimeException When the data type in {@code p} is unknown
+ * @throws SQLException When the setters of the statement throw an
+ * exception (e.g. {@code setInt()})
+ */
+ public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
+ throws SQLException, SpRuntimeException {
+ switch (p.type) {
+ case INTEGER:
+ ps.setInt(p.index, (Integer) value);
+ break;
+ case LONG:
+ ps.setLong(p.index, (Long) value);
+ break;
+ case FLOAT:
+ ps.setFloat(p.index, (Float) value);
+ break;
+ case DOUBLE:
+ ps.setDouble(p.index, (Double) value);
+ break;
+ case BOOLEAN:
+ ps.setBoolean(p.index, (Boolean) value);
+ break;
+ case STRING:
+ ps.setString(p.index, value.toString());
+ break;
+ default:
+ throw new SpRuntimeException("Unknown SQL datatype");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return sqlName;
+ }
+}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 7009970..95c3538 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.SqlAttribute;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.vocabulary.XSD;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -32,8 +33,6 @@ import org.apache.streampipes.model.schema.EventProperty;
import java.sql.*;
import java.util.*;
-import java.util.Objects;
-
public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
@@ -80,7 +79,6 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
}
- @Override
protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
checkRegEx(databaseName, "databasename");
try {
@@ -218,13 +216,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
}
- private StringBuilder extractEventProperties(List<EventProperty> properties)
+ protected StringBuilder extractEventProperties(List<EventProperty> properties)
throws SpRuntimeException {
return extractEventProperties(properties, "");
}
- private StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+ protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
throws SpRuntimeException {
StringBuilder s = new StringBuilder();
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..c8e52aa 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -21,11 +21,10 @@ package org.apache.streampipes.sinks.databases.jvm.postgresql;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
-public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParameters> {
+public class PostgreSql extends PostgresJdbcClient implements EventSink<PostgreSqlParameters> {
private static Logger LOG;
@@ -47,7 +46,9 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
"^[a-zA-Z_][a-zA-Z0-9_]*$",
"org.postgresql.Driver",
"postgresql",
- LOG);
+ LOG,
+ parameters.getSchemaName(),
+ parameters.isToDropTable());
}
@Override
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..356bab6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -38,7 +39,12 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
private static final String DATABASE_NAME_KEY = "db_name";
private static final String DATABASE_TABLE_KEY = "db_table";
private static final String DATABASE_USER_KEY = "db_user";
+ private static final String DATABASE_SCHEMA_KEY = "db_schema";
private static final String DATABASE_PASSWORD_KEY = "db_password";
+ private static final String DATABASE_REPLACETABLE_KEY = "db_replaceTable";
+
+ private static final String YES = "Yes";
+ private static final String NO = "No";
@Override
public DataSinkDescription declareModel() {
@@ -54,7 +60,11 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
.requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
.requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
.requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+ .requiredTextParameter(Labels.withId(DATABASE_SCHEMA_KEY))
.requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+ .requiredSingleValueSelection(
+ Labels.withId(DATABASE_REPLACETABLE_KEY),
+ Options.from(YES, NO))
.build();
}
@@ -62,12 +72,22 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
public ConfiguredEventSink<PostgreSqlParameters> onInvocation(DataSinkInvocation graph,
DataSinkParameterExtractor extractor) {
+
+
String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class);
Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+ String schemaName = extractor.singleValueParameter(DATABASE_SCHEMA_KEY, String.class);
+ String valueOption = extractor.selectedSingleValue(DATABASE_REPLACETABLE_KEY, String.class);
+
+
+ boolean IsToDropTable = false;
+ if (valueOption.equals(YES)) {
+ IsToDropTable = true;
+ }
PostgreSqlParameters params = new PostgreSqlParameters(graph,
hostname,
@@ -75,7 +95,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
dbName,
tableName,
user,
- password);
+ password,
+ schemaName,
+ IsToDropTable);
return new ConfiguredEventSink<>(params, PostgreSql::new);
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..2e50fdd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,10 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
private String tableName;
private String user;
private String password;
+ private String schemaName;
+ private boolean isToDropTable;
- public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+ public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, String schemaName, boolean dropTable) {
super(graph);
this.PostgreSqlHost = PostgreSqlHost;
this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +40,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
this.tableName = tableName;
this.user = user;
this.password = password;
+ this.schemaName = schemaName;
+ this.isToDropTable = dropTable;
}
public String getPostgreSqlHost() {
@@ -63,4 +67,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
public String getPassword() {
return password;
}
+
+ public String getSchemaName() { return schemaName; }
+
+ public boolean isToDropTable() { return isToDropTable; }
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
new file mode 100644
index 0000000..a70bdf4
--- /dev/null
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.postgresql;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class PostgresJdbcClient extends JdbcClient {
+
+ protected String schemaName;
+ protected boolean schemaExists = false;
+ protected boolean isToDropTable = false;
+
+// /**
+// * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
+// * If no matching type is found, it is interpreted as a String (VARCHAR(255))
+// */
+ protected enum SqlAttribute {
+ INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
+ private final String sqlName;
+
+ SqlAttribute(String s) {
+ sqlName = s;
+ }
+}
+//
+// /**
+// * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
+// * interpreted as a String (VARCHAR(255))
+// *
+// * @param o The object which should be identified
+// * @return An {@link SqlAttribute} of the identified type
+// */
+// public static SqlAttribute getFromObject(final Object o) {
+// SqlAttribute r;
+// if (o instanceof Integer) {
+// r = SqlAttribute.INTEGER;
+// } else if (o instanceof Long) {
+// r = SqlAttribute.LONG;
+// } else if (o instanceof Float) {
+// r = SqlAttribute.FLOAT;
+// } else if (o instanceof Double) {
+// r = SqlAttribute.DOUBLE;
+// } else if (o instanceof Boolean) {
+// r = SqlAttribute.BOOLEAN;
+// } else {
+// r = SqlAttribute.STRING;
+// }
+// return r;
+// }
+//
+// public static SqlAttribute getFromUri(final String s) {
+// SqlAttribute r;
+// if (s.equals(XSD._integer.toString())) {
+// r = SqlAttribute.INTEGER;
+// } else if (s.equals(XSD._long.toString())) {
+// r = SqlAttribute.LONG;
+// } else if (s.equals(XSD._float.toString())) {
+// r = SqlAttribute.FLOAT;
+// } else if (s.equals(XSD._double.toString())) {
+// r = SqlAttribute.DOUBLE;
+// } else if (s.equals(XSD._boolean.toString())) {
+// r = SqlAttribute.BOOLEAN;
+// } else {
+// r = SqlAttribute.STRING;
+// }
+// return r;
+// }
+//
+// /**
+// * Sets the value in the prepardStatement {@code ps}
+// *
+// * @param p The needed info about the parameter (index and type)
+// * @param value The value of the object, which should be filled in the
+// * @param ps The prepared statement, which will be filled
+// * @throws SpRuntimeException When the data type in {@code p} is unknown
+// * @throws SQLException When the setters of the statement throw an
+// * exception (e.g. {@code setInt()})
+// */
+// public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
+// throws SQLException, SpRuntimeException {
+// switch (p.type) {
+// case INTEGER:
+// ps.setInt(p.index, (Integer) value);
+// break;
+// case LONG:
+// ps.setLong(p.index, (Long) value);
+// break;
+// case FLOAT:
+// ps.setFloat(p.index, (Float) value);
+// break;
+// case DOUBLE:
+// ps.setDouble(p.index, (Double) value);
+// break;
+// case BOOLEAN:
+// ps.setBoolean(p.index, (Boolean) value);
+// break;
+// case STRING:
+// ps.setString(p.index, value.toString());
+// break;
+// default:
+// throw new SpRuntimeException("Unknown SQL datatype");
+// }
+// }
+//
+// @Override
+// public String toString() {
+// return sqlName;
+// }
+// }
+
+// /**
+// * Contains all information needed to "fill" a prepared statement (index and the data type)
+// */
+// protected static class Parameterinfo {
+// private int index;
+// private SqlAttribute type;
+//
+// public Parameterinfo(final int index, final SqlAttribute type) {
+// this.index = index;
+// this.type = type;
+// }
+// }
+
+ public PostgresJdbcClient() {
+ super();
+ }
+
+ protected void initializeJdbc(List<EventProperty> eventProperties,
+ String host, Integer port,
+ String databaseName,
+ String tableName,
+ String user,
+ String password,
+ String allowedRegEx,
+ String driver,
+ String urlName,
+ Logger logger,
+ String schemaName,
+ boolean isToDropTable) throws SpRuntimeException {
+ super.initializeJdbc(
+ eventProperties,
+ host,
+ port,
+ databaseName,
+ tableName,
+ user,
+ password,
+ allowedRegEx,
+ driver,
+ urlName,
+ logger);
+ this.schemaName = schemaName;
+ this.isToDropTable = isToDropTable;
+
+ try {
+ Class.forName(driver);
+ } catch (ClassNotFoundException e) {
+ throw new SpRuntimeException("Driver '" + driver + "' not found.");
+ }
+
+ connect();
+ ensureDatabaseExists();
+ ensureSchemaExists();
+ ensureTableExists();
+ }
+
+
+ /**
+ * If this method returns successfully a schema with the name in {@link PostgresJdbcClient#schemaName} exists in the database
+ * with the given database name exists on the server, specified by the url.
+ *
+ * @throws SpRuntimeException If the table does not exist and could not be created
+ */
+ protected void ensureSchemaExists() throws SpRuntimeException {
+ try {
+ // Database should exist by now so we can establish a connection
+ c = DriverManager.getConnection(url + databaseName, user, password);
+ st = c.createStatement();
+ ResultSet rs = c.getMetaData().getSchemas();
+
+ boolean isItExisting = false;
+ while (rs.next()) {
+ String schema = rs.getString("TABLE_SCHEM");
+ if (schema.toLowerCase().equals(schemaName.toLowerCase())) {
+ isItExisting = true;
+ }
+ }
+ if (!isItExisting) {
+ createSchema();
+ }
+
+ schemaExists = true;
+ rs.close();
+ } catch (SQLException e) {
+ closeAll();
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+
+ /**
+ * If this method returns successfully a table with the name in {@link PostgresJdbcClient#tableName} exists in the database
+ * with the given database name exists on the server, specified by the url.
+ *
+ * @throws SpRuntimeException If the table does not exist and could not be created
+ */
+ @Override
+ protected void ensureTableExists() throws SpRuntimeException {
+ try {
+ // Database should exist by now so we can establish a connection
+ c = DriverManager.getConnection(url + databaseName, user, password);
+ st = c.createStatement();
+ ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+ while (rs.next()) {
+ // same table names can exists in different schmemas
+ if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+ if (isToDropTable) {
+ createTable();
+ }
+ validateTable();
+ } else {
+ createTable();
+ }
+ }
+ tableExists = true;
+ rs.close();
+ } catch (SQLException e) {
+ closeAll();
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+
+
+ /**
+ * Prepares a statement for the insertion of values or the
+ *
+ * @param event The event which should be saved to the Postgres table
+ * @throws SpRuntimeException When there was an error in the saving process
+ */
+ @Override
+ protected void save(final Event event) throws SpRuntimeException {
+ //TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
+ checkConnected();
+ Map<String, Object> eventMap = event.getRaw();
+ if (event == null) {
+ throw new SpRuntimeException("event is null");
+ }
+
+ if (!schemaExists) {
+ // Creates the schema
+ createSchema();
+ schemaExists = true;
+ }
+
+ if (!tableExists) {
+ // Creates the table
+ createTable();
+ tableExists = true;
+ }
+ try {
+ executePreparedStatement(eventMap);
+ } catch (SQLException e) {
+ if (e.getSQLState().substring(0, 2).equals("42")) {
+ // If the table does not exists (because it got deleted or something, will cause the error
+ // code "42") we will try to create a new one. Otherwise we do not handle the exception.
+ logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
+ tableExists = false;
+ createTable();
+ tableExists = true;
+
+ try {
+ executePreparedStatement(eventMap);
+ } catch (SQLException e1) {
+ throw new SpRuntimeException(e1.getMessage());
+ }
+ } else {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+ }
+
+
+ /**
+ * Creates a schema with the name {@link PostgresJdbcClient#schemaName}
+ *
+ * @throws SpRuntimeException If the {@link PostgresJdbcClient#schemaName} is not allowed, if executeUpdate throws an SQLException
+ */
+ protected void createSchema() throws SpRuntimeException {
+ checkConnected();
+ checkRegEx(tableName, "Tablename");
+
+ StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
+ statement.append(schemaName).append(";");
+ try {
+ st.executeUpdate(statement.toString());
+ } catch (SQLException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ protected void createTable() throws SpRuntimeException {
+ checkConnected();
+ checkRegEx(tableName, "Tablename");
+
+ if (isToDropTable) {
+ StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
+ statement.append(schemaName);
+ statement.append(".");
+ statement.append(tableName);
+ statement.append(";");
+
+ try {
+ st.executeUpdate(statement.toString());
+ } catch (SQLException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+ StringBuilder statement = new StringBuilder("CREATE TABLE ");
+ statement.append(schemaName);
+ statement.append(".");
+ statement.append(tableName).append(" ( ");
+ statement.append(extractEventProperties(eventProperties)).append(" );");
+
+ try {
+ st.executeUpdate(statement.toString());
+ } catch (SQLException e) {
+ e.getErrorCode();
+ if (e.getSQLState().equals("42P07")) {
+ throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
+ } else {
+ throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+ }
+ }
+ }
+}