You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/04 10:38:33 UTC

[incubator-streampipes-extensions] branch dev updated: Fix to SQL

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4c957de  Fix to SQL
     new 85da37d  Merge pull request #23 from Madabaru/dev
4c957de is described below

commit 4c957de4f80b51b7dd4cc8bece78668ac20bdd1b
Author: Madabaru <jo...@axantu.com>
AuthorDate: Thu Jun 4 12:22:20 2020 +0200

    Fix to SQL
---
 .../connect/adapters/mysql/MySqlSetAdapter.java        |  1 +
 .../streampipes/sinks/databases/jvm/mysql/Mysql.java   | 18 ++++++++----------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java
index a321ffe..7c5fd62 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java
@@ -81,6 +81,7 @@ public class MySqlSetAdapter extends SpecificDataSetAdapter {
                 if (executed) {
                     ResultSet resultSet = statement.getResultSet();
                     while (resultSet.next()) {
+
                         // Retrieve by column name
                         Map<String, Object> event = new HashMap<>();
                         for (Column column : mySqlClient.getColumns()) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 8f7f247..7009970 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -32,6 +32,7 @@ import org.apache.streampipes.model.schema.EventProperty;
 
 import java.sql.*;
 import java.util.*;
+import java.util.Objects;
 
 
 public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
@@ -102,12 +103,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
         for (EventProperty property : eventProperties) {
             if (this.tableColumns.get(property.getRuntimeName()) != null) {
                 if (property instanceof EventPropertyPrimitive) {
-                    if (property.getDomainProperties().stream().anyMatch(x ->
-                            SO.DateTime.equals(x.toString()))) {
-                        this.timestampKeys.add(property.getRuntimeName());
-                    }
                     Column col = this.tableColumns.get(property.getRuntimeName());
-                    if (((EventPropertyPrimitive) property).getRuntimeType().equals(col.getType().toString())) {
+                    // Validate SQL-DateTime separately
+                    if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
+                            SO.DateTime.equals(x.toString())) && col.getType().toString().equals("http://www.w3.org/2001/XMLSchema#long")) {
+                        this.timestampKeys.add(property.getRuntimeName());
+                        continue;
+                    } else if (((EventPropertyPrimitive) property).getRuntimeType().equals(col.getType().toString())) {
                         continue;
                     } else {
                         throw new SpRuntimeException("Table '" + tableName + "' does not match the EventProperties");
@@ -197,10 +199,6 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                     String columnName = resultSet.getString("COLUMN_NAME");
                     String dataType = resultSet.getString("DATA_TYPE");
                     String columnType = resultSet.getString("COLUMN_TYPE");
-
-                    System.out.println((dataType));
-                    System.out.println((columnType));
-
                     tableColumns.put(columnName, new Column(dataType, columnType));
                 } while (resultSet.next());
             } else {
@@ -251,7 +249,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                 // Adding the type of the property (e.g. "VARCHAR(255)")
                 if (property instanceof EventPropertyPrimitive) {
                     // If domain property is a timestamp
-                    if (property.getDomainProperties().stream().anyMatch(x ->
+                    if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
                        SO.DateTime.equals(x.toString()))) {
                         s.append(SqlAttribute.DATETIME);
                         this.timestampKeys.add(property.getRuntimeName());