You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/07 09:32:47 UTC

[incubator-streampipes-extensions] 01/03: extended super attributes. change SQLAttribute and names

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 3473e021dfda7b45cda5ccf40d779d793edc7199
Author: micklich <fl...@disy.net>
AuthorDate: Sun Jun 7 10:00:39 2020 +0200

    extended super attributes. change SQLAttribute and names
---
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java |   2 -
 .../databases/jvm/jdbcclient/SqlAttribute.java     |  13 +-
 .../sinks/databases/jvm/mysql/Mysql.java           |   2 +-
 .../jvm/postgresql/PostgresJdbcClient.java         | 216 +++++++++------------
 4 files changed, 104 insertions(+), 129 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ee67bac..60b7ad1 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -78,7 +78,6 @@ public class JdbcClient {
                                 String driver,
                                 String urlName,
                                 Logger logger) throws SpRuntimeException {
-
     this.eventProperties = eventProperties;
     this.databaseName = databaseName;
     this.tableName = tableName;
@@ -86,7 +85,6 @@ public class JdbcClient {
     this.password = password;
     this.allowedRegEx = allowedRegEx;
     this.logger = logger;
-
     this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
     try {
       Class.forName(driver);
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
index 88f0059..6d62146 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
@@ -30,8 +30,17 @@ import java.sql.SQLException;
  * If no matching type is found, it is interpreted as a String (VARCHAR(255))
  */
 public enum SqlAttribute {
-  INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
-  BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
+  // DEFAULT
+  INTEGER("INT"),
+  LONG("BIGINT"),
+  FLOAT("FLOAT"),
+  DOUBLE("DOUBLE"),
+  STRING("VARCHAR(255)"),
+  BOOLEAN("BOOLEAN"),
+  //MYSQL
+  MYSQL_DATETIME("DATETIME"),
+  //POSTGRES / POSTGIS
+  PG_DOUBLE("DOUBLE PRECISION");
   private final String sqlName;
 
   SqlAttribute(String s) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 95c3538..a009261 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -249,7 +249,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                     // If domain property is a timestamp
                     if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
                        SO.DateTime.equals(x.toString()))) {
-                        s.append(SqlAttribute.DATETIME);
+                        s.append(SqlAttribute.MYSQL_DATETIME);
                         this.timestampKeys.add(property.getRuntimeName());
                     } else {
                         s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index a70bdf4..d45462b 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -25,10 +25,11 @@ import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyNested;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.SqlAttribute;
+import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 
 import java.sql.*;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -39,121 +40,12 @@ public class PostgresJdbcClient extends JdbcClient {
   protected boolean schemaExists = false;
   protected boolean isToDropTable = false;
 
-//  /**
-//   * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
-//   * If no matching type is found, it is interpreted as a String (VARCHAR(255))
-//   */
-  protected enum SqlAttribute {
-  INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
-  private final String sqlName;
-
-  SqlAttribute(String s) {
-    sqlName = s;
-  }
-}
-//
-//    /**
-//     * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
-//     * interpreted as a String (VARCHAR(255))
-//     *
-//     * @param o The object which should be identified
-//     * @return An {@link SqlAttribute} of the identified type
-//     */
-//    public static SqlAttribute getFromObject(final Object o) {
-//      SqlAttribute r;
-//      if (o instanceof Integer) {
-//        r = SqlAttribute.INTEGER;
-//      } else if (o instanceof Long) {
-//        r = SqlAttribute.LONG;
-//      } else if (o instanceof Float) {
-//        r = SqlAttribute.FLOAT;
-//      } else if (o instanceof Double) {
-//        r = SqlAttribute.DOUBLE;
-//      } else if (o instanceof Boolean) {
-//        r = SqlAttribute.BOOLEAN;
-//      } else {
-//        r = SqlAttribute.STRING;
-//      }
-//      return r;
-//    }
-//
-//    public static SqlAttribute getFromUri(final String s) {
-//      SqlAttribute r;
-//      if (s.equals(XSD._integer.toString())) {
-//        r = SqlAttribute.INTEGER;
-//      } else if (s.equals(XSD._long.toString())) {
-//        r = SqlAttribute.LONG;
-//      } else if (s.equals(XSD._float.toString())) {
-//        r = SqlAttribute.FLOAT;
-//      } else if (s.equals(XSD._double.toString())) {
-//        r = SqlAttribute.DOUBLE;
-//      } else if (s.equals(XSD._boolean.toString())) {
-//        r = SqlAttribute.BOOLEAN;
-//      } else {
-//        r = SqlAttribute.STRING;
-//      }
-//      return r;
-//    }
-//
-//    /**
-//     * Sets the value in the prepardStatement {@code ps}
-//     *
-//     * @param p     The needed info about the parameter (index and type)
-//     * @param value The value of the object, which should be filled in the
-//     * @param ps    The prepared statement, which will be filled
-//     * @throws SpRuntimeException When the data type in {@code p} is unknown
-//     * @throws SQLException       When the setters of the statement throw an
-//     *                            exception (e.g. {@code setInt()})
-//     */
-//    public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
-//        throws SQLException, SpRuntimeException {
-//      switch (p.type) {
-//        case INTEGER:
-//          ps.setInt(p.index, (Integer) value);
-//          break;
-//        case LONG:
-//          ps.setLong(p.index, (Long) value);
-//          break;
-//        case FLOAT:
-//          ps.setFloat(p.index, (Float) value);
-//          break;
-//        case DOUBLE:
-//          ps.setDouble(p.index, (Double) value);
-//          break;
-//        case BOOLEAN:
-//          ps.setBoolean(p.index, (Boolean) value);
-//          break;
-//        case STRING:
-//          ps.setString(p.index, value.toString());
-//          break;
-//        default:
-//          throw new SpRuntimeException("Unknown SQL datatype");
-//      }
-//    }
-//
-//    @Override
-//    public String toString() {
-//      return sqlName;
-//    }
-//  }
-
-//  /**
-//   * Contains all information needed to "fill" a prepared statement (index and the data type)
-//   */
-//  protected static class Parameterinfo {
-//    private int index;
-//    private SqlAttribute type;
-//
-//    public Parameterinfo(final int index, final SqlAttribute type) {
-//      this.index = index;
-//      this.type = type;
-//    }
-//  }
 
   public PostgresJdbcClient() {
     super();
   }
 
+
   protected void initializeJdbc(List<EventProperty> eventProperties,
                                 String host, Integer port,
                                 String databaseName,
@@ -166,18 +58,14 @@ public class PostgresJdbcClient extends JdbcClient {
                                 Logger logger,
                                 String schemaName,
                                 boolean isToDropTable) throws SpRuntimeException {
-    super.initializeJdbc(
-        eventProperties,
-        host,
-        port,
-        databaseName,
-        tableName,
-        user,
-        password,
-        allowedRegEx,
-        driver,
-        urlName,
-        logger);
+    super.eventProperties = eventProperties;
+    super.databaseName = databaseName;
+    super.tableName = tableName;
+    super.user = user;
+    super.password = password;
+    super.allowedRegEx = allowedRegEx;
+    super.logger = logger;
+    super.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
     this.schemaName = schemaName;
     this.isToDropTable = isToDropTable;
 
@@ -186,7 +74,6 @@ public class PostgresJdbcClient extends JdbcClient {
     } catch (ClassNotFoundException e) {
       throw new SpRuntimeException("Driver '" + driver + "' not found.");
     }
-
     connect();
     ensureDatabaseExists();
     ensureSchemaExists();
@@ -364,4 +251,85 @@ public class PostgresJdbcClient extends JdbcClient {
       }
     }
   }
+
+
+//  protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+//      throws SpRuntimeException {
+//    // output: "randomString VARCHAR(255), randomValue INT"
+//    StringBuilder s = new StringBuilder();
+//    String pre = "";
+//    for (EventProperty property : properties) {
+//      // Protection against SqlInjection
+//
+//      checkRegEx(property.getRuntimeName(), "Column name");
+//      if (property instanceof EventPropertyNested) {
+//        // if it is a nested property, recursively extract the needed properties
+//        StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+//            preProperty + property.getRuntimeName() + "_");
+//        if (tmp.length() > 0) {
+//          s.append(pre).append(tmp);
+//        }
+//      } else {
+//        // Adding the name of the property (e.g. "randomString")
+//        // Or for properties in a nested structure: input1_randomValue
+//        // "pre" is there for the ", " part
+//        s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
+//
+//        // adding the type of the property (e.g. "VARCHAR(255)")
+//        if (property instanceof EventPropertyPrimitive) {
+//          s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+//        } else {
+//          // Must be an EventPropertyList then
+//          s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+//        }
+//      }
+//      pre = ", ";
+//    }
+//
+//    return s;
+//  }
+
+  @Override
+  protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+      throws SpRuntimeException {
+
+    StringBuilder s = new StringBuilder();
+    String pre = "";
+    for (EventProperty property : properties) {
+
+      // Protection against SQL-Injection
+      checkRegEx(property.getRuntimeName(), "Column name");
+
+      if (property instanceof EventPropertyNested) {
+
+        // If is is a nested property, recursively extract the required properties
+        StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+            preProperty + property.getRuntimeName() + "_");
+        if (tmp.length() > 0) {
+          s.append(pre).append(tmp);
+        }
+      } else {
+        // Adding the name of the property (e.g. "randomString")
+        s.append(pre).append(preProperty).append(property.getRuntimeName()).append(" ");
+
+        // Adding the type of the property (e.g. "VARCHAR(255)")
+        if (property instanceof EventPropertyPrimitive) {
+          // use PG_DOUBLE instead of DEFAULT
+          if ((((EventPropertyPrimitive) property).getRuntimeType().equals(XSD._double.toString()))) {
+            s.append(SqlAttribute.PG_DOUBLE);
+          } else {
+            s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+          }
+        } else {
+          // Must be an EventPropertyList then
+          s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+        }
+      }
+      pre = ", ";
+    }
+
+    return s;
+  }
+
+
 }