You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/21 16:13:16 UTC
[incubator-streampipes-extensions] branch feature/jdbc_rewrite
updated: fixes fallback, fixes empty db situation in postgres,
SQL Attributes and Geometry Timestamp type added
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/feature/jdbc_rewrite by this push:
new a0e8d0c fixes fallback,fixes empty db situation in postgres, SQL Attributes and Geometry Timestamp type added
a0e8d0c is described below
commit a0e8d0cca2a9447553a18e959c8041cdf0b991c0
Author: micklich <fl...@disy.net>
AuthorDate: Sun Jun 21 18:12:57 2020 +0200
fixes fallback,fixes empty db situation in postgres, SQL Attributes and Geometry Timestamp type added
---
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 14 +++++--
.../databases/jvm/jdbcclient/SqlAttribute.java | 32 ++++++++------
.../sinks/databases/jvm/mysql/Mysql.java | 2 +-
.../jvm/postgresql/PostgresJdbcClient.java | 49 ++++++++++++++--------
4 files changed, 62 insertions(+), 35 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index b56bb20..ad2a917 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -40,6 +40,9 @@ public class JdbcClient {
protected String tableName;
protected String user;
protected String password;
+ protected String urlName;
+ protected String host;
+ protected Integer port;
protected boolean tableExists = false;
@@ -85,6 +88,10 @@ public class JdbcClient {
this.password = password;
this.allowedRegEx = allowedRegEx;
this.logger = logger;
+ // needed for fallback
+ this.urlName = urlName;
+ this.host = host;
+ this.port = port;
this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName;
try {
Class.forName(driver);
@@ -116,7 +123,8 @@ public class JdbcClient {
throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
} else if (e.getSQLState().substring(0, 2).equals("3D")){
try {
- c = DriverManager.getConnection(url.replaceAll(databaseName, ""), user, password);
+ String urlFallback= "jdbc:" + urlName + "://" + host + ":" + port + "/";
+ c = DriverManager.getConnection(urlFallback, user, password);
ensureDatabaseExists();
} catch (SQLException e2) {
closeAll();
@@ -414,10 +422,10 @@ public class JdbcClient {
// adding the type of the property (e.g. "VARCHAR(255)")
if (property instanceof EventPropertyPrimitive) {
- s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+ s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()).sqlName);
} else {
// Must be an EventPropertyList then
- s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+ s.append(SqlAttribute.getFromUri(XSD._string.toString()).sqlName);
}
}
pre = ", ";
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
index 7da048a..7fa360a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
@@ -19,10 +19,12 @@
package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.vocabulary.XSD;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.HashMap;
/**
@@ -34,15 +36,13 @@ public enum SqlAttribute {
INTEGER("INT"),
LONG("BIGINT"),
FLOAT("FLOAT"),
- DOUBLE("DOUBLE"),
- STRING("VARCHAR(255)"),
+ DOUBLE("DOUBLE PRECISION"),
+ STRING("TEXT"),
BOOLEAN("BOOLEAN"),
- //MYSQL
- MYSQL_DATETIME("DATETIME"),
- //POSTGRES / POSTGIS
- PG_DOUBLE("NUMERIC");
-
- private final String sqlName;
+ TIMESTAMP("TIMESTAMP"),
+ GEOMETRY("GEOMETRY");
+
+ public final String sqlName;
SqlAttribute(String s) {
sqlName = s;
@@ -85,6 +85,10 @@ public enum SqlAttribute {
r = SqlAttribute.DOUBLE;
} else if (s.equals(XSD._boolean.toString())) {
r = SqlAttribute.BOOLEAN;
+ } else if (s.equals(SO.DateTime)) {
+ r = SqlAttribute.TIMESTAMP;
+ } else if (s.equals("http://www.opengis.net/ont/geosparql#Geometry")) {
+ r = SqlAttribute.GEOMETRY;
} else {
r = SqlAttribute.STRING;
}
@@ -122,13 +126,15 @@ public enum SqlAttribute {
case STRING:
ps.setString(p.index, value.toString());
break;
+ case TIMESTAMP:
+ java.sql.Timestamp sqlTimestamp = new java.sql.Timestamp((Long) value);
+ ps.setString(p.index, sqlTimestamp.toString());
+ case GEOMETRY:
+ //todo extract unit and SRID after #
+ int epsg = 4326;
+ ps.setString(p.index, "ST_GeomFromText('" + value.toString() + "', "+ epsg + "))");
default:
throw new SpRuntimeException("Unknown SQL datatype");
}
}
-
- @Override
- public String toString() {
- return sqlName;
- }
}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index a009261..8a9a0dd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -249,7 +249,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
// If domain property is a timestamp
if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
SO.DateTime.equals(x.toString()))) {
- s.append(SqlAttribute.MYSQL_DATETIME);
+ s.append(SqlAttribute.TIMESTAMP);
this.timestampKeys.add(property.getRuntimeName());
} else {
s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index 1dd1dec..4ae1dc1 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -47,7 +47,8 @@ public class PostgresJdbcClient extends JdbcClient {
protected void initializeJdbc(List<EventProperty> eventProperties,
- String host, Integer port,
+ String host,
+ Integer port,
String databaseName,
String tableName,
String user,
@@ -65,7 +66,11 @@ public class PostgresJdbcClient extends JdbcClient {
super.password = password;
super.allowedRegEx = allowedRegEx;
super.logger = logger;
- super.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+ // needed for fallback
+ super.urlName = urlName;
+ super.host = host;
+ super.port = port;
+ super.url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName;
this.schemaName = schemaName;
this.isToDropTable = isToDropTable;
@@ -89,7 +94,7 @@ public class PostgresJdbcClient extends JdbcClient {
protected void ensureSchemaExists() throws SpRuntimeException {
try {
// Database should exist by now so we can establish a connection
- c = DriverManager.getConnection(url + databaseName, user, password);
+ c = DriverManager.getConnection(url, user, password);
st = c.createStatement();
ResultSet rs = c.getMetaData().getSchemas();
@@ -123,21 +128,29 @@ public class PostgresJdbcClient extends JdbcClient {
protected void ensureTableExists() throws SpRuntimeException {
try {
// Database should exist by now so we can establish a connection
- c = DriverManager.getConnection(url + databaseName, user, password);
+ c = DriverManager.getConnection(url, user, password);
st = c.createStatement();
ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
- while (rs.next()) {
- // same table names can exists in different schmemas
- if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
- if (isToDropTable) {
+
+ // no table at all found in db so create table
+ if (!rs.next()){
+ createTable();
+ tableExists = true;
+ } else {
+ while (rs.next()) {
+ // same table names can exists in different schmemas
+ if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+ if (isToDropTable) {
+ createTable();
+ tableExists = true;
+ }
+ validateTable();
+ } else {
createTable();
+ tableExists = true;
}
- validateTable();
- } else {
- createTable();
}
}
- tableExists = true;
rs.close();
} catch (SQLException e) {
closeAll();
@@ -299,14 +312,14 @@ public class PostgresJdbcClient extends JdbcClient {
// Adding the type of the property (e.g. "VARCHAR(255)")
if (property instanceof EventPropertyPrimitive) {
// use PG_DOUBLE instead of DEFAULT
- if ((((EventPropertyPrimitive) property).getRuntimeType().equals(XSD._double.toString()))) {
- s.append(SqlAttribute.PG_DOUBLE);
- } else {
- s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
- }
+// if ((((EventPropertyPrimitive) property).getRuntimeType().equals(XSD._double.toString()))) {
+// s.append(SqlAttribute.PG_DOUBLE.sqlName);
+// } else {
+ s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()).sqlName);
+// }
} else {
// Must be an EventPropertyList then
- s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+ s.append(SqlAttribute.getFromUri(XSD._string.toString()).sqlName);
}
}
pre = ", ";