You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/25 14:39:47 UTC

[incubator-streampipes] branch dev updated (68a9c4461 -> 0f5e68a31)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


    from 68a9c4461 Merge pull request #106 from apache/STREAMPIPES-579
     new fcb325599 [hotfix] Remove MySQL connectors due to license conflict
     new b84110b04 [hotfix] Improve resilience of OPC-UA adapter in subscription mode
     new 0f5e68a31 [hotfix] Add button to delete an adapter description

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/management/DescriptionManagement.java   |  19 ++
 .../streampipes-connect-adapters-iiot/pom.xml      |   4 -
 .../connect/iiot/ConnectAdapterIiotInit.java       |   4 -
 .../connect/iiot/adapters/mysql/Column.java        |  90 -------
 .../connect/iiot/adapters/mysql/MySqlAdapter.java  | 269 ---------------------
 .../connect/iiot/adapters/mysql/MySqlClient.java   | 215 ----------------
 .../iiot/adapters/mysql/MySqlSetAdapter.java       | 203 ----------------
 .../iiot/adapters/mysql/MySqlStreamAdapter.java    | 201 ---------------
 .../connect/iiot/adapters/opcua/SpOpcUaClient.java |  22 +-
 .../streampipes-sinks-databases-jvm/pom.xml        |   4 -
 .../sinks/databases/jvm/DatabasesJvmInit.java      |   4 +-
 .../sinks/databases/jvm/mysql/Mysql.java           |  91 -------
 .../sinks/databases/jvm/mysql/MysqlController.java |  78 ------
 .../sinks/databases/jvm/mysql/MysqlParameters.java |  41 ----
 .../rest/impl/connect/DescriptionResource.java     |  17 +-
 ui/cypress/tests/thirdparty/MySQLDb.ts             |  56 -----
 .../src/lib/apis/adapter.service.ts                |   5 +-
 .../adapter-description.component.html             |  14 +-
 .../adapter-description.component.ts               |  12 +-
 .../data-marketplace.component.html                |   1 +
 ui/src/app/connect/connect.module.ts               |   2 +
 21 files changed, 84 insertions(+), 1268 deletions(-)
 delete mode 100644 streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java
 delete mode 100644 streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java
 delete mode 100644 streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java
 delete mode 100644 streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java
 delete mode 100644 streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java
 delete mode 100644 ui/cypress/tests/thirdparty/MySQLDb.ts


[incubator-streampipes] 01/03: [hotfix] Remove MySQL connectors due to license conflict

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit fcb325599f6a857fc71a2e7574f9bc8cf8fa40ba
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 25 15:21:53 2022 +0200

    [hotfix] Remove MySQL connectors due to license conflict
---
 .../connect/iiot/ConnectAdapterIiotInit.java       |   4 -
 .../connect/iiot/adapters/mysql/Column.java        |  90 -------
 .../connect/iiot/adapters/mysql/MySqlAdapter.java  | 269 ---------------------
 .../connect/iiot/adapters/mysql/MySqlClient.java   | 215 ----------------
 .../iiot/adapters/mysql/MySqlSetAdapter.java       | 203 ----------------
 .../iiot/adapters/mysql/MySqlStreamAdapter.java    | 201 ---------------
 .../streampipes-sinks-databases-jvm/pom.xml        |   4 -
 .../sinks/databases/jvm/DatabasesJvmInit.java      |   4 +-
 .../sinks/databases/jvm/mysql/Mysql.java           |  91 -------
 .../sinks/databases/jvm/mysql/MysqlController.java |  78 ------
 .../sinks/databases/jvm/mysql/MysqlParameters.java |  41 ----
 ui/cypress/tests/thirdparty/MySQLDb.ts             |  56 -----
 12 files changed, 1 insertion(+), 1255 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index 835cd732b..97f0df0ff 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -20,8 +20,6 @@ package org.apache.streampipes.connect.iiot;
 
 import org.apache.streampipes.connect.iiot.adapters.influxdb.InfluxDbSetAdapter;
 import org.apache.streampipes.connect.iiot.adapters.influxdb.InfluxDbStreamAdapter;
-import org.apache.streampipes.connect.iiot.adapters.mysql.MySqlSetAdapter;
-import org.apache.streampipes.connect.iiot.adapters.mysql.MySqlStreamAdapter;
 import org.apache.streampipes.connect.iiot.adapters.opcua.OpcUaAdapter;
 import org.apache.streampipes.connect.iiot.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.iiot.adapters.plc4x.s7.Plc4xS7Adapter;
@@ -46,8 +44,6 @@ public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
 						"StreamPipes connect worker containing adapters relevant for the IIoT",
 						"",
 						8001)
