You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/03 05:27:57 UTC
[incubator-streampipes-extensions] branch dev updated: Fix in
ValidateTable for MySQL Sink
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new d29c984 Fix in ValidateTable for MySQL Sink
new e66169e Merge pull request #21 from Madabaru/dev
d29c984 is described below
commit d29c9843898ec36c8b901ab3cb4faa773cbae66f
Author: Madabaru <jo...@axantu.com>
AuthorDate: Tue Jun 2 11:33:06 2020 +0200
Fix in ValidateTable for MySQL Sink
---
.../streampipes/sinks/databases/jvm/mysql/Mysql.java | 19 +++++++++++--------
1 file changed, 11 insertions(+), 8 deletions(-)
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 5b6de68..8f7f247 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -100,9 +100,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
extractTableInformation();
for (EventProperty property : eventProperties) {
- if (tableColumns.get(property.getRuntimeName()) != null) {
+ if (this.tableColumns.get(property.getRuntimeName()) != null) {
if (property instanceof EventPropertyPrimitive) {
- Column col = tableColumns.get(property.getRuntimeName());
+ if (property.getDomainProperties().stream().anyMatch(x ->
+ SO.DateTime.equals(x.toString()))) {
+ this.timestampKeys.add(property.getRuntimeName());
+ }
+ Column col = this.tableColumns.get(property.getRuntimeName());
if (((EventPropertyPrimitive) property).getRuntimeType().equals(col.getType().toString())) {
continue;
} else {
@@ -115,16 +119,11 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
}
}
- public static void main(String... args) {
- long mil = System.currentTimeMillis();
- java.sql.Timestamp timestamp = new java.sql.Timestamp(mil);
-
- System.out.println(timestamp);
- }
@Override
protected void save(final Event event) throws SpRuntimeException {
checkConnected();
+
try {
Statement statement;
statement = c.createStatement();
@@ -198,6 +197,10 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
String columnName = resultSet.getString("COLUMN_NAME");
String dataType = resultSet.getString("DATA_TYPE");
String columnType = resultSet.getString("COLUMN_TYPE");
+
+ System.out.println((dataType));
+ System.out.println((columnType));
+
tableColumns.put(columnName, new Column(dataType, columnType));
} while (resultSet.next());
} else {