You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/03 05:27:57 UTC

[incubator-streampipes-extensions] branch dev updated: Fix in ValidateTable for MySQL Sink

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new d29c984  Fix in ValidateTable  for MySQL Sink
     new e66169e  Merge pull request #21 from Madabaru/dev
d29c984 is described below

commit d29c9843898ec36c8b901ab3cb4faa773cbae66f
Author: Madabaru <jo...@axantu.com>
AuthorDate: Tue Jun 2 11:33:06 2020 +0200

    Fix in ValidateTable  for MySQL Sink
---
 .../streampipes/sinks/databases/jvm/mysql/Mysql.java  | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 5b6de68..8f7f247 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -100,9 +100,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
         extractTableInformation();
 
         for (EventProperty property : eventProperties) {
-            if (tableColumns.get(property.getRuntimeName()) != null) {
+            if (this.tableColumns.get(property.getRuntimeName()) != null) {
                 if (property instanceof EventPropertyPrimitive) {
-                    Column col = tableColumns.get(property.getRuntimeName());
+                    if (property.getDomainProperties().stream().anyMatch(x ->
+                            SO.DateTime.equals(x.toString()))) {
+                        this.timestampKeys.add(property.getRuntimeName());
+                    }
+                    Column col = this.tableColumns.get(property.getRuntimeName());
                     if (((EventPropertyPrimitive) property).getRuntimeType().equals(col.getType().toString())) {
                         continue;
                     } else {
@@ -115,16 +119,11 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
         }
     }
 
-    public static void main(String... args) {
-        long mil = System.currentTimeMillis();
-        java.sql.Timestamp timestamp = new java.sql.Timestamp(mil);
-
-        System.out.println(timestamp);
-    }
 
     @Override
     protected void save(final Event event) throws SpRuntimeException {
         checkConnected();
+
         try {
             Statement statement;
             statement = c.createStatement();
@@ -198,6 +197,10 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
                     String columnName = resultSet.getString("COLUMN_NAME");
                     String dataType = resultSet.getString("DATA_TYPE");
                     String columnType = resultSet.getString("COLUMN_TYPE");
+
+                    System.out.println((dataType));
+                    System.out.println((columnType));
+
                     tableColumns.put(columnName, new Column(dataType, columnType));
                 } while (resultSet.next());
             } else {