-				.registerAdapter(new MySqlStreamAdapter())
-				.registerAdapter(new MySqlSetAdapter())
 				.registerAdapter(new MachineDataStreamAdapter())
 				.registerAdapter(new RosBridgeAdapter())
 				.registerAdapter(new OpcUaAdapter())
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java
deleted file mode 100644
index 08ee0c55d..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/Column.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.vocabulary.SO;
-
-class Column {
-  private String name;
-  private Datatypes type;
-  private Object def;
-  private String domainProperty;
-
-  Column(String name, String dataType, String columnType) {
-    this.name = name;
-    switch (dataType) {
-      case "tinyint":
-      case "smallint":
-      case "mediumint":
-      case "int":
-      case "bit":
-        this.type = Datatypes.Integer;
-        def = 0;
-        break;
-      case "bigint":
-        this.type = Datatypes.Long;
-        def = 0L;
-        break;
-      case "float":
-      case "decimal":   // Watch out for loss of precision
-      case "double":
-        this.type = Datatypes.Float;
-        def = 0.0f;
-        break;
-      case "text":
-      case "varchar":
-      case "char":
-        this.type = Datatypes.String;
-        def = "";
-        break;
-      case "date":
-      case "datetime":
-      case "time":
-      case "timestamp":
-      case "year":
-        this.type = Datatypes.Float;
-        def = System.currentTimeMillis();
-        this.domainProperty = SO.DateTime;
-        break;
-      default:
-        throw new IllegalArgumentException("Type " + type + " not supported.");
-    }
-    if (columnType.equals("tinyint(1)") || columnType.equals("bit(1)")) {
-      this.type = Datatypes.Boolean;
-      def = Boolean.FALSE;
-    }
-    System.out.println("Found column: " + name + ", type: " + this.type + " (sql-type: "
-        + dataType + ", column-tpye: " + columnType + ")");
-  }
-
-  public String getName() {
-    return name;
-  }
-  public Datatypes getType() {
-    return type;
-  }
-  public Object getDefault() {
-    return def;
-  }
-
-  public String getDomainProperty() {
-    return domainProperty;
-  }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java
deleted file mode 100644
index 59fecb94a..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlAdapter.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.*;
-import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.vocabulary.SO;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class MySqlAdapter extends SpecificDataStreamAdapter {
-
-  public static final String ID = "http://streampipes.org/adapter/specific/mysql";
-
-  private static String MYSQL_HOST = "MYSQL_HOST";
-  private static String MYSQL_USER = "MYSQL_USER";
-  private static String MYSQL_PASS = "MYSQL_PASS";
-  private static String MYSQL_DB = "MYSQL_DB";
-  private static String MYSQL_TABLE = "MYSQL_TABLE";
-  private static String MYSQL_PORT = "MYSQL_PORT";
-
-  private String host;
-  private String user;
-  private String pass;
-  private String database;
-  private String table;
-  private String port;
-
-  private boolean dataComing = false;
-  private List<Column> tableSchema;
-  private BinaryLogClient client;
-
-  public MySqlAdapter() {
-  }
-
-  public MySqlAdapter(SpecificAdapterStreamDescription adapterDescription) {
-    super(adapterDescription);
-
-    getConfigurations(adapterDescription);
-  }
-
-  @Override
-  public SpecificAdapterStreamDescription declareModel() {
-    //TODO: Add Icon
-    SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID,
-            "MySql Adapter",
-            "Creates a data stream for a SQL table")
-            .iconUrl("sql.png")
-            .requiredTextParameter(Labels.from(MYSQL_HOST, "Hostname", "Hostname of the MySql Server"))
-            .requiredTextParameter(Labels.from(MYSQL_USER, "Username", "Username of the user"))
-            .requiredTextParameter(Labels.from(MYSQL_PASS, "Password", "Password of the user"))
-            .requiredTextParameter(Labels.from(MYSQL_DB, "Database", "Database in which the table is located"))
-            .requiredTextParameter(Labels.from(MYSQL_TABLE, "Table", "Table which should be watched"))
-            .requiredIntegerParameter(Labels.from(MYSQL_PORT, "Port", "Port of the MySql Server. Default: 3306"), 3306)
-            .build();
-
-    description.setAppId(ID);
-    return  description;
-  }
-
-  @Override
-  public void startAdapter() throws AdapterException {
-    checkJdbcDriver();
-    extractTableInformation();
-
-    // Connect BinaryLogClient
-    client = new BinaryLogClient(host, Integer.parseInt(port), user, pass);
-    EventDeserializer eventDeserializer = new EventDeserializer();
-    eventDeserializer.setCompatibilityMode(
-            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
-            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
-    );
-    client.setEventDeserializer(eventDeserializer);
-    client.registerEventListener(event -> sendEvent(event));
-    try {
-      client.connect();
-    } catch (IOException e) {
-      throw new AdapterException(e.getMessage());
-    }
-  }
-
-  private void sendEvent(Event event) {
-    // An event can contain multiple insertions/updates
-    if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
-      // Check table and database, if the next event should be streamed
-      if (((TableMapEventData) event.getData()).getDatabase().equals(database)
-              && ((TableMapEventData) event.getData()).getTable().equals((table))) {
-        dataComing = true;
-      }
-    }
-    if (dataComing) {
-      if (EventType.isUpdate(event.getHeader().getEventType())) {
-        for (Entry<Serializable[], Serializable[]> en : ((UpdateRowsEventData) event.getData()).getRows()) {
-          sendChange(en.getValue());
-        }
-        dataComing = false;
-      } else if (EventType.isWrite(event.getHeader().getEventType())) {
-        for (Serializable[] s : ((WriteRowsEventData) event.getData()).getRows()) {
-          sendChange(s);
-        }
-        dataComing = false;
-      }
-    }
-  }
-
-  private void sendChange(Serializable[] rows) {
-    Map<String, Object> out = new HashMap<>();
-    for (int i = 0; i < rows.length; i++) {
-      if (rows[i] != null) {
-        if (rows[i] instanceof byte[]) {
-          // Strings are sent in byte arrays and have to be converted. TODO: Check that encoding is correct
-          out.put(tableSchema.get(i).getName(), new String((byte[])rows[i]));
-        } else {
-          out.put(tableSchema.get(i).getName(), rows[i]);
-        }
-      } else {
-        out.put(tableSchema.get(i).getName(), tableSchema.get(i).getDefault());
-      }
-    }
-    adapterPipeline.process(out);
-  }
-
-  @Override
-  public void stopAdapter() throws AdapterException {
-    try {
-      client.disconnect();
-    } catch (IOException e) {
-      throw new AdapterException("Thrown exception: " + e.getMessage());
-    }
-  }
-
-  @Override
-  public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-    return new MySqlAdapter(adapterDescription);
-  }
-
-  @Override
-  public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription)
-          throws AdapterException, ParseException {
-    // Load JDBC Driver, connect JDBC Driver, Extract information, disconnect JDBC Driver
-    EventSchema eventSchema = new EventSchema();
-    GuessSchema guessSchema = new GuessSchema();
-    List<EventProperty> allProperties = new ArrayList<>();
-
-    getConfigurations(adapterDescription);
-
-    checkJdbcDriver();
-    extractTableInformation();
-
-    for (Column column : tableSchema) {
-      if (SO.DateTime.equals(column.getDomainProperty())) {
-        allProperties.add(PrimitivePropertyBuilder
-                .create(column.getType(), column.getName())
-                .label(column.getName())
-                .domainProperty(SO.DateTime)
-                .build());
-      } else {
-        allProperties.add(PrimitivePropertyBuilder
-                .create(column.getType(), column.getName())
-                .label(column.getName())
-                .build());
-      }
-
-    }
-
-    eventSchema.setEventProperties(allProperties);
-    guessSchema.setEventSchema(eventSchema);
-
-    return guessSchema;
-  }
-
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  private void getConfigurations(SpecificAdapterStreamDescription adapterDescription) {
-    ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
-
-    this.host = extractor.singleValue(MYSQL_HOST, String.class);
-    this.user = extractor.singleValue(MYSQL_USER, String.class);
-    this.pass = extractor.singleValue(MYSQL_PASS, String.class);
-    this.database = extractor.singleValue(MYSQL_DB, String.class);
-    this.table = extractor.singleValue(MYSQL_TABLE, String.class);
-    this.port = extractor.singleValue(MYSQL_PORT, String.class);
-  }
-
-  private void checkJdbcDriver() throws AdapterException {
-    try {
-      Class.forName("com.mysql.cj.jdbc.Driver");
-    } catch (ClassNotFoundException e) {
-      throw new AdapterException("MySql Driver not found.");
-    }
-  }
-
-  private void extractTableInformation() throws AdapterException {
-    String server = "jdbc:mysql://" + host + ":" + port + "/" + "?sslMode=DISABLED&allowPublicKeyRetrieval=true";
-    ResultSet resultSet = null;
-    tableSchema = new ArrayList<>();
-
-    String query = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE FROM "
-            + "INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ? ORDER BY "
-            + "ORDINAL_POSITION ASC;";
-
-    try (Connection con = DriverManager.getConnection(server, user, pass);
-         PreparedStatement statement = con.prepareStatement(query)) {
-
-      statement.setString(1, table);
-      statement.setString(2, database);
-      resultSet = statement.executeQuery();
-
-      if (resultSet.next()) {
-        do {
-          String name = resultSet.getString("COLUMN_NAME");
-          String dataType = resultSet.getString("DATA_TYPE");
-          String columnType = resultSet.getString("COLUMN_TYPE");
-          tableSchema.add(new Column(name, dataType, columnType));
-        } while(resultSet.next());
-      } else {
-        // No columns found -> Table/Database does not exist
-        throw new IllegalArgumentException("Database/table not found");
-      }
-    } catch (SQLException e) {
-      throw new AdapterException("SqlException: " + e.getMessage()
-              + ", Error code: " + e.getErrorCode()
-              + ", SqlState: " + e.getSQLState());
-    } finally {
-      try {
-        resultSet.close();
-      } catch (Exception e) {}
-    }
-  }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java
deleted file mode 100644
index 251f16a5a..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlClient.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
-import org.apache.streampipes.vocabulary.SO;
-
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
-
-public class MySqlClient {
-
-  public static final String ID = "http://streampipes.org/adapter/specific/mysql";
-
-  static final String HOST = "mysqlHost";
-  static final String PORT = "mysqlPort";
-  static final String DATABASE = "mysqlDatabase";
-  static final String TABLE = "mysqlTable";
-  static final String USER = "mysqlUser";
-  static final String PASSWORD = "mysqlPassword";
-
-  static final String REPLACE_NULL_VALUES = "replaceNullValues";
-  static final String DO_REPLACE_NULL_VALUES = "doReplaceNullValues";
-  static final String DO_NOT_REPLACE_NULL_VALUES = "doNotReplaceNullValues";
-
-  private String host;
-  private Integer port;
-  private String database;
-  private String table;
-
-  private String username;
-  private String password;
-
-
-  private List<Column> columns;
-
-  Connection connection;
-
-  MySqlClient(String host,
-              int port,
-              String database,
-              String table,
-              String username,
-              String password) {
-    this.host = host;
-    this.port = port;
-    this.database = database;
-    this.table = table;
-    this.username = username;
-    this.password = password;
-
-    connection = null;
-  }
-
-  public void connect() throws AdapterException {
-    checkJdbcDriver();
-    String server = "jdbc:mysql://" + host + ":" + port + "/" + "?sslMode=DISABLED&allowPublicKeyRetrieval=true";
-    try {
-      connection = DriverManager.getConnection(server, username, password);
-    } catch (SQLException e) {
-      throw new AdapterException("Could not connect to server: " + e.getMessage());
-    }
-  }
-
-  public void disconnect() throws AdapterException {
-    if (connection != null) {
-      try {
-        connection.close();
-      } catch (SQLException e) {
-        throw new AdapterException("Error while disconnecting: " + e.getMessage());
-      }
-      connection = null;
-    }
-  }
-
-  public GuessSchema getSchema() throws AdapterException {
-    connect();
-    loadColumns();
-
-    EventSchema eventSchema = new EventSchema();
-    GuessSchema guessSchema = new GuessSchema();
-    List<EventProperty> allProperties = new ArrayList<>();
-
-    for (Column column : columns) {
-      if (SO.DateTime.equals(column.getDomainProperty())) {
-        allProperties.add(PrimitivePropertyBuilder
-                .create(column.getType(), column.getName())
-                .label(column.getName())
-                .domainProperty(SO.DateTime)
-                .build());
-      } else {
-        allProperties.add(PrimitivePropertyBuilder
-                .create(column.getType(), column.getName())
-                .label(column.getName())
-                .build());
-      }
-    }
-
-    eventSchema.setEventProperties(allProperties);
-    guessSchema.setEventSchema(eventSchema);
-
-    disconnect();
-    return guessSchema;
-  }
-
-  /**
-   * Checks that the MySql-JDBC-Driver is "installed". Throws an AdapterException otherwise
-   */
-  private void checkJdbcDriver() throws AdapterException {
-    try {
-      Class.forName("com.mysql.cj.jdbc.Driver");
-    } catch (ClassNotFoundException e) {
-      throw new AdapterException("MySql Driver not found.");
-    }
-  }
-
-  /**
-   * Fills the columns with the columns from the SQL Table
-   */
-  public void loadColumns() throws AdapterException {
-    if (connection == null) {
-      throw new AdapterException("Client must be connected in order to load the columns");
-    }
-    ResultSet resultSet = null;
-    columns = new ArrayList<>();
-
-    String query = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE FROM "
-            + "INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ? ORDER BY "
-            + "ORDINAL_POSITION ASC;";
-
-    try (PreparedStatement statement = connection.prepareStatement(query)) {
-
-      statement.setString(1, table);
-      statement.setString(2, database);
-      resultSet = statement.executeQuery();
-
-      if (resultSet.next()) {
-        do {
-          String name = resultSet.getString("COLUMN_NAME");
-          String dataType = resultSet.getString("DATA_TYPE");
-          String columnType = resultSet.getString("COLUMN_TYPE");
-          columns.add(new Column(name, dataType, columnType));
-        } while(resultSet.next());
-      } else {
-        // No columns found -> Table/Database does not exist
-        throw new IllegalArgumentException("Database/table not found");
-      }
-    } catch (SQLException e) {
-      throw new AdapterException("SqlException while loading columns: " + e.getMessage()
-              + ", Error code: " + e.getErrorCode()
-              + ", SqlState: " + e.getSQLState());
-    } finally {
-      try {
-        resultSet.close();
-      } catch (Exception e) {}
-    }
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public Integer getPort() {
-    return port;
-  }
-
-  public String getDatabase() {
-    return database;
-  }
-
-  public String getTable() {
-    return table;
-  }
-
-  public String getUsername() {
-    return username;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
-  public List<Column> getColumns() {
-    return columns;
-  }
-
-  public boolean isConnected() {
-    return connection != null;
-  }
-
-  Connection getConnection() {
-    return connection;
-  }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java
deleted file mode 100644
index 87769ee7c..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlSetAdapter.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataSetAdapter;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataSetAdapterBuilder;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.sdk.utils.Assets;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MySqlSetAdapter extends SpecificDataSetAdapter {
-
-    public static final String ID = "org.apache.streampipes.connect.iiot.adapters.mysql.set";
-
-    private MySqlClient mySqlClient;
-    private Thread fetchDataThread;
-
-    private boolean replaceNullValues;
-
-    public static class FetchDataThread implements Runnable {
-
-        MySqlSetAdapter mySqlSetAdapter;
-        MySqlClient mySqlClient;
-
-        public FetchDataThread(MySqlSetAdapter mySqlSetAdapter) throws AdapterException {
-            this.mySqlSetAdapter = mySqlSetAdapter;
-            this.mySqlClient = mySqlSetAdapter.getMySqlClient();
-
-            mySqlClient.connect();
-            mySqlClient.loadColumns();
-        }
-
-        @Override
-        public void run() {
-            if (!mySqlClient.isConnected()) {
-                System.out.println("Cannot start PollingThread, when the client is not connected");
-                return;
-            }
-            // No batch approach like in the influx adapter due to the lack of a unique key in the table
-            // Create the columnString:
-            StringBuilder sb = new StringBuilder();
-            for (Column column : mySqlClient.getColumns()) {
-                sb.append(column.getName()).append(", ");
-            }
-            sb.setLength(Math.max(0, sb.length() - 2));
-
-            String query = "SELECT " + sb.toString() + " FROM " + mySqlClient.getDatabase() + "." + mySqlClient.getTable();
-
-            try (Statement statement = mySqlClient.getConnection().createStatement()) {
-                boolean executed = statement.execute(query);
-                if (executed) {
-                    ResultSet resultSet = statement.getResultSet();
-                    while (resultSet.next()) {
-
-                        // Retrieve by column name
-                        Map<String, Object> event = new HashMap<>();
-                        for (Column column : mySqlClient.getColumns()) {
-                            Object in = resultSet.getObject(column.getName());
-                            if (in == null) {
-                                if (mySqlSetAdapter.replaceNullValues) {
-                                    in = column.getDefault();
-                                } else {
-                                    // We do not want to send this event (replaceNullValues == false)
-                                    event = null;
-                                    break;
-                                }
-                            }
-                            event.put(column.getName(), in);
-                        }
-                        if (event != null) {
-                            mySqlSetAdapter.send(event);
-                        }
-                    }
-                    resultSet.close();
-                }
-            } catch (SQLException e) {
-                System.out.println(e.getMessage());
-            }
-
-            try {
-                mySqlClient.disconnect();
-            } catch (AdapterException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public MySqlSetAdapter() {
-    }
-
-    public MySqlSetAdapter(SpecificAdapterSetDescription adapterDescription) {
-        super(adapterDescription);
-
-        getConfigurations(adapterDescription);
-    }
-
-
-    @Override
-    public SpecificAdapterSetDescription declareModel() {
-        SpecificAdapterSetDescription description = SpecificDataSetAdapterBuilder.create(ID)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .requiredTextParameter(Labels.withId(MySqlClient.HOST))
-                .requiredIntegerParameter(Labels.withId(MySqlClient.PORT), 3306)
-                .requiredTextParameter(Labels.withId(MySqlClient.DATABASE))
-                .requiredTextParameter(Labels.withId(MySqlClient.TABLE))
-                .requiredTextParameter(Labels.withId(MySqlClient.USER))
-                .requiredSecret(Labels.withId(MySqlClient.PASSWORD))
-                .requiredSingleValueSelection(Labels.withId(MySqlClient.REPLACE_NULL_VALUES),
-                        Options.from(
-                                new Tuple2<>("Yes", MySqlClient.DO_REPLACE_NULL_VALUES),
-                                new Tuple2<>("No", MySqlClient.DO_NOT_REPLACE_NULL_VALUES)))
-                .build();
-
-        description.setAppId(ID);
-        return description;
-    }
-
-    @Override
-    public void startAdapter() throws AdapterException {
-        fetchDataThread = new Thread(new FetchDataThread(this));
-        fetchDataThread.start();
-    }
-
-    @Override
-    public void stopAdapter() throws AdapterException {
-        fetchDataThread.interrupt();
-        try {
-            fetchDataThread.join();
-        } catch (InterruptedException e) {
-            throw new AdapterException("Unexpected Error while joining polling thread: " + e.getMessage());
-        }
-    }
-
-    @Override
-    public Adapter getInstance(SpecificAdapterSetDescription adapterDescription) {
-        return new MySqlSetAdapter(adapterDescription);
-    }
-
-    @Override
-    public GuessSchema getSchema(SpecificAdapterSetDescription adapterDescription) throws AdapterException, ParseException {
-        getConfigurations(adapterDescription);
-        return mySqlClient.getSchema();
-    }
-
-    @Override
-    public String getId() {
-        return ID;
-    }
-
-    private void send(Map<String, Object> map) {
-        adapterPipeline.process(map);
-    }
-
-    private void getConfigurations(SpecificAdapterSetDescription adapterDescription) {
-        ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
-
-        String replace = extractor.selectedSingleValueInternalName(MySqlClient.REPLACE_NULL_VALUES);
-        replaceNullValues = replace.equals(MySqlClient.DO_REPLACE_NULL_VALUES);
-
-        mySqlClient = new MySqlClient(
-                extractor.singleValue(MySqlClient.HOST, String.class),
-                extractor.singleValue(MySqlClient.PORT, Integer.class),
-                extractor.singleValue(MySqlClient.DATABASE, String.class),
-                extractor.singleValue(MySqlClient.TABLE, String.class),
-                extractor.singleValue(MySqlClient.USER, String.class),
-                extractor.secretValue(MySqlClient.PASSWORD));
-    }
-
-    public MySqlClient getMySqlClient() {
-        return mySqlClient;
-    }
-}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java
deleted file mode 100644
index c36e14a4c..000000000
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/mysql/MySqlStreamAdapter.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.iiot.adapters.mysql;
-
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.*;
-import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.sdk.utils.Assets;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MySqlStreamAdapter extends SpecificDataStreamAdapter {
-
-    public static final String ID = "org.apache.streampipes.connect.iiot.adapters.mysql.stream";
-
-    private MySqlClient mySqlClient;
-    private BinaryLogClient binaryLogClient;
-
-    private Thread subscriptionThread  = new Thread(()-> {
-        try {
-            binaryLogClient.connect();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    });
-
-    private boolean replaceNullValues;
-    private boolean dataComing = false;
-
-    public MySqlStreamAdapter() {
-    }
-
-    public MySqlStreamAdapter(SpecificAdapterStreamDescription adapterDescription) {
-        super(adapterDescription);
-
-        getConfigurations(adapterDescription);
-    }
-
-    @Override
-    public SpecificAdapterStreamDescription declareModel() {
-        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .requiredTextParameter(Labels.withId(MySqlClient.HOST))
-                .requiredIntegerParameter(Labels.withId(MySqlClient.PORT), 3306)
-                .requiredTextParameter(Labels.withId(MySqlClient.DATABASE))
-                .requiredTextParameter(Labels.withId(MySqlClient.TABLE))
-                .requiredTextParameter(Labels.withId(MySqlClient.USER))
-                .requiredSecret(Labels.withId(MySqlClient.PASSWORD))
-                .requiredSingleValueSelection(Labels.withId(MySqlClient.REPLACE_NULL_VALUES),
-                        Options.from(
-                                new Tuple2<>("Yes", MySqlClient.DO_REPLACE_NULL_VALUES),
-                                new Tuple2<>("No", MySqlClient.DO_NOT_REPLACE_NULL_VALUES)))
-                .build();
-
-        description.setAppId(ID);
-        return description;
-    }
-
-    @Override
-    public void startAdapter() throws AdapterException {
-        // Making sure, that the columns are all loaded
-        mySqlClient.connect();
-        mySqlClient.loadColumns();
-        mySqlClient.disconnect();
-
-        // Connect BinaryLogClient
-        binaryLogClient = new BinaryLogClient(
-                mySqlClient.getHost(),
-                mySqlClient.getPort(),
-                mySqlClient.getUsername(),
-                mySqlClient.getPassword());
-
-        EventDeserializer eventDeserializer = new EventDeserializer();
-        eventDeserializer.setCompatibilityMode(
-                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
-                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
-        );
-        binaryLogClient.setEventDeserializer(eventDeserializer);
-        binaryLogClient.registerEventListener(event -> sendEvent(event));
-        subscriptionThread.start();
-    }
-
-
-    private void sendEvent(Event event) {
-        // An event can contain multiple insertions/updates
-        if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
-            // Check table and database, if the next event should be streamed
-            if (((TableMapEventData) event.getData()).getDatabase().equals(mySqlClient.getDatabase())
-                    && ((TableMapEventData) event.getData()).getTable().equals((mySqlClient.getTable()))) {
-                dataComing = true;
-            }
-        }
-        if (dataComing) {
-            if (EventType.isUpdate(event.getHeader().getEventType())) {
-                for (Map.Entry<Serializable[], Serializable[]> en : ((UpdateRowsEventData) event.getData()).getRows()) {
-                    sendChange(en.getValue());
-                }
-                dataComing = false;
-            } else if (EventType.isWrite(event.getHeader().getEventType())) {
-                for (Serializable[] s : ((WriteRowsEventData) event.getData()).getRows()) {
-                    sendChange(s);
-                }
-                dataComing = false;
-            }
-        }
-    }
-
-    private void sendChange(Serializable[] rows) {
-        Map<String, Object> out = new HashMap<>();
-        for (int i = 0; i < rows.length; i++) {
-            if (rows[i] != null) {
-                if (rows[i] instanceof byte[]) {
-                    // Strings are sent in byte arrays and have to be converted.
-                    //TODO: Check that encoding is correct
-                    out.put(mySqlClient.getColumns().get(i).getName(), new String((byte[])rows[i]));
-                } else {
-                    out.put(mySqlClient.getColumns().get(i).getName(), rows[i]);
-                }
-            } else if (replaceNullValues) {
-                out.put(mySqlClient.getColumns().get(i).getName(), mySqlClient.getColumns().get(i).getDefault());
-            } else {
-                // We should skip events with null values
-                return;
-            }
-        }
-        adapterPipeline.process(out);
-    }
-
-    @Override
-    public void stopAdapter() throws AdapterException {
-        try {
-            binaryLogClient.disconnect();
-            subscriptionThread.join();
-        } catch (IOException | InterruptedException e) {
-            throw new AdapterException("Thrown exception: " + e.getMessage());
-        }
-    }
-
-    @Override
-    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-        return new MySqlStreamAdapter(adapterDescription);
-    }
-
-    @Override
-    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
-        getConfigurations(adapterDescription);
-        return mySqlClient.getSchema();
-    }
-
-    @Override
-    public String getId() {
-        return ID;
-    }
-
-    private void getConfigurations(SpecificAdapterStreamDescription adapterDescription) {
-        ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
-
-        String replace = extractor.selectedSingleValueInternalName(MySqlClient.REPLACE_NULL_VALUES);
-        replaceNullValues = replace.equals(MySqlClient.DO_REPLACE_NULL_VALUES);
-
-        mySqlClient = new MySqlClient(
-                extractor.singleValue(MySqlClient.HOST, String.class),
-                extractor.singleValue(MySqlClient.PORT, Integer.class),
-                extractor.singleValue(MySqlClient.DATABASE, String.class),
-                extractor.singleValue(MySqlClient.TABLE, String.class),
-                extractor.singleValue(MySqlClient.USER, String.class),
-                extractor.secretValue(MySqlClient.PASSWORD));
-    }
-}
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
index fbc00d700..6b71b9f07 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
@@ -75,10 +75,6 @@
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
         </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-        </dependency>
 
         <!-- 3rd party dependencies to avoid convergence errors -->
         <dependency>
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java
index 528b53806..cb1a38abe 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabasesJvmInit.java
@@ -32,7 +32,6 @@ import org.apache.streampipes.sinks.databases.jvm.couchdb.CouchDbController;
 import org.apache.streampipes.sinks.databases.jvm.ditto.DittoController;
 import org.apache.streampipes.sinks.databases.jvm.influxdb.InfluxDbController;
 import org.apache.streampipes.sinks.databases.jvm.iotdb.IotDbController;
-import org.apache.streampipes.sinks.databases.jvm.mysql.MysqlController;
 import org.apache.streampipes.sinks.databases.jvm.opcua.UpcUaController;
 import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlController;
 import org.apache.streampipes.sinks.databases.jvm.redis.RedisController;
@@ -56,8 +55,7 @@ public class DatabasesJvmInit extends StandaloneModelSubmitter {
                     new PostgreSqlController(),
                     new IotDbController(),
                     new DittoController(),
-                    new RedisController(),
-                    new MysqlController())
+                    new RedisController())
             .registerMessagingFormats(
                     new JsonDataFormatFactory(),
                     new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
deleted file mode 100644
index 510d1fd87..000000000
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.jvm.mysql;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.SupportedDbEngines;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-
-public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
-
-    private MysqlParameters params;
-    private final SupportedDbEngines dbEngine = SupportedDbEngines.MY_SQL;
-
-    @Override
-    public void onInvocation(MysqlParameters params, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
-
-        this.params = params;
-        Logger LOG = params.getGraph().getLogger(Mysql.class);
-
-        initializeJdbc(
-                params.getGraph().getInputStreams().get(0).getEventSchema(),
-                params,
-                dbEngine,
-                LOG);
-    }
-
-
-    @Override
-    public void onEvent(Event inputEvent) {
-        try {
-            save(inputEvent);
-        } catch (SpRuntimeException e) {
-            e.printStackTrace();
-        }
-    }
-
-
-    @Override
-    public void onDetach() throws SpRuntimeException {
-        closeAll();
-    }
-
-
-    @Override
-    protected void ensureDatabaseExists(String databaseName) throws SpRuntimeException {
-
-        String createStatement = "CREATE DATABASE IF NOT EXISTS ";
-
-        ensureDatabaseExists(createStatement, databaseName);
-
-    }
-
-    @Override
-    protected void extractTableInformation() throws SpRuntimeException {
-
-        String query = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE FROM "
-                + "INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ? ORDER BY "
-                + "ORDINAL_POSITION ASC;";
-
-        String[] queryParameter = new String[]{params.getDbTable(), params.getDbName()};
-
-        this.tableDescription.extractTableInformation(
-                this.statementHandler.preparedStatement, this.connection,
-                query, queryParameter);
-    }
-
-
-}
-
-
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java
deleted file mode 100644
index 927ac7de9..000000000
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlController.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.jvm.mysql;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class MysqlController extends StandaloneEventSinkDeclarer<MysqlParameters> {
-
-    private static final String HOST_KEY = "host";
-    private static final String USER_KEY = "user";
-    private static final String PASSWORD_KEY = "password";
-    private static final String DB_KEY = "db";
-    private static final String TABLE_KEY = "table";
-    private static final String PORT_KEY = "port";
-
-    @Override
-    public DataSinkDescription declareModel() {
-        return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.mysql")
-                .category(DataSinkType.DATABASE)
-                .withLocales(Locales.EN)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .requiredStream(StreamRequirementsBuilder
-                        .create()
-                        .requiredProperty(EpRequirements.anyProperty())
-                        .build())
-                .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
-                .requiredIntegerParameter(Labels.withId(PORT_KEY), 3306)
-                .requiredTextParameter(Labels.withId(USER_KEY), false, false)
-                .requiredSecret(Labels.withId(PASSWORD_KEY))
-                .requiredTextParameter(Labels.withId(DB_KEY), false, false)
-                .requiredTextParameter(Labels.withId(TABLE_KEY), false, false)
-                .build();
-    }
-
-    @Override
-    public ConfiguredEventSink<MysqlParameters> onInvocation(DataSinkInvocation graph,
-                                                             DataSinkParameterExtractor extractor) {
-
-        String host = extractor.singleValueParameter(HOST_KEY, String.class);
-        String user = extractor.singleValueParameter(USER_KEY, String.class);
-        String password = extractor.secretValue(PASSWORD_KEY);
-        String db = extractor.singleValueParameter(DB_KEY, String.class);
-        String table = extractor.singleValueParameter(TABLE_KEY, String.class);
-        Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
-
-        // SSL connection is not yet implemented for MySQL client
-        MysqlParameters params = new MysqlParameters(graph, host, user, password, db, table, port, false);
-        return new ConfiguredEventSink<>(params, Mysql::new);
-    }
-
-}
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java
deleted file mode 100644
index 40180caa4..000000000
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/MysqlParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.jvm.mysql;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.JdbcConnectionParameters;
-
-public class MysqlParameters extends JdbcConnectionParameters {
-
-    public MysqlParameters(DataSinkInvocation graph, String mySqlHost, String mySqlUser, String mySqlPassword,
-                           String mySqlDb, String mySqlTable, Integer mySqlPort, boolean sslEnabled) {
-        super(
-                graph,
-                mySqlHost,
-                mySqlPort,
-                mySqlDb,
-                mySqlUser,
-                mySqlPassword,
-                mySqlTable,
-                sslEnabled,
-                null,
-                false
-        );
-    }
-}
diff --git a/ui/cypress/tests/thirdparty/MySQLDb.ts b/ui/cypress/tests/thirdparty/MySQLDb.ts
deleted file mode 100644
index ac661ba41..000000000
--- a/ui/cypress/tests/thirdparty/MySQLDb.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder';
-import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder';
-import { ThirdPartyIntegrationUtils } from '../../support/utils/ThirdPartyIntegrationUtils';
-import { PipelineElementInput } from '../../support/model/PipelineElementInput';
-import { ParameterUtils } from '../../support/utils/ParameterUtils';
-
-describe('Test MySQL Integration', () => {
-  beforeEach('Setup Test', () => {
-    cy.initStreamPipesTest();
-  });
-
-  it('Perform Test', () => {
-    const dbName = 'cypressDatabase';
-    const host: string = ParameterUtils.get('localhost', 'mysql');
-
-    const sink: PipelineElementInput = PipelineElementBuilder.create('mysql_database')
-      .addInput('input', 'host', host)
-      .addInput('input', 'user', 'root')
-      .addInput('input', 'password', '7uc4rAymrPhxv6a5')
-      .addInput('input', 'db', 'sp')
-      .addInput('input', 'table', dbName)
-      .build();
-
-    const adapter = SpecificAdapterBuilder
-      .create('MySql_Stream_Adapter')
-      .setName('MySQL Adapter')
-      .setTimestampProperty('timestamp')
-      .addInput('input', 'mysqlHost', host)
-      .addInput('input', 'mysqlUser', 'root')
-      .addInput('input', 'mysqlPassword', '7uc4rAymrPhxv6a5')
-      .addInput('input', 'mysqlDatabase', 'sp')
-      .addInput('input', 'mysqlTable', dbName)
-      .build();
-
-    ThirdPartyIntegrationUtils.runTest(sink, adapter);
-  });
-
-});


[incubator-streampipes] 02/03: [hotfix] Improve resilience of OPC-UA adapter in subscription mode

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit b84110b041124b6ee02914db1d625ea4ebf3e930
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 25 15:57:18 2022 +0200

    [hotfix] Improve resilience of OPC-UA adapter in subscription mode
---
 .../streampipes-connect-adapters-iiot/pom.xml      |  4 ----
 .../connect/iiot/adapters/opcua/SpOpcUaClient.java | 22 +++++++++++++++++++++-
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
index f1bb326c0..f59306d47 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
@@ -66,10 +66,6 @@
             <groupId>com.fasterxml.jackson.module</groupId>
             <artifactId>jackson-module-jaxb-annotations</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.github.shyiko</groupId>
-            <artifactId>mysql-binlog-connector-java</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.influxdb</groupId>
             <artifactId>influxdb-java</artifactId>
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java
index 6e7b9af8f..299c4f04f 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java
@@ -25,11 +25,13 @@ import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
 import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
@@ -93,7 +95,25 @@ public class SpOpcUaClient {
      * @param opcUaAdapter current instance of {@link OpcUaAdapter}
      * @throws Exception
      */
-    public void createListSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
+    public void createListSubscription(List<NodeId> nodes,
+                                       OpcUaAdapter opcUaAdapter) throws Exception {
+        client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
+            @Override
+            public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
+                LOG.warn("Transfer for subscriptionId={} failed: {}", subscription.getSubscriptionId(), statusCode);
+                try {
+                    initSubscription(nodes, opcUaAdapter);
+                } catch (Exception e) {
+                    LOG.error("Re-creating the subscription failed", e);
+                }
+            }
+        });
+
+        initSubscription(nodes, opcUaAdapter);
+    }
+
+
+    public void initSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
         /*
          * create a subscription @ 1000ms
          */


[incubator-streampipes] 03/03: [hotfix] Add button to delete an adapter description

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 0f5e68a319a0dfa6d1ff90ad301a308e2582c97a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 25 16:38:16 2022 +0200

    [hotfix] Add button to delete an adapter description
---
 .../master/management/DescriptionManagement.java      | 19 +++++++++++++++++++
 .../rest/impl/connect/DescriptionResource.java        | 17 +++++++++++++----
 .../platform-services/src/lib/apis/adapter.service.ts |  5 ++++-
 .../adapter-description.component.html                | 14 ++++++++++++--
 .../adapter-description.component.ts                  | 12 +++++++++++-
 .../data-marketplace/data-marketplace.component.html  |  1 +
 ui/src/app/connect/connect.module.ts                  |  2 ++
 7 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
index 834c76464..da40e4995 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.exception.AdapterException;
@@ -25,6 +26,7 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -55,6 +57,15 @@ public class DescriptionManagement {
                 .findFirst();
     }
 
+    public void deleteAdapterDescription(String id) throws SpRuntimeException {
+        var adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
+        var adapter = adapterStorage.getAdapter(id);
+        if (!isAdapterUsed(adapter)) {
+            adapterStorage.deleteAdapter(id);
+        } else {
+            throw new SpRuntimeException("This adapter is used by an existing instance and cannot be deleted");
+        }
+    }
     public String getAssets(String baseUrl) throws AdapterException {
         return WorkerRestClient.getAssets(baseUrl);
     }
@@ -67,4 +78,12 @@ public class DescriptionManagement {
         return WorkerRestClient.getDocumentationAsset(baseUrl);
     }
 
+    private boolean isAdapterUsed(AdapterDescription adapter) {
+        var allAdapters = StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage().getAllAdapters();
+
+        return allAdapters
+          .stream()
+          .anyMatch(runningAdapter -> runningAdapter.getAppId().equals(adapter.getAppId()));
+    }
+
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
index 90e6508a6..56819527e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.rest.impl.connect;
 
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.DescriptionManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
@@ -27,10 +28,7 @@ import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
+import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.List;
@@ -154,4 +152,15 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
             return fail();
         }
     }
