You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/07 09:32:47 UTC
[incubator-streampipes-extensions] 01/03: extended super
attributes. change SQLAttribute and names
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 3473e021dfda7b45cda5ccf40d779d793edc7199
Author: micklich <fl...@disy.net>
AuthorDate: Sun Jun 7 10:00:39 2020 +0200
extended super attributes. change SQLAttribute and names
---
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 2 -
.../databases/jvm/jdbcclient/SqlAttribute.java | 13 +-
.../sinks/databases/jvm/mysql/Mysql.java | 2 +-
.../jvm/postgresql/PostgresJdbcClient.java | 216 +++++++++------------
4 files changed, 104 insertions(+), 129 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ee67bac..60b7ad1 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -78,7 +78,6 @@ public class JdbcClient {
String driver,
String urlName,
Logger logger) throws SpRuntimeException {
-
this.eventProperties = eventProperties;
this.databaseName = databaseName;
this.tableName = tableName;
@@ -86,7 +85,6 @@ public class JdbcClient {
this.password = password;
this.allowedRegEx = allowedRegEx;
this.logger = logger;
-
this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
try {
Class.forName(driver);
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
index 88f0059..6d62146 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
@@ -30,8 +30,17 @@ import java.sql.SQLException;
* If no matching type is found, it is interpreted as a String (VARCHAR(255))
*/
public enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
- BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
+ // DEFAULT
+ INTEGER("INT"),
+ LONG("BIGINT"),
+ FLOAT("FLOAT"),
+ DOUBLE("DOUBLE"),
+ STRING("VARCHAR(255)"),
+ BOOLEAN("BOOLEAN"),
+ //MYSQL
+ MYSQL_DATETIME("DATETIME"),
+ //POSTGRES / POSTGIS
+ PG_DOUBLE("DOUBLE PRECISION");
private final String sqlName;
SqlAttribute(String s) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 95c3538..a009261 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -249,7 +249,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
// If domain property is a timestamp
if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
SO.DateTime.equals(x.toString()))) {
- s.append(SqlAttribute.DATETIME);
+ s.append(SqlAttribute.MYSQL_DATETIME);
this.timestampKeys.add(property.getRuntimeName());
} else {
s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index a70bdf4..d45462b 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -25,10 +25,11 @@ import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.SqlAttribute;
+import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.vocabulary.XSD;
import java.sql.*;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,121 +40,12 @@ public class PostgresJdbcClient extends JdbcClient {
protected boolean schemaExists = false;
protected boolean isToDropTable = false;
-// /**
-// * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
-// * If no matching type is found, it is interpreted as a String (VARCHAR(255))
-// */
- protected enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
- private final String sqlName;
-
- SqlAttribute(String s) {
- sqlName = s;
- }
-}
-//
-// /**
-// * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
-// * interpreted as a String (VARCHAR(255))
-// *
-// * @param o The object which should be identified
-// * @return An {@link SqlAttribute} of the identified type
-// */
-// public static SqlAttribute getFromObject(final Object o) {
-// SqlAttribute r;
-// if (o instanceof Integer) {
-// r = SqlAttribute.INTEGER;
-// } else if (o instanceof Long) {
-// r = SqlAttribute.LONG;
-// } else if (o instanceof Float) {
-// r = SqlAttribute.FLOAT;
-// } else if (o instanceof Double) {
-// r = SqlAttribute.DOUBLE;
-// } else if (o instanceof Boolean) {
-// r = SqlAttribute.BOOLEAN;
-// } else {
-// r = SqlAttribute.STRING;
-// }
-// return r;
-// }
-//
-// public static SqlAttribute getFromUri(final String s) {
-// SqlAttribute r;
-// if (s.equals(XSD._integer.toString())) {
-// r = SqlAttribute.INTEGER;
-// } else if (s.equals(XSD._long.toString())) {
-// r = SqlAttribute.LONG;
-// } else if (s.equals(XSD._float.toString())) {
-// r = SqlAttribute.FLOAT;
-// } else if (s.equals(XSD._double.toString())) {
-// r = SqlAttribute.DOUBLE;
-// } else if (s.equals(XSD._boolean.toString())) {
-// r = SqlAttribute.BOOLEAN;
-// } else {
-// r = SqlAttribute.STRING;
-// }
-// return r;
-// }
-//
-// /**
-// * Sets the value in the prepardStatement {@code ps}
-// *
-// * @param p The needed info about the parameter (index and type)
-// * @param value The value of the object, which should be filled in the
-// * @param ps The prepared statement, which will be filled
-// * @throws SpRuntimeException When the data type in {@code p} is unknown
-// * @throws SQLException When the setters of the statement throw an
-// * exception (e.g. {@code setInt()})
-// */
-// public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
-// throws SQLException, SpRuntimeException {
-// switch (p.type) {
-// case INTEGER:
-// ps.setInt(p.index, (Integer) value);
-// break;
-// case LONG:
-// ps.setLong(p.index, (Long) value);
-// break;
-// case FLOAT:
-// ps.setFloat(p.index, (Float) value);
-// break;
-// case DOUBLE:
-// ps.setDouble(p.index, (Double) value);
-// break;
-// case BOOLEAN:
-// ps.setBoolean(p.index, (Boolean) value);
-// break;
-// case STRING:
-// ps.setString(p.index, value.toString());
-// break;
-// default:
-// throw new SpRuntimeException("Unknown SQL datatype");
-// }
-// }
-//
-// @Override
-// public String toString() {
-// return sqlName;
-// }
-// }
-
-// /**
-// * Contains all information needed to "fill" a prepared statement (index and the data type)
-// */
-// protected static class Parameterinfo {
-// private int index;
-// private SqlAttribute type;
-//
-// public Parameterinfo(final int index, final SqlAttribute type) {
-// this.index = index;
-// this.type = type;
-// }
-// }
public PostgresJdbcClient() {
super();
}
+
protected void initializeJdbc(List<EventProperty> eventProperties,
String host, Integer port,
String databaseName,
@@ -166,18 +58,14 @@ public class PostgresJdbcClient extends JdbcClient {
Logger logger,
String schemaName,
boolean isToDropTable) throws SpRuntimeException {
- super.initializeJdbc(
- eventProperties,
- host,
- port,
- databaseName,
- tableName,
- user,
- password,
- allowedRegEx,
- driver,
- urlName,
- logger);
+ super.eventProperties = eventProperties;
+ super.databaseName = databaseName;
+ super.tableName = tableName;
+ super.user = user;
+ super.password = password;
+ super.allowedRegEx = allowedRegEx;
+ super.logger = logger;
+ super.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
this.schemaName = schemaName;
this.isToDropTable = isToDropTable;
@@ -186,7 +74,6 @@ public class PostgresJdbcClient extends JdbcClient {
} catch (ClassNotFoundException e) {
throw new SpRuntimeException("Driver '" + driver + "' not found.");
}
-
connect();
ensureDatabaseExists();
ensureSchemaExists();
@@ -364,4 +251,85 @@ public class PostgresJdbcClient extends JdbcClient {
}
}
}
+
+
+// protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+// throws SpRuntimeException {
+// // output: "randomString VARCHAR(255), randomValue INT"
+// StringBuilder s = new StringBuilder();
+// String pre = "";
+// for (EventProperty property : properties) {
+// // Protection against SqlInjection
+//
+// checkRegEx(property.getRuntimeName(), "Column name");
+// if (property instanceof EventPropertyNested) {
+// // if it is a nested property, recursively extract the needed properties
+// StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+// preProperty + property.getRuntimeName() + "_");
+// if (tmp.length() > 0) {
+// s.append(pre).append(tmp);
+// }
+// } else {
+// // Adding the name of the property (e.g. "randomString")
+// // Or for properties in a nested structure: input1_randomValue
+// // "pre" is there for the ", " part
+// s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
+//
+// // adding the type of the property (e.g. "VARCHAR(255)")
+// if (property instanceof EventPropertyPrimitive) {
+// s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+// } else {
+// // Must be an EventPropertyList then
+// s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+// }
+// }
+// pre = ", ";
+// }
+//
+// return s;
+// }
+
+ @Override
+ protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+ throws SpRuntimeException {
+ // Builds the comma-separated "<column name> <SQL type>" list for the given properties; preProperty is prepended to every column name
+ StringBuilder s = new StringBuilder();
+ String pre = "";
+ for (EventProperty property : properties) {
+
+ // Protection against SQL injection: the runtime name is validated via checkRegEx before it is used as a column name
+ checkRegEx(property.getRuntimeName(), "Column name");
+
+ if (property instanceof EventPropertyNested) {
+
+ // If it is a nested property, recursively extract its child properties, prefixing them with "<preProperty><runtimeName>_"
+ StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+ preProperty + property.getRuntimeName() + "_");
+ if (tmp.length() > 0) {
+ s.append(pre).append(tmp);
+ }
+ } else {
+ // Adding the separator ("pre") and the prefixed name of the property (e.g. "randomString"), followed by a space
+ s.append(pre).append(preProperty).append(property.getRuntimeName()).append(" ");
+ // NOTE(review): the column name is appended without surrounding double quotes here - confirm this is intended
+ // Adding the type of the property (e.g. "VARCHAR(255)")
+ if (property instanceof EventPropertyPrimitive) {
+ // For XSD double use PG_DOUBLE ("DOUBLE PRECISION") instead of the default DOUBLE ("DOUBLE") attribute
+ if ((((EventPropertyPrimitive) property).getRuntimeType().equals(XSD._double.toString()))) {
+ s.append(SqlAttribute.PG_DOUBLE);
+ } else {
+ s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+ }
+ } else {
+ // Must be an EventPropertyList then; its column type is looked up via the XSD string URI
+ s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+ }
+ }
+ pre = ", ";
+ }
+
+ return s;
+ }
+
+
}