You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/01 21:35:14 UTC

[incubator-streampipes-extensions] branch dev updated: Working on SQL sink and adapters

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4d781ce  Working on SQL sink and adapters
4d781ce is described below

commit 4d781ce2583f050244478b42b727c5a1b0848227
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Jun 1 23:33:36 2020 +0200

    Working on SQL sink and adapters
---
 .../streampipes/connect/adapters/mysql/Column.java | 16 ++++++++++++-
 .../connect/adapters/mysql/MySqlAdapter.java       | 20 ++++++++++++----
 .../connect/adapters/mysql/MySqlClient.java        | 19 +++++++++++----
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java |  3 ++-
 .../sinks/databases/jvm/mysql/Column.java          | 11 ++++++++-
 .../sinks/databases/jvm/mysql/Mysql.java           | 27 ++++++++++++++++++++--
 6 files changed, 83 insertions(+), 13 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java
index 58c63e2..4b0a68e 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java
@@ -19,11 +19,13 @@
 package org.apache.streampipes.connect.adapters.mysql;
 
 import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
 
 class Column {
   private String name;
   private Datatypes type;
   private Object def;
+  private String domainProperty;
 
   Column(String name, String dataType, String columnType) {
     this.name = name;
@@ -52,8 +54,16 @@ class Column {
         this.type = Datatypes.String;
         def = "";
         break;
+      case "date":
+      case "datetime":
+      case "time":
+      case "timestamp":
+      case "year":
+        this.type = Datatypes.Long;
+        def = System.currentTimeMillis();
+        this.domainProperty = SO.DateTime;
+        break;
       default:
-        // date, datetime, time, timestamp, year
         throw new IllegalArgumentException("Type " + type + " not supported.");
     }
     if (columnType.equals("tinyint(1)") || columnType.equals("bit(1)")) {
@@ -73,4 +83,8 @@ class Column {
   public Object getDefault() {
     return def;
   }
+
+  public String getDomainProperty() {
+    return domainProperty;
+  }
 }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java
index 4034b23..c88ac32 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.vocabulary.SO;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -43,6 +44,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
+
 public class MySqlAdapter extends SpecificDataStreamAdapter {
 
   public static final String ID = "http://streampipes.org/adapter/specific/mysql";
@@ -183,10 +186,19 @@ public class MySqlAdapter extends SpecificDataStreamAdapter {
     extractTableInformation();
 
     for (Column column : tableSchema) {
-      allProperties.add(PrimitivePropertyBuilder
-              .create(column.getType(), database + "." + table + "." + column.getName())
-              .label(column.getName())
-              .build());
+      if (SO.DateTime.equals(column.getDomainProperty())) {
+        allProperties.add(PrimitivePropertyBuilder
+                .create(column.getType(), column.getName())
+                .label(column.getName())
+                .domainProperty(SO.DateTime)
+                .build());
+      } else {
+        allProperties.add(PrimitivePropertyBuilder
+                .create(column.getType(), column.getName())
+                .label(column.getName())
+                .build());
+      }
+
     }
 
     eventSchema.setEventProperties(allProperties);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java
index ae9d66e..b3c6f7d 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java
@@ -23,11 +23,14 @@ import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.vocabulary.SO;
 
 import java.sql.*;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
+
 public class MySqlClient {
 
   public static final String ID = "http://streampipes.org/adapter/specific/mysql";
@@ -102,10 +105,18 @@ public class MySqlClient {
     List<EventProperty> allProperties = new ArrayList<>();
 
     for (Column column : columns) {
-      allProperties.add(PrimitivePropertyBuilder
-              .create(column.getType(), column.getName())
-              .label(column.getName())
-              .build());
+      if (SO.DateTime.equals(column.getDomainProperty())) {
+        allProperties.add(PrimitivePropertyBuilder
+                .create(column.getType(), column.getName())
+                .label(column.getName())
+                .domainProperty(SO.DateTime)
+                .build());
+      } else {
+        allProperties.add(PrimitivePropertyBuilder
+                .create(column.getType(), column.getName())
+                .label(column.getName())
+                .build());
+      }
     }
 
     eventSchema.setEventProperties(allProperties);
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ff3f539..71a357f 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -61,7 +61,8 @@ public class JdbcClient {
      * If no matching type is found, it is interpreted as a String (VARCHAR(255))
      */
     protected enum SqlAttribute {
-        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
+        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
+        BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
         private final String sqlName;
 
         SqlAttribute(String s) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java
index 4b58c5f..942078b 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.sinks.databases.jvm.mysql;
 
 import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
 
 class Column {
     private Datatypes type;
@@ -50,8 +51,16 @@ class Column {
                 this.type = Datatypes.String;
                 def = "";
                 break;
+
+            case "date":
+            case "datetime":
+            case "time":
+            case "timestamp":
+            case "year":
+                this.type = Datatypes.Long;
+                def = System.currentTimeMillis();
+                break;
             default:
-                // date, datetime, time, timestamp, year
                 throw new IllegalArgumentException("Type " + type + " not supported.");
         }
         if (columnType.equals("tinyint(1)") || columnType.equals("bit(1)")) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index fd503a2..5b6de68 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.schema.EventPropertyNested;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -38,11 +39,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
     private MysqlParameters params;
     private static Logger LOG;
     private HashMap<String, Column> tableColumns;
+    private List<String> timestampKeys;
 
     @Override
     public void onInvocation(MysqlParameters params, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
 
         this.params = params;
+        this.timestampKeys = new ArrayList<>();
         LOG = params.getGraph().getLogger(Mysql.class);
 
         initializeJdbc(
@@ -112,6 +115,12 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
         }
     }
 
+    public static void main(String... args) {
+        long mil = System.currentTimeMillis();
+        java.sql.Timestamp timestamp = new java.sql.Timestamp(mil);
+
+        System.out.println(timestamp);
+    }
 
     @Override
     protected void save(final Event event) throws SpRuntimeException {
@@ -127,7 +136,14 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                 if (event.getFieldByRuntimeName(s).getRawValue() instanceof String) {
                     sb2.append("\"").append(event.getFieldByRuntimeName(s).getRawValue().toString()).append("\", ");
                 } else {
-                    sb2.append(event.getFieldByRuntimeName(s).getRawValue().toString()).append(", ");
+                    //
+                    if (this.timestampKeys.contains(s)) {
+                        java.sql.Timestamp sqlTimestamp = new java.sql.Timestamp(event.getFieldByRuntimeName(s).getAsPrimitive().getAsLong());
+                        sb2.append("\"").append(sqlTimestamp).append("\", ");
+                    } else {
+                        sb2.append(event.getFieldByRuntimeName(s).getRawValue().toString()).append(", ");
+                    }
+
                 }
             }
             // Remove last comma
@@ -231,7 +247,14 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
 
                 // Adding the type of the property (e.g. "VARCHAR(255)")
                 if (property instanceof EventPropertyPrimitive) {
-                    s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+                    // If domain property is a timestamp
+                    if (property.getDomainProperties().stream().anyMatch(x ->
+                       SO.DateTime.equals(x.toString()))) {
+                        s.append(SqlAttribute.DATETIME);
+                        this.timestampKeys.add(property.getRuntimeName());
+                    } else {
+                        s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+                    }
                 } else {
                     // Must be an EventPropertyList then
                     s.append(SqlAttribute.getFromUri(XSD._string.toString()));