+
+    @DELETE
+    @Path("{adapterId}")
+    public Response deleteAdapter(@PathParam("adapterId") String adapterId) {
+        try {
+            this.managementService.deleteAdapterDescription(adapterId);
+            return ok();
+        } catch (SpRuntimeException e) {
+            return badRequest(e);
+        }
+    }
 }
diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
index d60352000..f337ab43e 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
@@ -38,13 +38,16 @@ export class AdapterService {
 
   getAdapterDescriptions(): Observable<AdapterDescriptionUnion[]> {
     return this.requestAdapterDescriptions('/master/description/adapters');
-
   }
 
   getAdapters(): Observable<AdapterDescriptionUnion[]> {
     return this.requestAdapterDescriptions('/master/adapters');
   }
 
+  deleteAdapterDescription(adapterId: string): Observable<any> {
+    return this.http.delete(`${this.connectPath}/master/description/${adapterId}`);
+  }
+
   requestAdapterDescriptions(path: string): Observable<AdapterDescriptionUnion[]> {
     return this.http
       .get(
diff --git a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.html b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.html
index 29c2b0821..52aa74ae5 100644
--- a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.html
+++ b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.html
@@ -19,13 +19,23 @@
 <div [className]="className" fxFlex="100"
      fxLayout="column">
     <div fxLayout="column">
-        <div class="type" fxLayout="column" fxLayoutAlign="start start">
-            <div class="type-data" fxLayout="row" fxLayoutAlign="start start">
+        <div class="type" fxLayout="row" fxLayoutAlign="start start" fxFlex="100">
+            <div class="type-data" fxLayout="row" fxLayoutAlign="start center" fxFlex>
                 <mat-icon *ngIf="isDataSetDescription" class="historic">lens</mat-icon>
                 <mat-icon *ngIf="isDataStreamDescription" class="real-time">lens</mat-icon>
                 <p *ngIf="isDataSetDescription">Data Set</p>
                 <p *ngIf="isDataStreamDescription">Data Stream</p>
             </div>
+            <div fxLayout="row" fxLayoutAlign="end end" style="margin-left:5px;">
+                <button class="small-button-add mat-basic no-min-width" (click)="$event.stopPropagation();"
+                        mat-raised-button mat-button [matMenuTriggerFor]="menu"><span style="font-size:12px;">...</span></button>
+                <mat-menu #menu="matMenu">
+                    <button mat-menu-item (click)="removeAdapter()">
+                        <mat-icon>delete</mat-icon>
+                        <span>&nbsp;Remove adapter</span>
+                    </button>
+                </mat-menu>
+            </div>
         </div>
         <div fxLayoutAlign="start center" fxLayout="column" class="icon">
             <img *ngIf="getIconUrl() && !adapter.icon" [src]="getIconUrl()" class="iconImg"/>
diff --git a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
index 89bea3356..c65fa3b09 100644
--- a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
+++ b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
@@ -22,6 +22,7 @@ import { AdapterExportDialog } from '../../../dialog/adapter-export/adapter-expo
 import { MatDialog } from '@angular/material/dialog';
 import { AdapterDescription, AdapterService } from '@streampipes/platform-services';
 import { DialogService, PanelType } from '@streampipes/shared-ui';
+import { MatSnackBar } from '@angular/material/snack-bar';
 
 @Component({
   selector: 'sp-adapter-description',
@@ -50,7 +51,8 @@ export class AdapterDescriptionComponent implements OnInit {
   constructor(private connectService: ConnectService,
               private dataMarketplaceService: AdapterService,
               private dialogService: DialogService,
-              public dialog: MatDialog) {
+              public dialog: MatDialog,
+              private _snackBar: MatSnackBar) {
   }
 
   ngOnInit() {
@@ -111,4 +113,12 @@ export class AdapterDescriptionComponent implements OnInit {
       return `assets/img/connect/${this.adapter.iconUrl}`;
     }
   }
+
+  removeAdapter(): void {
+    this.dataMarketplaceService.deleteAdapterDescription(this.adapter.elementId).subscribe(res => {
+      this.updateAdapterEmitter.emit();
+    }, error => {
+      this._snackBar.open('Cannot delete an adapter which has an active instance running.');
+    });
+  }
 }
diff --git a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.html b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.html
index 0d279a7c8..61d9949d9 100644
--- a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.html
+++ b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.html
@@ -50,6 +50,7 @@
                     <sp-adapter-description attr.id="{{adapterDescription.name.replaceAll(' ', '_')}}"
                                             class="adapter-description"
                                             fxFlex="33"
+                                            (updateAdapterEmitter)="getAdapterDescriptions()"
                                             (click)="selectAdapter(adapterDescription.appId)"
                                             *ngFor="let adapterDescription of adapterDescriptions | adapterFilter:currentFilter"
                                             [adapter]="adapterDescription"></sp-adapter-description>
diff --git a/ui/src/app/connect/connect.module.ts b/ui/src/app/connect/connect.module.ts
index 1bd77b851..8d7bc1203 100644
--- a/ui/src/app/connect/connect.module.ts
+++ b/ui/src/app/connect/connect.module.ts
@@ -85,6 +85,7 @@ import { SpEpSettingsSectionComponent } from './dialog/edit-event-property/compo
 import { SpAdapterOptionsPanelComponent } from './components/new-adapter/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component';
 import { SpAdapterTemplateDialogComponent } from './dialog/adapter-template/adapter-template-dialog.component';
 import { JsonPrettyPrintPipe } from './filter/json-pretty-print.pipe';
+import { MatSnackBarModule } from '@angular/material/snack-bar';
 
 @NgModule({
   imports: [
@@ -100,6 +101,7 @@ import { JsonPrettyPrintPipe } from './filter/json-pretty-print.pipe';
     MatInputModule,
     MatFormFieldModule,
     MatSliderModule,
+    MatSnackBarModule,
     PlatformServicesModule,
     CoreUiModule,
     TreeModule,