You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/25 14:38:29 UTC
[incubator-streampipes] 01/03: [hotfix] Remove MySQL connectors due to license conflict
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit a49dff311b4e6bba72c701fd325e2cc6236e4478
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 25 15:21:53 2022 +0200
[hotfix] Remove MySQL connectors due to license conflict
---
.../connect/iiot/ConnectAdapterIiotInit.java | 4 -
.../connect/iiot/adapters/mysql/Column.java | 90 -------
.../connect/iiot/adapters/mysql/MySqlAdapter.java | 269 ---------------------
.../connect/iiot/adapters/mysql/MySqlClient.java | 215 ----------------
.../iiot/adapters/mysql/MySqlSetAdapter.java | 203 ----------------
.../iiot/adapters/mysql/MySqlStreamAdapter.java | 201 ---------------
.../streampipes-sinks-databases-jvm/pom.xml | 4 -
.../sinks/databases/jvm/DatabasesJvmInit.java | 4 +-
.../sinks/databases/jvm/mysql/Mysql.java | 91 -------
.../sinks/databases/jvm/mysql/MysqlController.java | 78 ------
.../sinks/databases/jvm/mysql/MysqlParameters.java | 41 ----
ui/cypress/tests/thirdparty/MySQLDb.ts | 56 -----
12 files changed, 1 insertion(+), 1255 deletions(-)
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index 835cd732b..97f0df0ff 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -20,8 +20,6 @@ package org.apache.streampipes.connect.iiot;
import org.apache.streampipes.connect.iiot.adapters.influxdb.InfluxDbSetAdapter;
import org.apache.streampipes.connect.iiot.adapters.influxdb.InfluxDbStreamAdapter;
-import org.apache.streampipes.connect.iiot.adapters.mysql.MySqlSetAdapter;
-import org.apache.streampipes.connect.iiot.adapters.mysql.MySqlStreamAdapter;
import org.apache.streampipes.connect.iiot.adapters.opcua.OpcUaAdapter;
import org.apache.streampipes.connect.iiot.adapters.plc4x.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.connect.iiot.adapters.plc4x.s7.Plc4xS7Adapter;
@@ -46,8 +44,6 @@ public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
"StreamPipes connect worker containing adapters relevant for the IIoT",
"",
8001)
- .registerAdapter(new MySqlStreamAdapter())
- .registerAdapter(new MySqlSetAdapter())
.registerAdapter(new MachineDataStreamAdapter())
.registerAdapter(new RosBridgeAdapter())
.registerAdapter(new OpcUaAdapter())
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java
deleted file mode 100644
index 08ee0c55d..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.vocabulary.SO;
-
-class Column {
- private String name;
- private Datatypes type;
- private Object def;
- private String domainProperty;
-
- Column(String name, String dataType, String columnType) {
- this.name = name;
- switch (dataType) {
- case "tinyint":
- case "smallint":
- case "mediumint":
- case "int":
- case "bit":
- this.type = Datatypes.Integer;
- def = 0;
- break;
- case "bigint":
- this.type = Datatypes.Long;
- def = 0L;
- break;
- case "float":
- case "decimal": // Watch out for loss of precision
- case "double":
- this.type = Datatypes.Float;
- def = 0.0f;
- break;
- case "text":
- case "varchar":
- case "char":
- this.type = Datatypes.String;
- def = "";
- break;
- case "date":
- case "datetime":
- case "time":
- case "timestamp":
- case "year":
- this.type = Datatypes.Float;
- def = System.currentTimeMillis();
- this.domainProperty = SO.DateTime;
- break;
- default:
- throw new IllegalArgumentException("Type " + type + " not supported.");
- }
- if (columnType.equals("tinyint(1)") || columnType.equals("bit(1)")) {
- this.type = Datatypes.Boolean;
- def = Boolean.FALSE;
- }
- System.out.println("Found column: " + name + ", type: " + this.type + " (sql-type: "
- + dataType + ", column-tpye: " + columnType + ")");
- }
-
- public String getName() {
- return name;
- }
- public Datatypes getType() {
- return type;
- }
- public Object getDefault() {
- return def;
- }
-
- public String getDomainProperty() {
- return domainProperty;
- }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java
deleted file mode 100644
index 59fecb94a..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.*;
-import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.vocabulary.SO;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class MySqlAdapter extends SpecificDataStreamAdapter {
-
- public static final String ID = "http://streampipes.org/adapter/specific/mysql";
-
- private static String MYSQL_HOST = "MYSQL_HOST";
- private static String MYSQL_USER = "MYSQL_USER";
- private static String MYSQL_PASS = "MYSQL_PASS";
- private static String MYSQL_DB = "MYSQL_DB";
- private static String MYSQL_TABLE = "MYSQL_TABLE";
- private static String MYSQL_PORT = "MYSQL_PORT";
-
- private String host;
- private String user;
- private String pass;
- private String database;
- private String table;
- private String port;
-
- private boolean dataComing = false;
- private List<Column> tableSchema;
- private BinaryLogClient client;
-
- public MySqlAdapter() {
- }
-
- public MySqlAdapter(SpecificAdapterStreamDescription adapterDescription) {
- super(adapterDescription);
-
- getConfigurations(adapterDescription);
- }
-
- @Override
- public SpecificAdapterStreamDescription declareModel() {
- //TODO: Add Icon
- SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID,
- "MySql Adapter",
- "Creates a data stream for a SQL table")
- .iconUrl("sql.png")
- .requiredTextParameter(Labels.from(MYSQL_HOST, "Hostname", "Hostname of the MySql Server"))
- .requiredTextParameter(Labels.from(MYSQL_USER, "Username", "Username of the user"))
- .requiredTextParameter(Labels.from(MYSQL_PASS, "Password", "Password of the user"))
- .requiredTextParameter(Labels.from(MYSQL_DB, "Database", "Database in which the table is located"))
- .requiredTextParameter(Labels.from(MYSQL_TABLE, "Table", "Table which should be watched"))
- .requiredIntegerParameter(Labels.from(MYSQL_PORT, "Port", "Port of the MySql Server. Default: 3306"), 3306)
- .build();
-
- description.setAppId(ID);
- return description;
- }
-
- @Override
- public void startAdapter() throws AdapterException {
- checkJdbcDriver();
- extractTableInformation();
-
- // Connect BinaryLogClient
- client = new BinaryLogClient(host, Integer.parseInt(port), user, pass);
- EventDeserializer eventDeserializer = new EventDeserializer();
- eventDeserializer.setCompatibilityMode(
- EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
- EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
- );
- client.setEventDeserializer(eventDeserializer);
- client.registerEventListener(event -> sendEvent(event));
- try {
- client.connect();
- } catch (IOException e) {
- throw new AdapterException(e.getMessage());
- }
- }
-
- private void sendEvent(Event event) {
- // An event can contain multiple insertions/updates
- if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
- // Check table and database, if the next event should be streamed
- if (((TableMapEventData) event.getData()).getDatabase().equals(database)
- && ((TableMapEventData) event.getData()).getTable().equals((table))) {
- dataComing = true;
- }
- }
- if (dataComing) {
- if (EventType.isUpdate(event.getHeader().getEventType())) {
- for (Entry<Serializable[], Serializable[]> en : ((UpdateRowsEventData) event.getData()).getRows()) {
- sendChange(en.getValue());
- }
- dataComing = false;
- } else if (EventType.isWrite(event.getHeader().getEventType())) {
- for (Serializable[] s : ((WriteRowsEventData) event.getData()).getRows()) {
- sendChange(s);
- }
- dataComing = false;
- }
- }
- }
-
- private void sendChange(Serializable[] rows) {
- Map<String, Object> out = new HashMap<>();
- for (int i = 0; i < rows.length; i++) {
- if (rows[i] != null) {
- if (rows[i] instanceof byte[]) {
- // Strings are sent in byte arrays and have to be converted. TODO: Check that encoding is correct
- out.put(tableSchema.get(i).getName(), new String((byte[])rows[i]));
- } else {
- out.put(tableSchema.get(i).getName(), rows[i]);
- }
- } else {
- out.put(tableSchema.get(i).getName(), tableSchema.get(i).getDefault());
- }
- }
- adapterPipeline.process(out);
- }
-
- @Override
- public void stopAdapter() throws AdapterException {
- try {
- client.disconnect();
- } catch (IOException e) {
- throw new AdapterException("Thrown exception: " + e.getMessage());
- }
- }
-
- @Override
- public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
- return new MySqlAdapter(adapterDescription);
- }
-
- @Override
- public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription)
- throws AdapterException, ParseException {
- // Load JDBC Driver, connect JDBC Driver, Extract information, disconnect JDBC Driver
- EventSchema eventSchema = new EventSchema();
- GuessSchema guessSchema = new GuessSchema();
- List<EventProperty> allProperties = new ArrayList<>();
-
- getConfigurations(adapterDescription);
-
- checkJdbcDriver();
- extractTableInformation();
-
- for (Column column : tableSchema) {
- if (SO.DateTime.equals(column.getDomainProperty())) {
- allProperties.add(PrimitivePropertyBuilder
- .create(column.getType(), column.getName())
- .label(column.getName())
- .domainProperty(SO.DateTime)
- .build());
- } else {
- allProperties.add(PrimitivePropertyBuilder
- .create(column.getType(), column.getName())
- .label(column.getName())
- .build());
- }
-
- }
-
- eventSchema.setEventProperties(allProperties);
- guessSchema.setEventSchema(eventSchema);
-
- return guessSchema;
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- private void getConfigurations(SpecificAdapterStreamDescription adapterDescription) {
- ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
-
- this.host = extractor.singleValue(MYSQL_HOST, String.class);
- this.user = extractor.singleValue(MYSQL_USER, String.class);
- this.pass = extractor.singleValue(MYSQL_PASS, String.class);
- this.database = extractor.singleValue(MYSQL_DB, String.class);
- this.table = extractor.singleValue(MYSQL_TABLE, String.class);
- this.port = extractor.singleValue(MYSQL_PORT, String.class);
- }
-
- private void checkJdbcDriver() throws AdapterException {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- } catch (ClassNotFoundException e) {
- throw new AdapterException("MySql Driver not found.");
- }
- }
-
- private void extractTableInformation() throws AdapterException {
- String server = "jdbc:mysql://" + host + ":" + port + "/" + "?sslMode=DISABLED&allowPublicKeyRetrieval=true";
- ResultSet resultSet = null;
- tableSchema = new ArrayList<>();
-
- String query = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE FROM "
- + "INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ? ORDER BY "
- + "ORDINAL_POSITION ASC;";
-
- try (Connection con = DriverManager.getConnection(server, user, pass);
- PreparedStatement statement = con.prepareStatement(query)) {
-
- statement.setString(1, table);
- statement.setString(2, database);
- resultSet = statement.executeQuery();
-
- if (resultSet.next()) {
- do {
- String name = resultSet.getString("COLUMN_NAME");
- String dataType = resultSet.getString("DATA_TYPE");
- String columnType = resultSet.getString("COLUMN_TYPE");
- tableSchema.add(new Column(name, dataType, columnType));
- } while(resultSet.next());
- } else {
- // No columns found -> Table/Database does not exist
- throw new IllegalArgumentException("Database/table not found");
- }
- } catch (SQLException e) {
- throw new AdapterException("SqlException: " + e.getMessage()
- + ", Error code: " + e.getErrorCode()
- + ", SqlState: " + e.getSQLState());
- } finally {
- try {
- resultSet.close();
- } catch (Exception e) {}
- }
- }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java
deleted file mode 100644
index 251f16a5a..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
-import org.apache.streampipes.vocabulary.SO;
-
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
-
-public class MySqlClient {
-
- public static final String ID = "http://streampipes.org/adapter/specific/mysql";
-
- static final String HOST = "mysqlHost";
- static final String PORT = "mysqlPort";
- static final String DATABASE = "mysqlDatabase";
- static final String TABLE = "mysqlTable";
- static final String USER = "mysqlUser";
- static final String PASSWORD = "mysqlPassword";
-
- static final String REPLACE_NULL_VALUES = "replaceNullValues";
- static final String DO_REPLACE_NULL_VALUES = "doReplaceNullValues";
- static final String DO_NOT_REPLACE_NULL_VALUES = "doNotReplaceNullValues";
-
- private String host;
- private Integer port;
- private String database;
- private String table;
-
- private String username;
- private String password;
-
-
- private List<Column> columns;
-
- Connection connection;
-
- MySqlClient(String host,
- int port,
- String database,
- String table,
- String username,
- String password) {
- this.host = host;
- this.port = port;
- this.database = database;
- this.table = table;
- this.username = username;
- this.password = password;
-
- connection = null;
- }
-
- public void connect() throws AdapterException {
- checkJdbcDriver();
- String server = "jdbc:mysql://" + host + ":" + port + "/" + "?sslMode=DISABLED&allowPublicKeyRetrieval=true";
- try {
- connection = DriverManager.getConnection(server, username, password);
- } catch (SQLException e) {
- throw new AdapterException("Could not connect to server: " + e.getMessage());
- }
- }
-
- public void disconnect() throws AdapterException {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- throw new AdapterException("Error while disconnecting: " + e.getMessage());
- }
- connection = null;
- }
- }
-
- public GuessSchema getSchema() throws AdapterException {
- connect();
- loadColumns();
-
- EventSchema eventSchema = new EventSchema();
- GuessSchema guessSchema = new GuessSchema();
- List<EventProperty> allProperties = new ArrayList<>();
-
- for (Column column : columns) {
- if (SO.DateTime.equals(column.getDomainProperty())) {
- allProperties.add(PrimitivePropertyBuilder
- .create(column.getType(), column.getName())
- .label(column.getName())
- .domainProperty(SO.DateTime)
- .build());
- } else {
- allProperties.add(PrimitivePropertyBuilder
- .create(column.getType(), column.getName())
- .label(column.getName())
- .build());
- }
- }
-
- eventSchema.setEventProperties(allProperties);
- guessSchema.setEventSchema(eventSchema);
-
- disconnect();
- return guessSchema;
- }
-
- /**
- * Checks that the MySql-JDBC-Driver is "installed". Throws an AdapterException otherwise
- */
- private void checkJdbcDriver() throws AdapterException {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- } catch (ClassNotFoundException e) {
- throw new AdapterException("MySql Driver not found.");
- }
- }
-
- /**
- * Fills the columns with the columns from the SQL Table
- */
- public void loadColumns() throws AdapterException {
- if (connection == null) {
- throw new AdapterException("Client must be connected in order to load the columns");
- }
- ResultSet resultSet = null;
- columns = new ArrayList<>();
-
- String query = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE FROM "
- + "INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ? ORDER BY "
- + "ORDINAL_POSITION ASC;";
-
- try (PreparedStatement statement = connection.prepareStatement(query)) {
-
- statement.setString(1, table);
- statement.setString(2, database);
- resultSet = statement.executeQuery();
-
- if (resultSet.next()) {
- do {
- String name = resultSet.getString("COLUMN_NAME");
- String dataType = resultSet.getString("DATA_TYPE");
- String columnType = resultSet.getString("COLUMN_TYPE");
- columns.add(new Column(name, dataType, columnType));
- } while(resultSet.next());
- } else {
- // No columns found -> Table/Database does not exist
- throw new IllegalArgumentException("Database/table not found");
- }
- } catch (SQLException e) {
- throw new AdapterException("SqlException while loading columns: " + e.getMessage()
- + ", Error code: " + e.getErrorCode()
- + ", SqlState: " + e.getSQLState());
- } finally {
- try {
- resultSet.close();
- } catch (Exception e) {}
- }
- }
-
- public String getHost() {
- return host;
- }
-
- public Integer getPort() {
- return port;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public String getTable() {
- return table;
- }
-
- public String getUsername() {
- return username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public List<Column> getColumns() {
- return columns;
- }
-
- public boolean isConnected() {
- return connection != null;
- }
-
- Connection getConnection() {
- return connection;
- }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java
deleted file mode 100644
index 87769ee7c..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataSetAdapter;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataSetAdapterBuilder;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.sdk.utils.Assets;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MySqlSetAdapter extends SpecificDataSetAdapter {
-
- public static final String ID = "org.apache.streampipes.connect.iiot.adapters.mysql.set";
-
- private MySqlClient mySqlClient;
- private Thread fetchDataThread;
-
- private boolean replaceNullValues;
-
- public static class FetchDataThread implements Runnable {
-
- MySqlSetAdapter mySqlSetAdapter;
- MySqlClient mySqlClient;
-
- public FetchDataThread(MySqlSetAdapter mySqlSetAdapter) throws AdapterException {
- this.mySqlSetAdapter = mySqlSetAdapter;
- this.mySqlClient = mySqlSetAdapter.getMySqlClient();
-
- mySqlClient.connect();
- mySqlClient.loadColumns();
- }
-
- @Override
- public void run() {
- if (!mySqlClient.isConnected()) {
- System.out.println("Cannot start PollingThread, when the client is not connected");
- return;
- }
- // No batch approach like in the influx adapter due to the lack of a unique key in the table
- // Create the columnString:
- StringBuilder sb = new StringBuilder();
- for (Column column : mySqlClient.getColumns()) {
- sb.append(column.getName()).append(", ");
- }
- sb.setLength(Math.max(0, sb.length() - 2));
-
- String query = "SELECT " + sb.toString() + " FROM " + mySqlClient.getDatabase() + "." + mySqlClient.getTable();
-
- try (Statement statement = mySqlClient.getConnection().createStatement()) {
- boolean executed = statement.execute(query);
- if (executed) {
- ResultSet resultSet = statement.getResultSet();
- while (resultSet.next()) {
-
- // Retrieve by column name
- Map<String, Object> event = new HashMap<>();
- for (Column column : mySqlClient.getColumns()) {
- Object in = resultSet.getObject(column.getName());
- if (in == null) {
- if (mySqlSetAdapter.replaceNullValues) {
- in = column.getDefault();
- } else {
- // We do not want to send this event (replaceNullValues == false)
- event = null;
- break;
- }
- }
- event.put(column.getName(), in);
- }
- if (event != null) {
- mySqlSetAdapter.send(event);
- }
- }
- resultSet.close();
- }
- } catch (SQLException e) {
- System.out.println(e.getMessage());
- }
-
- try {
- mySqlClient.disconnect();
- } catch (AdapterException e) {
- e.printStackTrace();
- }
- }
- }
-
- public MySqlSetAdapter() {
- }
-
- public MySqlSetAdapter(SpecificAdapterSetDescription adapterDescription) {
- super(adapterDescription);
-
- getConfigurations(adapterDescription);
- }
-
-
- @Override
- public SpecificAdapterSetDescription declareModel() {
- SpecificAdapterSetDescription description = SpecificDataSetAdapterBuilder.create(ID)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredTextParameter(Labels.withId(MySqlClient.HOST))
- .requiredIntegerParameter(Labels.withId(MySqlClient.PORT), 3306)
- .requiredTextParameter(Labels.withId(MySqlClient.DATABASE))
- .requiredTextParameter(Labels.withId(MySqlClient.TABLE))
- .requiredTextParameter(Labels.withId(MySqlClient.USER))
- .requiredSecret(Labels.withId(MySqlClient.PASSWORD))
- .requiredSingleValueSelection(Labels.withId(MySqlClient.REPLACE_NULL_VALUES),
- Options.from(
- new Tuple2<>("Yes", MySqlClient.DO_REPLACE_NULL_VALUES),
- new Tuple2<>("No", MySqlClient.DO_NOT_REPLACE_NULL_VALUES)))
- .build();
-
- description.setAppId(ID);
- return description;
- }
-
- @Override
- public void startAdapter() throws AdapterException {
- fetchDataThread = new Thread(new FetchDataThread(this));
- fetchDataThread.start();
- }
-
- @Override
- public void stopAdapter() throws AdapterException {
- fetchDataThread.interrupt();
- try {
- fetchDataThread.join();
- } catch (InterruptedException e) {
- throw new AdapterException("Unexpected Error while joining polling thread: " + e.getMessage());
- }
- }
-
- @Override
- public Adapter getInstance(SpecificAdapterSetDescription adapterDescription) {
- return new MySqlSetAdapter(adapterDescription);
- }
-
- @Override
- public GuessSchema getSchema(SpecificAdapterSetDescription adapterDescription) throws AdapterException, ParseException {
- getConfigurations(adapterDescription);
- return mySqlClient.getSchema();
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- private void send(Map<String, Object> map) {
- adapterPipeline.process(map);
- }
-
- private void getConfigurations(SpecificAdapterSetDescription adapterDescription) {
- ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
-
- String replace = extractor.selectedSingleValueInternalName(MySqlClient.REPLACE_NULL_VALUES);
- replaceNullValues = replace.equals(MySqlClient.DO_REPLACE_NULL_VALUES);
-
- mySqlClient = new MySqlClient(
- extractor.singleValue(MySqlClient.HOST, String.class),
- extractor.singleValue(MySqlClient.PORT, Integer.class),
- extractor.singleValue(MySqlClient.DATABASE, String.class),
- extractor.singleValue(MySqlClient.TABLE, String.class),
- extractor.singleValue(MySqlClient.USER, String.class),
- extractor.secretValue(MySqlClient.PASSWORD));
- }
-
- public MySqlClient getMySqlClient() {
- return mySqlClient;
- }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java
deleted file mode 100644
index c36e14a4c..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.*;
-import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.sdk.utils.Assets;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MySqlStreamAdapter extends SpecificDataStreamAdapter {
-
- public static final String ID = "org.apache.streampipes.connect.iiot.adapters.mysql.stream";
-
- private MySqlClient mySqlClient;
- private BinaryLogClient binaryLogClient;
-
- private Thread subscriptionThread = new Thread(()-> {
- try {
- binaryLogClient.connect();
- } catch (IOException e) {
- e.printStackTrace();
- }
- });
-
- private boolean replaceNullValues;
- private boolean dataComing = false;
-
- public MySqlStreamAdapter() {
- }
-
- public MySqlStreamAdapter(SpecificAdapterStreamDescription adapterDescription) {
- super(adapterDescription);
-
- getConfigurations(adapterDescription);
- }
-
- @Override
- public SpecificAdapterStreamDescription declareModel() {
- SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredTextParameter(Labels.withId(MySqlClient.HOST))
- .requiredIntegerParameter(Labels.withId(MySqlClient.PORT), 3306)
- .requiredTextParameter(Labels.withId(MySqlClient.DATABASE))
- .requiredTextParameter(Labels.withId(MySqlClient.TABLE))
- .requiredTextParameter(Labels.withId(MySqlClient.USER))
- .requiredSecret(Labels.withId(MySqlClient.PASSWORD))
- .requiredSingleValueSelection(Labels.withId(MySqlClient.REPLACE_NULL_VALUES),
- Options.from(
- new Tuple2<>("Yes", MySqlClient.DO_REPLACE_NULL_VALUES),
- new Tuple2<>("No", MySqlClient.DO_NOT_REPLACE_NULL_VALUES)))
- .build();
-
- description.setAppId(ID);
- return description;
- }
-
- @Override
- public void startAdapter() throws AdapterException {
- // Making sure, that the columns are all loaded
- mySqlClient.connect();
- mySqlClient.loadColumns();
- mySqlClient.disconnect();
-
- // Connect BinaryLogClient
- binaryLogClient = new BinaryLogClient(
- mySqlClient.getHost(),
- mySqlClient.getPort(),
- mySqlClient.getUsername(),
- mySqlClient.getPassword());
-
- EventDeserializer eventDeserializer = new EventDeserializer();
- eventDeserializer.setCompatibilityMode(
- EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
- EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
- );
- binaryLogClient.setEventDeserializer(eventDeserializer);
- binaryLogClient.registerEventListener(event -> sendEvent(event));
- subscriptionThread.start();
- }
-
-
- private void sendEvent(Event event) {
- // An event can contain multiple insertions/updates
- if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
- // Check table and database, if the next event should be streamed
- if (((TableMapEventData) event.getData()).getDatabase().equals(mySqlClient.getDatabase())
- && ((TableMapEventData) event.getData()).getTable().equals((mySqlClient.getTable()))) {
- dataComing = true;
- }
- }
- if (dataComing) {
- if (EventType.isUpdate(event.getHeader().getEventType())) {
- for (Map.Entry<Serializable[], Serializable[]> en : ((UpdateRowsEventData) event.getData()).getRows()) {
- sendChange(en.getValue());
- }
- dataComing = false;
- } else if (EventType.isWrite(event.getHeader().getEventType())) {
- for (Serializable[] s : ((WriteRowsEventData) event.getData()).getRows()) {
- sendChange(s);
- }
- dataComing = false;
- }
- }
- }
-
- private void sendChange(Serializable[] rows) {
- Map<String, Object> out = new HashMap<>();
- for (int i = 0; i < rows.length; i++) {
- if (rows[i] != null) {
- if (rows[i] instanceof byte[]) {
- // Strings are sent in byte arrays and have to be converted.
- //TODO: Check that encoding is correct
- out.put(mySqlClient.getColumns().get(i).getName(), new String((byte[])rows[i]));
- } else {
- out.put(mySqlClient.getColumns().get(i).getName(), rows[i]);
- }
- } else if (replaceNullValues) {
- out.put(mySqlClient.getColumns().get(i).getName(), mySqlClient.getColumns().get(i).getDefault());
- } else {
- // We should skip events with null values
- return;
- }
- }
- adapterPipeline.process(out);
- }
-
- @Override
- public void stopAdapter() throws AdapterException {
- try {
- binaryLogClient.disconnect();
- subscriptionThread.join();
- } catch (IOException | InterruptedException e) {
- throw new AdapterException("Thrown exception: " + e.getMessage());
- }
- }
-
- @Override
- public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
- return new MySqlStreamAdapter(adapterDescription);
- }
-
- @Override
- public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
- getConfigurations(adapterDescription);
- return mySqlClient.getSchema();
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- private void getConfigurations(SpecificAdapterStreamDescription adapterDescription) {
- ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
-
- String replace = extractor.selectedSingleValueInternalName(MySqlClient.REPLACE_NULL_VALUES);
- replaceNullValues = replace.equals(MySqlClient.DO_REPLACE_NULL_VALUES);
-
- mySqlClient = new MySqlClient(
- extractor.singleValue(MySqlClient.HOST, String.class),
- extractor.singleValue(MySqlClient.PORT, Integer.class),
- extractor.singleValue(MySqlClient.DATABASE, String.class),
- extractor.singleValue(MySqlClient.TABLE, String.class),
- extractor.singleValue(MySqlClient.USER, String.class),
- extractor.secretValue(MySqlClient.PASSWORD));
- }
-}
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
index ce55b064d..ebf8bbc4d 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
@@ -75,10 +75,6 @@
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- </dependency>
<!-- 3rd party dependencies to avoid convergence errors -->
<dependency>
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java
index 528b53806..cb1a38abe 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java
@@ -32,7 +32,6 @@ import org.apache.streampipes.sinks.databases.jvm.couchdb.CouchDbController;
import org.apache.streampipes.sinks.databases.jvm.ditto.DittoController;
import org.apache.streampipes.sinks.databases.jvm.influxdb.InfluxDbController;
import org.apache.streampipes.sinks.databases.jvm.iotdb.IotDbController;
-import org.apache.streampipes.sinks.databases.jvm.mysql.MysqlController;
import org.apache.streampipes.sinks.databases.jvm.opcua.UpcUaController;
import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlController;
import org.apache.streampipes.sinks.databases.jvm.redis.RedisController;
@@ -56,8 +55,7 @@ public class DatabasesJvmInit extends StandaloneModelSubmitter {
new PostgreSqlController(),
new IotDbController(),
new DittoController(),
- new RedisController(),
- new MysqlController())
+ new RedisController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
deleted file mode 100644
index 510d1fd87..000000000
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.jvm.mysql;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.SupportedDbEngines;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-
-public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
-
- private MysqlParameters params;
- private final SupportedDbEngines dbEngine = SupportedDbEngines.MY_SQL;
-
- @Override
- public void onInvocation(MysqlParameters params, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
-
- this.params = params;
- Logger LOG = params.getGraph().getLogger(Mysql.class);
-
- initializeJdbc(
- params.getGraph().getInputStreams().get(0).getEventSchema(),
- params,
- dbEngine,
- LOG);
- }
-
-
- @Override
- public void onEvent(Event inputEvent) {
- try {
- save(inputEvent);
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
- }
-
-
- @Override
- public void onDetach() throws SpRuntimeException {
- closeAll();
- }
-
-
- @Override
- protected void ensureDatabaseExists(String databaseName) throws SpRuntimeException {
-
- String createStatement = "CREATE DATABASE IF NOT EXISTS ";
-
- ensureDatabaseExists(createStatement, databaseName);
-
- }
-
- @Override
- protected void extractTableInformation() throws SpRuntimeException {
-
- String query = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE FROM "
- + "INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ? ORDER BY "
- + "ORDINAL_POSITION ASC;";
-
- String[] queryParameter = new String[]{params.getDbTable(), params.getDbName()};
-
- this.tableDescription.extractTableInformation(
- this.statementHandler.preparedStatement, this.connection,
- query, queryParameter);
- }
-
-
-}
-
-
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java
deleted file mode 100644
index 927ac7de9..000000000
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.jvm.mysql;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class MysqlController extends StandaloneEventSinkDeclarer<MysqlParameters> {
-
- private static final String HOST_KEY = "host";
- private static final String USER_KEY = "user";
- private static final String PASSWORD_KEY = "password";
- private static final String DB_KEY = "db";
- private static final String TABLE_KEY = "table";
- private static final String PORT_KEY = "port";
-
- @Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.mysql")
- .category(DataSinkType.DATABASE)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
- .requiredIntegerParameter(Labels.withId(PORT_KEY), 3306)
- .requiredTextParameter(Labels.withId(USER_KEY), false, false)
- .requiredSecret(Labels.withId(PASSWORD_KEY))
- .requiredTextParameter(Labels.withId(DB_KEY), false, false)
- .requiredTextParameter(Labels.withId(TABLE_KEY), false, false)
- .build();
- }
-
- @Override
- public ConfiguredEventSink<MysqlParameters> onInvocation(DataSinkInvocation graph,
- DataSinkParameterExtractor extractor) {
-
- String host = extractor.singleValueParameter(HOST_KEY, String.class);
- String user = extractor.singleValueParameter(USER_KEY, String.class);
- String password = extractor.secretValue(PASSWORD_KEY);
- String db = extractor.singleValueParameter(DB_KEY, String.class);
- String table = extractor.singleValueParameter(TABLE_KEY, String.class);
- Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
-
- // SSL connection is not yet implemented for MySQL client
- MysqlParameters params = new MysqlParameters(graph, host, user, password, db, table, port, false);
- return new ConfiguredEventSink<>(params, Mysql::new);
- }
-
-}
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java
deleted file mode 100644
index 40180caa4..000000000
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.jvm.mysql;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.JdbcConnectionParameters;
-
-public class MysqlParameters extends JdbcConnectionParameters {
-
- public MysqlParameters(DataSinkInvocation graph, String mySqlHost, String mySqlUser, String mySqlPassword,
- String mySqlDb, String mySqlTable, Integer mySqlPort, boolean sslEnabled) {
- super(
- graph,
- mySqlHost,
- mySqlPort,
- mySqlDb,
- mySqlUser,
- mySqlPassword,
- mySqlTable,
- sslEnabled,
- null,
- false
- );
- }
-}
diff --git a/ui/cypress/tests/thirdparty/MySQLDb.ts b/ui/cypress/tests/thirdparty/MySQLDb.ts
deleted file mode 100644
index ac661ba41..000000000
--- a/ui/cypress/tests/thirdparty/MySQLDb.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder';
-import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder';
-import { ThirdPartyIntegrationUtils } from '../../support/utils/ThirdPartyIntegrationUtils';
-import { PipelineElementInput } from '../../support/model/PipelineElementInput';
-import { ParameterUtils } from '../../support/utils/ParameterUtils';
-
-describe('Test MySQL Integration', () => {
- beforeEach('Setup Test', () => {
- cy.initStreamPipesTest();
- });
-
- it('Perform Test', () => {
- const dbName = 'cypressDatabase';
- const host: string = ParameterUtils.get('localhost', 'mysql');
-
- const sink: PipelineElementInput = PipelineElementBuilder.create('mysql_database')
- .addInput('input', 'host', host)
- .addInput('input', 'user', 'root')
- .addInput('input', 'password', '7uc4rAymrPhxv6a5')
- .addInput('input', 'db', 'sp')
- .addInput('input', 'table', dbName)
- .build();
-
- const adapter = SpecificAdapterBuilder
- .create('MySql_Stream_Adapter')
- .setName('MySQL Adapter')
- .setTimestampProperty('timestamp')
- .addInput('input', 'mysqlHost', host)
- .addInput('input', 'mysqlUser', 'root')
- .addInput('input', 'mysqlPassword', '7uc4rAymrPhxv6a5')
- .addInput('input', 'mysqlDatabase', 'sp')
- .addInput('input', 'mysqlTable', dbName)
- .build();
-
- ThirdPartyIntegrationUtils.runTest(sink, adapter);
- });
-
-});