You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/01 21:35:14 UTC
[incubator-streampipes-extensions] branch dev updated: Working on
SQL sink and adapters
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 4d781ce Working on SQL sink and adapters
4d781ce is described below
commit 4d781ce2583f050244478b42b727c5a1b0848227
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Jun 1 23:33:36 2020 +0200
Working on SQL sink and adapters
---
.../streampipes/connect/adapters/mysql/Column.java | 16 ++++++++++++-
.../connect/adapters/mysql/MySqlAdapter.java | 20 ++++++++++++----
.../connect/adapters/mysql/MySqlClient.java | 19 +++++++++++----
.../sinks/databases/jvm/jdbcclient/JdbcClient.java | 3 ++-
.../sinks/databases/jvm/mysql/Column.java | 11 ++++++++-
.../sinks/databases/jvm/mysql/Mysql.java | 27 ++++++++++++++++++++--
6 files changed, 83 insertions(+), 13 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java
index 58c63e2..4b0a68e 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/Column.java
@@ -19,11 +19,13 @@
package org.apache.streampipes.connect.adapters.mysql;
import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
class Column {
private String name;
private Datatypes type;
private Object def;
+ private String domainProperty;
Column(String name, String dataType, String columnType) {
this.name = name;
@@ -52,8 +54,16 @@ class Column {
this.type = Datatypes.String;
def = "";
break;
+ case "date":
+ case "datetime":
+ case "time":
+ case "timestamp":
+ case "year":
+ this.type = Datatypes.Long;
+ def = System.currentTimeMillis();
+ this.domainProperty = SO.DateTime;
+ break;
default:
- // date, datetime, time, timestamp, year
throw new IllegalArgumentException("Type " + type + " not supported.");
}
if (columnType.equals("tinyint(1)") || columnType.equals("bit(1)")) {
@@ -73,4 +83,8 @@ class Column {
public Object getDefault() {
return def;
}
+
+ public String getDomainProperty() {
+ return domainProperty;
+ }
}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java
index 4034b23..c88ac32 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.vocabulary.SO;
import java.io.IOException;
import java.io.Serializable;
@@ -43,6 +44,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
+
public class MySqlAdapter extends SpecificDataStreamAdapter {
public static final String ID = "http://streampipes.org/adapter/specific/mysql";
@@ -183,10 +186,19 @@ public class MySqlAdapter extends SpecificDataStreamAdapter {
extractTableInformation();
for (Column column : tableSchema) {
- allProperties.add(PrimitivePropertyBuilder
- .create(column.getType(), database + "." + table + "." + column.getName())
- .label(column.getName())
- .build());
+ if (SO.DateTime.equals(column.getDomainProperty())) {
+ allProperties.add(PrimitivePropertyBuilder
+ .create(column.getType(), column.getName())
+ .label(column.getName())
+ .domainProperty(SO.DateTime)
+ .build());
+ } else {
+ allProperties.add(PrimitivePropertyBuilder
+ .create(column.getType(), column.getName())
+ .label(column.getName())
+ .build());
+ }
+
}
eventSchema.setEventProperties(allProperties);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java
index ae9d66e..b3c6f7d 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlClient.java
@@ -23,11 +23,14 @@ import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.vocabulary.SO;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
+
public class MySqlClient {
public static final String ID = "http://streampipes.org/adapter/specific/mysql";
@@ -102,10 +105,18 @@ public class MySqlClient {
List<EventProperty> allProperties = new ArrayList<>();
for (Column column : columns) {
- allProperties.add(PrimitivePropertyBuilder
- .create(column.getType(), column.getName())
- .label(column.getName())
- .build());
+ if (SO.DateTime.equals(column.getDomainProperty())) {
+ allProperties.add(PrimitivePropertyBuilder
+ .create(column.getType(), column.getName())
+ .label(column.getName())
+ .domainProperty(SO.DateTime)
+ .build());
+ } else {
+ allProperties.add(PrimitivePropertyBuilder
+ .create(column.getType(), column.getName())
+ .label(column.getName())
+ .build());
+ }
}
eventSchema.setEventProperties(allProperties);
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index ff3f539..71a357f 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -61,7 +61,8 @@ public class JdbcClient {
* If no matching type is found, it is interpreted as a String (VARCHAR(255))
*/
protected enum SqlAttribute {
- INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"), BOOLEAN("BOOLEAN");
+ INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
+ BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
private final String sqlName;
SqlAttribute(String s) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java
index 4b58c5f..942078b 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Column.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.sinks.databases.jvm.mysql;
import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
class Column {
private Datatypes type;
@@ -50,8 +51,16 @@ class Column {
this.type = Datatypes.String;
def = "";
break;
+
+ case "date":
+ case "datetime":
+ case "time":
+ case "timestamp":
+ case "year":
+ this.type = Datatypes.Long;
+ def = System.currentTimeMillis();
+ break;
default:
- // date, datetime, time, timestamp, year
throw new IllegalArgumentException("Type " + type + " not supported.");
}
if (columnType.equals("tinyint(1)") || columnType.equals("bit(1)")) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index fd503a2..5b6de68 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.vocabulary.XSD;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -38,11 +39,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
private MysqlParameters params;
private static Logger LOG;
private HashMap<String, Column> tableColumns;
+ private List<String> timestampKeys;
@Override
public void onInvocation(MysqlParameters params, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
this.params = params;
+ this.timestampKeys = new ArrayList<>();
LOG = params.getGraph().getLogger(Mysql.class);
initializeJdbc(
@@ -112,6 +115,12 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
}
}
+ public static void main(String... args) {
+ long mil = System.currentTimeMillis();
+ java.sql.Timestamp timestamp = new java.sql.Timestamp(mil);
+
+ System.out.println(timestamp);
+ }
@Override
protected void save(final Event event) throws SpRuntimeException {
@@ -127,7 +136,14 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
if (event.getFieldByRuntimeName(s).getRawValue() instanceof String) {
sb2.append("\"").append(event.getFieldByRuntimeName(s).getRawValue().toString()).append("\", ");
} else {
- sb2.append(event.getFieldByRuntimeName(s).getRawValue().toString()).append(", ");
+ //
+ if (this.timestampKeys.contains(s)) {
+ java.sql.Timestamp sqlTimestamp = new java.sql.Timestamp(event.getFieldByRuntimeName(s).getAsPrimitive().getAsLong());
+ sb2.append("\"").append(sqlTimestamp).append("\", ");
+ } else {
+ sb2.append(event.getFieldByRuntimeName(s).getRawValue().toString()).append(", ");
+ }
+
}
}
// Remove last comma
@@ -231,7 +247,14 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
// Adding the type of the property (e.g. "VARCHAR(255)")
if (property instanceof EventPropertyPrimitive) {
- s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+ // If domain property is a timestamp
+ if (property.getDomainProperties().stream().anyMatch(x ->
+ SO.DateTime.equals(x.toString()))) {
+ s.append(SqlAttribute.DATETIME);
+ this.timestampKeys.add(property.getRuntimeName());
+ } else {
+ s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+ }
} else {
// Must be an EventPropertyList then
s.append(SqlAttribute.getFromUri(XSD._string.toString()));