You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/21 16:13:16 UTC

[incubator-streampipes-extensions] branch feature/jdbc_rewrite updated: fixes fallback, fixes empty db situation in postgres, SQL Attributes and Geometry Timestamp type added

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/feature/jdbc_rewrite by this push:
     new a0e8d0c  fixes fallback, fixes empty db situation in postgres, SQL Attributes and Geometry Timestamp type added
a0e8d0c is described below

commit a0e8d0cca2a9447553a18e959c8041cdf0b991c0
Author: micklich <fl...@disy.net>
AuthorDate: Sun Jun 21 18:12:57 2020 +0200

    fixes fallback, fixes empty db situation in postgres, SQL Attributes and Geometry Timestamp type added
---
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 14 +++++--
 .../databases/jvm/jdbcclient/SqlAttribute.java     | 32 ++++++++------
 .../sinks/databases/jvm/mysql/Mysql.java           |  2 +-
 .../jvm/postgresql/PostgresJdbcClient.java         | 49 ++++++++++++++--------
 4 files changed, 62 insertions(+), 35 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index b56bb20..ad2a917 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -40,6 +40,9 @@ public class JdbcClient {
   protected String tableName;
   protected String user;
   protected String password;
+  protected String urlName;
+  protected String host;
+  protected Integer port;
 
   protected boolean tableExists = false;
 
@@ -85,6 +88,10 @@ public class JdbcClient {
     this.password = password;
     this.allowedRegEx = allowedRegEx;
     this.logger = logger;
+    // needed for fallback
+    this.urlName = urlName;
+    this.host = host;
+    this.port = port;
     this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName;
     try {
       Class.forName(driver);
@@ -116,7 +123,8 @@ public class JdbcClient {
         throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
       } else if (e.getSQLState().substring(0, 2).equals("3D")){
         try {
-          c = DriverManager.getConnection(url.replaceAll(databaseName, ""), user, password);
+          String urlFallback= "jdbc:" + urlName + "://" + host + ":" + port + "/";
+          c = DriverManager.getConnection(urlFallback, user, password);
           ensureDatabaseExists();
         } catch (SQLException e2) {
           closeAll();
@@ -414,10 +422,10 @@ public class JdbcClient {
 
         // adding the type of the property (e.g. "VARCHAR(255)")
         if (property instanceof EventPropertyPrimitive) {
-          s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+          s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()).sqlName);
         } else {
           // Must be an EventPropertyList then
-          s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+          s.append(SqlAttribute.getFromUri(XSD._string.toString()).sqlName);
         }
       }
       pre = ", ";
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
index 7da048a..7fa360a 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
@@ -19,10 +19,12 @@
 package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.HashMap;
 
 
 /**
@@ -34,15 +36,13 @@ public enum SqlAttribute {
   INTEGER("INT"),
   LONG("BIGINT"),
   FLOAT("FLOAT"),
-  DOUBLE("DOUBLE"),
-  STRING("VARCHAR(255)"),
+  DOUBLE("DOUBLE PRECISION"),
+  STRING("TEXT"),
   BOOLEAN("BOOLEAN"),
-  //MYSQL
-  MYSQL_DATETIME("DATETIME"),
-  //POSTGRES / POSTGIS
-  PG_DOUBLE("NUMERIC");
-
-  private final String sqlName;
+  TIMESTAMP("TIMESTAMP"),
+  GEOMETRY("GEOMETRY");
+  
+  public final String sqlName;
 
   SqlAttribute(String s) {
     sqlName = s;
@@ -85,6 +85,10 @@ public enum SqlAttribute {
       r = SqlAttribute.DOUBLE;
     } else if (s.equals(XSD._boolean.toString())) {
       r = SqlAttribute.BOOLEAN;
+    } else if (s.equals(SO.DateTime)) {
+      r = SqlAttribute.TIMESTAMP;
+    } else if (s.equals("http://www.opengis.net/ont/geosparql#Geometry")) {
+      r = SqlAttribute.GEOMETRY;
     } else {
       r = SqlAttribute.STRING;
     }
@@ -122,13 +126,15 @@ public enum SqlAttribute {
       case STRING:
         ps.setString(p.index, value.toString());
         break;
+      case TIMESTAMP:
+        java.sql.Timestamp sqlTimestamp = new java.sql.Timestamp((Long) value);
+        ps.setString(p.index,  sqlTimestamp.toString());
+      case GEOMETRY:
+        //todo extract unit and SRID after #
+        int epsg = 4326;
+        ps.setString(p.index,  "ST_GeomFromText('" + value.toString() + "', "+ epsg + "))");
       default:
         throw new SpRuntimeException("Unknown SQL datatype");
     }
   }
-
-  @Override
-  public String toString() {
-    return sqlName;
-  }
 }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index a009261..8a9a0dd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -249,7 +249,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                     // If domain property is a timestamp
                     if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
                        SO.DateTime.equals(x.toString()))) {
-                        s.append(SqlAttribute.MYSQL_DATETIME);
+                        s.append(SqlAttribute.TIMESTAMP);
                         this.timestampKeys.add(property.getRuntimeName());
                     } else {
                         s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index 1dd1dec..4ae1dc1 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -47,7 +47,8 @@ public class PostgresJdbcClient extends JdbcClient {
 
 
   protected void initializeJdbc(List<EventProperty> eventProperties,
-                                String host, Integer port,
+                                String host,
+                                Integer port,
                                 String databaseName,
                                 String tableName,
                                 String user,
@@ -65,7 +66,11 @@ public class PostgresJdbcClient extends JdbcClient {
     super.password = password;
     super.allowedRegEx = allowedRegEx;
     super.logger = logger;
-    super.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+    // needed for fallback
+    super.urlName = urlName;
+    super.host = host;
+    super.port = port;
+    super.url = "jdbc:" + urlName + "://" + host + ":" + port + "/" + databaseName;
     this.schemaName = schemaName;
     this.isToDropTable = isToDropTable;
 
@@ -89,7 +94,7 @@ public class PostgresJdbcClient extends JdbcClient {
   protected void ensureSchemaExists() throws SpRuntimeException {
     try {
       // Database should exist by now so we can establish a connection
-      c = DriverManager.getConnection(url + databaseName, user, password);
+      c = DriverManager.getConnection(url, user, password);
       st = c.createStatement();
       ResultSet rs = c.getMetaData().getSchemas();
 
@@ -123,21 +128,29 @@ public class PostgresJdbcClient extends JdbcClient {
   protected void ensureTableExists() throws SpRuntimeException {
     try {
       // Database should exist by now so we can establish a connection
-      c = DriverManager.getConnection(url + databaseName, user, password);
+      c = DriverManager.getConnection(url, user, password);
       st = c.createStatement();
       ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
-      while (rs.next()) {
-        // same table names can exists in different schmemas
-        if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
-          if (isToDropTable) {
+
+      // no table with this name found in the db, so create it
+      if (!rs.next()){
+        createTable();
+        tableExists = true;
+      } else {
+        while (rs.next()) {
+          // same table names can exist in different schemas
+          if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+            if (isToDropTable) {
+              createTable();
+              tableExists = true;
+            }
+            validateTable();
+          } else {
             createTable();
+            tableExists = true;
           }
-          validateTable();
-        } else {
-          createTable();
         }
       }
-      tableExists = true;
       rs.close();
     } catch (SQLException e) {
       closeAll();
@@ -299,14 +312,14 @@ public class PostgresJdbcClient extends JdbcClient {
         // Adding the type of the property (e.g. "VARCHAR(255)")
         if (property instanceof EventPropertyPrimitive) {
           // use PG_DOUBLE instead of DEFAULT
-          if ((((EventPropertyPrimitive) property).getRuntimeType().equals(XSD._double.toString()))) {
-            s.append(SqlAttribute.PG_DOUBLE);
-          } else {
-            s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
-          }
+//          if ((((EventPropertyPrimitive) property).getRuntimeType().equals(XSD._double.toString()))) {
+//            s.append(SqlAttribute.PG_DOUBLE.sqlName);
+//          } else {
+            s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()).sqlName);
+//          }
         } else {
           // Must be an EventPropertyList then
-          s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+          s.append(SqlAttribute.getFromUri(XSD._string.toString()).sqlName);
         }
       }
       pre = ", ";