You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/04 10:38:33 UTC
[incubator-streampipes-extensions] branch dev updated: Fix to SQL
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 4c957de Fix to SQL
new 85da37d Merge pull request #23 from Madabaru/dev
4c957de is described below
commit 4c957de4f80b51b7dd4cc8bece78668ac20bdd1b
Author: Madabaru <jo...@axantu.com>
AuthorDate: Thu Jun 4 12:22:20 2020 +0200
Fix to SQL
---
.../connect/adapters/mysql/MySqlSetAdapter.java | 1 +
.../streampipes/sinks/databases/jvm/mysql/Mysql.java | 18 ++++++++----------
2 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java
index a321ffe..7c5fd62 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java
@@ -81,6 +81,7 @@ public class MySqlSetAdapter extends SpecificDataSetAdapter {
if (executed) {
ResultSet resultSet = statement.getResultSet();
while (resultSet.next()) {
+
// Retrieve by column name
Map<String, Object> event = new HashMap<>();
for (Column column : mySqlClient.getColumns()) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 8f7f247..7009970 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -32,6 +32,7 @@ import org.apache.streampipes.model.schema.EventProperty;
import java.sql.*;
import java.util.*;
+import java.util.Objects;
public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
@@ -102,12 +103,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
for (EventProperty property : eventProperties) {
if (this.tableColumns.get(property.getRuntimeName()) != null) {
if (property instanceof EventPropertyPrimitive) {
- if (property.getDomainProperties().stream().anyMatch(x ->
- SO.DateTime.equals(x.toString()))) {
- this.timestampKeys.add(property.getRuntimeName());
- }
Column col = this.tableColumns.get(property.getRuntimeName());
- if (((EventPropertyPrimitive) property).getRuntimeType().equals(col.getType().toString())) {
+ // Validate SQL-DateTime separately
+ if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
+ SO.DateTime.equals(x.toString())) && col.getType().toString().equals("http://www.w3.org/2001/XMLSchema#long")) {
+ this.timestampKeys.add(property.getRuntimeName());
+ continue;
+ } else if (((EventPropertyPrimitive) property).getRuntimeType().equals(col.getType().toString())) {
continue;
} else {
throw new SpRuntimeException("Table '" + tableName + "' does not match the EventProperties");
@@ -197,10 +199,6 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
String columnName = resultSet.getString("COLUMN_NAME");
String dataType = resultSet.getString("DATA_TYPE");
String columnType = resultSet.getString("COLUMN_TYPE");
-
- System.out.println((dataType));
- System.out.println((columnType));
-
tableColumns.put(columnName, new Column(dataType, columnType));
} while (resultSet.next());
} else {
@@ -251,7 +249,7 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
// Adding the type of the property (e.g. "VARCHAR(255)")
if (property instanceof EventPropertyPrimitive) {
// If domain property is a timestamp
- if (property.getDomainProperties().stream().anyMatch(x ->
+ if (property.getDomainProperties() != null && property.getDomainProperties().stream().anyMatch(x ->
SO.DateTime.equals(x.toString()))) {
s.append(SqlAttribute.DATETIME);
this.timestampKeys.add(property.getRuntimeName());