You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/19 16:33:08 UTC

[incubator-streampipes] branch STREAMPIPES-319 updated (11b0832 -> 4b12163)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 11b0832  [STREAMPIPES-385] Remove obsolete container-embedded module
     new 1bb56a2  [STREAMPIPES-386] Use the same declarer for pipeline elements and adapters
     new 4b12163  [STREAMPIPES-386] Add base service for extensions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  2 +
 .../backend/StreamPipesBackendApplication.java     | 15 ++--
 streampipes-connect-api/pom.xml                    | 26 ++++++
 .../apache/streampipes/connect/api}/Connector.java |  6 +-
 .../streampipes/connect/api}/EmitBinaryEvent.java  |  4 +-
 .../apache/streampipes/connect/api/IAdapter.java   | 24 ++++--
 .../streampipes/connect/api/IAdapterPipeline.java  | 16 ++--
 .../connect/api/IAdapterPipelineElement.java       |  4 +-
 .../apache/streampipes/connect/api/IFormat.java    | 16 ++--
 .../apache/streampipes/connect/api/IParser.java    | 25 +++---
 .../apache/streampipes/connect/api/IProtocol.java  | 32 ++++---
 .../connect/api}/exception/AdapterException.java   |  2 +-
 .../connect/api}/exception/ParseException.java     |  2 +-
 .../api}/exception/WorkerAdapterException.java     |  2 +-
 .../master/management/AdapterMasterManagement.java |  2 +-
 .../AdapterTemplateMasterManagement.java           |  2 +-
 .../master/management/DescriptionManagement.java   |  8 +-
 .../master/management/GuessManagement.java         |  6 +-
 .../master/management/SourcesManagement.java       |  2 +-
 .../master/management/UnitMasterManagement.java    |  2 +-
 .../management/WorkerAdministrationManagement.java |  2 +-
 .../master/management/WorkerRestClient.java        |  2 +-
 .../container/master/rest/AdapterResource.java     |  2 +-
 .../master/rest/AdapterTemplateResource.java       |  2 +-
 .../container/master/rest/DescriptionResource.java |  2 +-
 .../container/master/rest/GuessResource.java       |  4 +-
 .../master/rest/RuntimeResolvableResource.java     |  2 +-
 .../container/master/rest/SourcesResource.java     |  2 +-
 .../container/master/rest/UnitResource.java        |  2 +-
 .../container/master/rest/WelcomePageMaster.java   |  2 +-
 .../management/AdapterMasterManagementTest.java    |  2 +-
 .../AdapterTemplateMasterManagementTest.java       |  2 +-
 .../management/DescriptionManagementTest.java      |  4 +-
 .../master/management/SourcesManagementTest.java   |  2 +-
 .../management/UnitMasterManagementTest.java       |  4 +-
 .../master/management/WorkerRestClientTest.java    |  2 +-
 streampipes-connect-container-worker/pom.xml       |  5 ++
 .../worker/init/AdapterDeclarerSingleton.java      | 97 ----------------------
 .../init/AdapterServiceResourceProvider.java       | 34 ++++----
 .../worker/init/AdapterWorkerContainer.java        | 17 ++--
 .../init/AdapterWorkerContainerResourceConfig.java | 28 +++----
 .../worker/management/AdapterWorkerManagement.java | 41 +++++----
 .../worker/management/GuessManagement.java         | 12 +--
 .../worker/management/RuntimeResovable.java        | 15 ++--
 .../container/worker/rest/GuessResource.java       |  2 +-
 .../container/worker/rest/WelcomePageWorker.java   | 12 +--
 .../container/worker/rest/WorkerResource.java      |  2 +-
 .../container/worker/utils/AdapterUtils.java       | 28 ++++---
 .../management/AdapterWorkerManagementTest.java    |  4 +-
 streampipes-connect/pom.xml                        |  4 +-
 .../org/apache/streampipes/connect/GetNEvents.java |  8 +-
 .../connect/RunningAdapterInstances.java           | 10 +--
 .../apache/streampipes/connect/SendToPipeline.java | 13 +--
 .../streampipes/connect/adapter/Adapter.java       | 42 ++++------
 .../connect/adapter/AdapterRegistry.java           | 12 +--
 .../connect/adapter/format/csv/CsvFormat.java      |  8 +-
 .../connect/adapter/format/csv/CsvParser.java      |  6 +-
 .../adapter/format/geojson/GeoJsonFormat.java      |  8 +-
 .../adapter/format/geojson/GeoJsonParser.java      |  4 +-
 .../connect/adapter/format/image/ImageFormat.java  |  8 +-
 .../connect/adapter/format/image/ImageParser.java  |  8 +-
 .../adapter/format/json/AbstractJsonFormat.java    |  6 +-
 .../adapter/format/json/arraykey/JsonFormat.java   |  4 +-
 .../adapter/format/json/arraykey/JsonParser.java   |  6 +-
 .../format/json/arraynokey/JsonArrayFormat.java    |  4 +-
 .../format/json/arraynokey/JsonArrayParser.java    |  6 +-
 .../format/json/object/JsonObjectFormat.java       |  4 +-
 .../format/json/object/JsonObjectParser.java       |  6 +-
 .../connect/adapter/format/xml/XmlFormat.java      |  8 +-
 .../connect/adapter/format/xml/XmlParser.java      |  4 +-
 .../adapter/model/generic/GenericAdapter.java      | 37 +++++----
 .../model/generic/GenericDataSetAdapter.java       |  9 +-
 .../model/generic/GenericDataStreamAdapter.java    |  3 +-
 .../connect/adapter/model/generic/Parser.java      | 19 +----
 .../connect/adapter/model/generic/Protocol.java    | 37 ++-------
 .../adapter/model/pipeline/AdapterPipeline.java    | 28 ++++---
 .../elements/AddTimestampPipelineElement.java      |  4 +-
 .../elements/AddValuePipelineElement.java          |  4 +-
 .../elements/DuplicateFilterPipelineElement.java   |  4 +-
 .../elements/SendToBrokerAdapterSink.java          |  5 +-
 .../elements/SendToBrokerReplayAdapterSink.java    |  4 +-
 .../elements/SendToJmsAdapterSink.java             |  4 +-
 .../elements/SendToKafkaAdapterSink.java           |  4 +-
 .../elements/SendToMqttAdapterSink.java            |  4 +-
 .../TransformSchemaAdapterPipelineElement.java     |  4 +-
 .../elements/TransformStreamAdapterElement.java    |  4 +-
 .../TransformValueAdapterPipelineElement.java      |  4 +-
 .../format/json/arraykey/JsonParserTest.java       |  4 +-
 .../json/arraynokey/JsonArrayParserTest.java       |  4 +-
 .../adapter/format/json/geojson/GeoJsonTest.java   |  2 +-
 .../format/json/object/JsonObjectParserTest.java   |  4 +-
 .../connect/adapter/format/json/xml/XmlTest.java   |  2 +-
 .../container/base/StreamPipesServiceBase.java     | 27 +++---
 .../container/base/rest/BaseResourceConfig.java    | 17 ++--
 streampipes-container-extensions/pom.xml           | 15 ++--
 .../extensions/ExtensionsModelSubmitter.java       | 13 ++-
 .../extensions/ExtensionsResourceConfig.java       | 39 ++++-----
 streampipes-container-standalone/pom.xml           |  4 +-
 .../PipelineElementContainerResourceConfig.java    | 25 +++---
 .../standalone/init/StandaloneModelSubmitter.java  | 79 ++++++------------
 streampipes-container/pom.xml                      | 10 +++
 ... => BaseExtensionsServiceResourceProvider.java} | 12 ++-
 .../container/init/DeclarersSingleton.java         | 58 +++++++++++++
 ...mitter.java => ExtensionsResourceProvider.java} |  7 +-
 .../PipelineElementServiceResourceProvider.java    | 31 +++----
 .../container/model/SpServiceDefinition.java       | 17 ++++
 .../model/SpServiceDefinitionBuilder.java          | 25 ++++--
 .../container/util/ServiceDefinitionUtil.java      |  3 +-
 .../streampipes/svcdiscovery/SpServiceGroups.java  |  7 +-
 streampipes-service-extensions-base/pom.xml        | 32 +++++++
 .../base/StreamPipesExtensionsServiceBase.java     | 78 +++++++++++++++++
 111 files changed, 744 insertions(+), 661 deletions(-)
 create mode 100644 streampipes-connect-api/pom.xml
 rename {streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api}/Connector.java (92%)
 rename {streampipes-connect/src/main/java/org/apache/streampipes/connect => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api}/EmitBinaryEvent.java (91%)
 copy streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterStorage.java => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java (54%)
 copy streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipeline.java (68%)
 rename streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipelineElement.java (89%)
 rename streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Format.java => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IFormat.java (73%)
 copy streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java (63%)
 copy streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IProtocol.java (55%)
 rename {streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api}/exception/AdapterException.java (94%)
 rename {streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api}/exception/ParseException.java (94%)
 rename {streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter => streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api}/exception/WorkerAdapterException.java (95%)
 delete mode 100644 streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterDeclarerSingleton.java
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Filetypes.java => streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java (54%)
 copy streampipes-maven-plugin/src/main/java/org/apache/streampipes/smp/generator/OutputGenerator.java => streampipes-container-base/src/main/java/org/apache/streampipes/container/base/rest/BaseResourceConfig.java (70%)
 copy streampipes-container/src/main/java/org/apache/streampipes/container/init/{ModelSubmitter.java => BaseExtensionsServiceResourceProvider.java} (70%)
 rename streampipes-container/src/main/java/org/apache/streampipes/container/init/{ModelSubmitter.java => ExtensionsResourceProvider.java} (88%)
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Filetypes.java => streampipes-container/src/main/java/org/apache/streampipes/container/init/PipelineElementServiceResourceProvider.java (62%)
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java => streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java (82%)
 create mode 100644 streampipes-service-extensions-base/pom.xml
 create mode 100644 streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java

[incubator-streampipes] 02/02: [STREAMPIPES-386] Add base service for extensions

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 4b1216311f3ead07e05c3cdf8f82eb923db1ac54
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jun 19 18:32:50 2021 +0200

    [STREAMPIPES-386] Add base service for extensions
---
 pom.xml                                            |  1 +
 .../backend/StreamPipesBackendApplication.java     | 15 ++--
 .../container/base/StreamPipesServiceBase.java     | 27 ++++----
 streampipes-container-extensions/pom.xml           | 15 ++--
 .../extensions/ExtensionsModelSubmitter.java       |  4 +-
 streampipes-container-standalone/pom.xml           |  4 +-
 .../standalone/init/StandaloneModelSubmitter.java  | 79 ++++++++--------------
 .../container/init/DeclarersSingleton.java         |  9 +++
 .../container/util/ServiceDefinitionUtil.java      |  3 +-
 .../streampipes/svcdiscovery/SpServiceGroups.java  |  9 ++-
 streampipes-service-extensions-base/pom.xml        | 32 +++++++++
 .../base/StreamPipesExtensionsServiceBase.java     | 78 +++++++++++++++++++++
 12 files changed, 185 insertions(+), 91 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0a0c3d9..a6f561f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -923,6 +923,7 @@
         <module>streampipes-service-discovery-consul</module>
         <module>streampipes-service-discovery-api</module>
         <module>streampipes-connect-api</module>
+        <module>streampipes-service-extensions-base</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 6c2ee97..e8af7a7 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -41,6 +41,7 @@ import org.springframework.context.annotation.Import;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.servlet.ServletContextListener;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -70,7 +71,14 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
 
   public static void main(String[] args) {
       StreamPipesBackendApplication application = new StreamPipesBackendApplication();
-      application.startStreamPipesService(StreamPipesBackendApplication.class, "core", "core");
+      try {
+        application.startStreamPipesService(StreamPipesBackendApplication.class,
+                "core",
+                "core",
+                8030);
+      } catch (UnknownHostException e) {
+        LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
+      }
   }
 
   @PostConstruct
@@ -225,11 +233,6 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
   }
 
   @Override
-  protected Integer getDefaultPort() {
-    return 8030;
-  }
-
-  @Override
   protected List<String> getServiceTags() {
     return Arrays.asList(SpServiceTags.CORE, SpServiceTags.CONNECT_MASTER);
   }
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index 33ab0fd..d51a1ee 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -33,29 +33,28 @@ public abstract class StreamPipesServiceBase {
 
   protected void startStreamPipesService(Class<?> serviceClass,
                                          String serviceGroup,
-                                         String serviceName) {
-    try {
-      registerService(serviceGroup, serviceName);
-      runApplication(serviceClass);
-    } catch (UnknownHostException e) {
-      LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
-    }
+                                         String serviceName,
+                                         Integer defaultPort) throws UnknownHostException {
+      registerService(serviceGroup, serviceName, defaultPort);
+      runApplication(serviceClass, defaultPort);
   }
 
-  private void runApplication(Class<?> serviceClass) {
+  private void runApplication(Class<?> serviceClass,
+                              Integer defaultPort) {
     SpringApplication app = new SpringApplication(serviceClass);
-    app.setDefaultProperties(Collections.singletonMap("server.port", getPort()));
+    app.setDefaultProperties(Collections.singletonMap("server.port", getPort(defaultPort)));
     app.run();
   }
 
   private void registerService(String serviceGroup,
-                               String serviceName) throws UnknownHostException {
+                               String serviceName,
+                               Integer defaultPort) throws UnknownHostException {
     SpServiceDiscovery
             .getServiceDiscovery()
             .registerService(serviceGroup,
                     serviceName,
                     getHostname(),
-                    getPort(),
+                    getPort(defaultPort),
                     getServiceTags());
   }
 
@@ -63,12 +62,10 @@ public abstract class StreamPipesServiceBase {
     return Networking.getHostname();
   }
 
-  protected Integer getPort() {
-    return Networking.getPort(getDefaultPort());
+  protected Integer getPort(Integer defaultPort) {
+    return Networking.getPort(defaultPort);
   }
 
-  protected abstract Integer getDefaultPort();
-
   protected abstract List<String> getServiceTags();
 
 
diff --git a/streampipes-container-extensions/pom.xml b/streampipes-container-extensions/pom.xml
index f0def38..0df5985 100644
--- a/streampipes-container-extensions/pom.xml
+++ b/streampipes-container-extensions/pom.xml
@@ -33,12 +33,17 @@
         <!-- StreamPipes dependencies -->
         <dependency>
             <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-connect-container-worker</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-container</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-container-base</artifactId>
+            <artifactId>streampipes-service-extensions-base</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
 
@@ -47,12 +52,6 @@
             <groupId>com.fasterxml.jackson.datatype</groupId>
             <artifactId>jackson-datatype-jdk8</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-connect-container-worker</artifactId>
-            <version>0.68.0-SNAPSHOT</version>
-        </dependency>
-
         <!-- Test dependencies -->
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 45491e1..1bb2b9c 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.connect.api.IAdapter;
 import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
 import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.ModelSubmitter;
+import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
 import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.locales.LabelGenerator;
 import org.apache.streampipes.container.model.ExtensionsConfig;
@@ -53,7 +53,7 @@ import java.util.List;
 @Configuration
 @EnableAutoConfiguration
 @Import({ ExtensionsResourceConfig.class })
-public abstract class ExtensionsModelSubmitter extends ModelSubmitter<ExtensionsConfig> {
+public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServiceBase<ExtensionsConfig> {
     private static final Logger LOG =
             LoggerFactory.getLogger(ExtensionsModelSubmitter.class.getCanonicalName());
 
diff --git a/streampipes-container-standalone/pom.xml b/streampipes-container-standalone/pom.xml
index 8b2abdc..0064633 100644
--- a/streampipes-container-standalone/pom.xml
+++ b/streampipes-container-standalone/pom.xml
@@ -36,7 +36,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-container-base</artifactId>
+            <artifactId>streampipes-service-extensions-base</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
 
@@ -48,4 +48,4 @@
 
         <!-- Test dependencies -->
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 16a869a..52ff8e4 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -19,90 +19,51 @@
 package org.apache.streampipes.container.standalone.init;
 
 
-import org.apache.streampipes.commons.networking.Networking;
+import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.ModelSubmitter;
 import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.model.PeConfig;
-import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.container.util.ServiceDefinitionUtil;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
 import org.apache.streampipes.svcdiscovery.SpServiceTags;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 
-import javax.annotation.PreDestroy;
 import java.net.UnknownHostException;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.List;
 
 @Configuration
 @EnableAutoConfiguration
 @Import({ PipelineElementContainerResourceConfig.class })
-public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig> {
+public abstract class StandaloneModelSubmitter extends StreamPipesExtensionsServiceBase<PeConfig> {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StandaloneModelSubmitter.class.getCanonicalName());
 
-    public void init() {
-        SpServiceDefinition serviceDef = provideServiceDefinition();
-        init(serviceDef);
-    }
-
-    public void init(SpServiceDefinition serviceDef) {
-        try {
-            String host = Networking.getHostname();
-            Integer port = Networking.getPort(serviceDef.getDefaultPort());
-            List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(serviceDef.getDeclarers());
-            serviceTags.add(SpServiceTags.PE);
-
-            DeclarersSingleton.getInstance().populate(host, port, serviceDef);
-
-            SpServiceDiscovery
-                    .getServiceDiscovery()
-                    .registerPeService("pe/" +serviceDef.getServiceId(), host, port, serviceTags);
-
-            startService(port);
-        } catch (UnknownHostException e) {
-            LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
-        }
-    }
-
     @Deprecated
     public void init(PeConfig peConfig) {
-
         DeclarersSingleton.getInstance()
                 .setHostName(peConfig.getHost());
         DeclarersSingleton.getInstance()
                 .setPort(peConfig.getPort());
 
-        startService(peConfig.getPort());
-
-        SpServiceDiscovery.getServiceDiscovery().registerPeService(
-                peConfig.getId(),
-                peConfig.getHost(),
-                peConfig.getPort()
-        );
-    }
-
-    private void startService(Integer port) {
-        SpringApplication app = new SpringApplication(StandaloneModelSubmitter.class);
-        app.setDefaultProperties(Collections.singletonMap("server.port", port));
-        app.run();
-    }
-
-    public SpServiceDefinition provideServiceDefinition() {
-        return null;
+        try {
+            startExtensionsService(this.getClass(),
+                    peConfig.getId(),
+                    DeclarersSingleton.getInstance().getPort());
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
     }
 
-    @PreDestroy
+    @Override
     public void onExit() {
         LOG.info("Shutting down StreamPipes pipeline element container...");
-        Integer runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+        int runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
 
         while (runningInstancesCount > 0) {
             LOG.info("Waiting for {} running pipeline elements to be stopped...", runningInstancesCount);
@@ -114,4 +75,18 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
             runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
         }
     }
+
+    @Override
+    protected List<String> getServiceTags() {
+        Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
+        List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
+        serviceTags.add(SpServiceTags.PE);
+
+        return serviceTags;
+    }
+
+    @Override
+    public void afterServiceRegistered() {
+
+    }
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
index 1ec7d1c..0ff7bcd 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
@@ -81,6 +81,7 @@ public class DeclarersSingleton {
     return DeclarersSingleton.instance;
   }
 
+
   public void populate(String host, Integer port, SpServiceDefinition serviceDef) {
     this.setHostName(host);
     this.setPort(port);
@@ -96,6 +97,10 @@ public class DeclarersSingleton {
     }
   }
 
+  @Deprecated
+  /**
+   * @deprecated Use SpServiceDefinitionBuilder instead
+   */
   public DeclarersSingleton add(Declarer<?> d) {
     if (d instanceof SemanticEventProcessingAgentDeclarer) {
       addEpaDeclarer((SemanticEventProcessingAgentDeclarer) d);
@@ -262,6 +267,10 @@ public class DeclarersSingleton {
     this.port = port;
   }
 
+  public int getPort() {
+    return this.port;
+  }
+
   public void setHostName(String host) {
     this.hostName = host;
   }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
index 101cc1c..7e8af0a 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
@@ -19,12 +19,13 @@ package org.apache.streampipes.container.util;
 
 import org.apache.streampipes.container.declarer.Declarer;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class ServiceDefinitionUtil {
 
-  public static List<String> extractAppIds(List<Declarer<?>> declarers) {
+  public static List<String> extractAppIds(Collection<Declarer<?>> declarers) {
     return declarers
             .stream()
             .map(d -> d.declareModel().getAppId())
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
similarity index 82%
rename from streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
rename to streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
index b2734b6..c6c5b02 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.svcdiscovery;
 
-package org.apache.streampipes.container.init;
-
-public abstract class ModelSubmitter<T> {
-
-    public abstract void init(T conf);
+public class SpServiceGroups {
 
+  public static final String CORE = "core";
+  public static final String EXTENSIONS = "ext";
 }
diff --git a/streampipes-service-extensions-base/pom.xml b/streampipes-service-extensions-base/pom.xml
new file mode 100644
index 0000000..6038ab4
--- /dev/null
+++ b/streampipes-service-extensions-base/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-parent</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.68.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-service-extensions-base</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container-base</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
new file mode 100644
index 0000000..4226820
--- /dev/null
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.extensions.base;
+
+import org.apache.streampipes.container.base.StreamPipesServiceBase;
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.svcdiscovery.SpServiceGroups;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PreDestroy;
+import java.net.UnknownHostException;
+import java.util.Objects;
+
+/**
+ * Base class for StreamPipes extensions services.
+ *
+ * <p>A service is started by calling {@link #init()} or {@link #init(SpServiceDefinition)}:
+ * the host and port are resolved, the {@link DeclarersSingleton} is populated with the
+ * service definition, the service is started in the {@link SpServiceGroups#EXTENSIONS}
+ * group and finally {@link #afterServiceRegistered()} is invoked.
+ *
+ * @param <T> type of the configuration object accepted by the deprecated {@code init(T)}
+ */
+public abstract class StreamPipesExtensionsServiceBase<T> extends StreamPipesServiceBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
+
+    /**
+     * Starts the service using the definition returned by {@link #provideServiceDefinition()}.
+     *
+     * @throws NullPointerException if {@link #provideServiceDefinition()} returns {@code null},
+     *                              which is what the default implementation does
+     */
+    public void init() {
+        SpServiceDefinition serviceDef = provideServiceDefinition();
+        init(serviceDef);
+    }
+
+    /**
+     * Starts the service described by the given definition.
+     *
+     * <p>If the host name cannot be resolved, the error is logged (including its cause) and
+     * the method returns without starting the service.
+     *
+     * @param serviceDef the definition of the service to start, must not be {@code null}
+     * @throws NullPointerException if {@code serviceDef} is {@code null}
+     */
+    public void init(SpServiceDefinition serviceDef) {
+        // Fail fast with an actionable message instead of an anonymous NPE further down.
+        Objects.requireNonNull(serviceDef,
+                "Service definition must not be null - override provideServiceDefinition() or pass a definition to init(SpServiceDefinition)");
+        try {
+            String host = getHostname();
+            Integer port = getPort(serviceDef.getDefaultPort());
+            DeclarersSingleton.getInstance().populate(host, port, serviceDef);
+
+            String serviceId = serviceDef.getServiceId();
+
+            // NOTE(review): the default port (not the resolved one) is handed on here; presumably
+            // startStreamPipesService applies port overrides itself - confirm.
+            startExtensionsService(this.getClass(), serviceId, serviceDef.getDefaultPort());
+        } catch (UnknownHostException e) {
+            // Keep the cause so that the reason for the failed lookup shows up in the log.
+            LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable", e);
+        }
+    }
+
+    /**
+     * Provides the definition used by {@link #init()}.
+     *
+     * <p>The default implementation returns {@code null}; subclasses that start via
+     * {@link #init()} must override this method.
+     *
+     * @return the service definition, {@code null} by default
+     */
+    public SpServiceDefinition provideServiceDefinition() {
+        return null;
+    }
+
+    /**
+     * Initializes the service from a configuration object.
+     *
+     * @param conf the service configuration
+     * @deprecated kept for existing services; see {@link #init(SpServiceDefinition)}
+     */
+    @Deprecated
+    public abstract void init(T conf);
+
+    /**
+     * Hook invoked by {@link #startExtensionsService(Class, String, Integer)} right after
+     * the call that starts the service has returned.
+     */
+    public abstract void afterServiceRegistered();
+
+    /**
+     * Starts the service in the {@link SpServiceGroups#EXTENSIONS} group and then calls
+     * {@link #afterServiceRegistered()}.
+     *
+     * @param serviceClass the class of the service to start
+     * @param serviceName  the name (id) of the service
+     * @param defaultPort  the default port of the service
+     * @throws UnknownHostException if the host address cannot be resolved
+     */
+    public void startExtensionsService(Class<?> serviceClass,
+                                       String serviceName,
+                                       Integer defaultPort) throws UnknownHostException {
+        this.startStreamPipesService(
+                serviceClass,
+                SpServiceGroups.EXTENSIONS,
+                serviceName,
+                defaultPort
+        );
+        this.afterServiceRegistered();
+    }
+
+    /**
+     * Callback annotated with {@link PreDestroy}; implement any cleanup needed on shutdown.
+     */
+    @PreDestroy
+    public abstract void onExit();
+
+}

[incubator-streampipes] 01/02: [STREAMPIPES-386] Use the same declarer for pipeline elements and adapters

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1bb56a2c19f259ef0173704cd633abd8520bd6be
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jun 19 16:24:39 2021 +0200

    [STREAMPIPES-386] Use the same declarer for pipeline elements and adapters
---
 pom.xml                                            |  1 +
 streampipes-connect-api/pom.xml                    | 26 ++++++
 .../apache/streampipes/connect/api}/Connector.java |  6 +-
 .../streampipes/connect/api}/EmitBinaryEvent.java  |  4 +-
 .../apache/streampipes/connect/api/IAdapter.java   | 33 ++++----
 .../streampipes/connect/api/IAdapterPipeline.java  | 15 +++-
 .../connect/api/IAdapterPipelineElement.java       |  4 +-
 .../apache/streampipes/connect/api/IFormat.java    | 16 ++--
 .../apache/streampipes/connect/api/IParser.java    | 41 ++++-----
 .../apache/streampipes/connect/api/IProtocol.java  | 40 +++++----
 .../connect/api}/exception/AdapterException.java   |  2 +-
 .../connect/api}/exception/ParseException.java     |  2 +-
 .../api}/exception/WorkerAdapterException.java     |  2 +-
 .../master/management/AdapterMasterManagement.java |  2 +-
 .../AdapterTemplateMasterManagement.java           |  2 +-
 .../master/management/DescriptionManagement.java   |  8 +-
 .../master/management/GuessManagement.java         |  6 +-
 .../master/management/SourcesManagement.java       |  2 +-
 .../master/management/UnitMasterManagement.java    |  2 +-
 .../management/WorkerAdministrationManagement.java |  2 +-
 .../master/management/WorkerRestClient.java        |  2 +-
 .../container/master/rest/AdapterResource.java     |  2 +-
 .../master/rest/AdapterTemplateResource.java       |  2 +-
 .../container/master/rest/DescriptionResource.java |  2 +-
 .../container/master/rest/GuessResource.java       |  4 +-
 .../master/rest/RuntimeResolvableResource.java     |  2 +-
 .../container/master/rest/SourcesResource.java     |  2 +-
 .../container/master/rest/UnitResource.java        |  2 +-
 .../container/master/rest/WelcomePageMaster.java   |  2 +-
 .../management/AdapterMasterManagementTest.java    |  2 +-
 .../AdapterTemplateMasterManagementTest.java       |  2 +-
 .../management/DescriptionManagementTest.java      |  4 +-
 .../master/management/SourcesManagementTest.java   |  2 +-
 .../management/UnitMasterManagementTest.java       |  4 +-
 .../master/management/WorkerRestClientTest.java    |  2 +-
 streampipes-connect-container-worker/pom.xml       |  5 ++
 .../worker/init/AdapterDeclarerSingleton.java      | 97 ----------------------
 .../init/AdapterServiceResourceProvider.java       | 47 ++++-------
 .../worker/init/AdapterWorkerContainer.java        | 17 ++--
 .../init/AdapterWorkerContainerResourceConfig.java | 28 +++----
 .../worker/management/AdapterWorkerManagement.java | 41 +++++----
 .../worker/management/GuessManagement.java         | 12 +--
 .../worker/management/RuntimeResovable.java        | 15 ++--
 .../container/worker/rest/GuessResource.java       |  2 +-
 .../container/worker/rest/WelcomePageWorker.java   | 12 +--
 .../container/worker/rest/WorkerResource.java      |  2 +-
 .../container/worker/utils/AdapterUtils.java       | 28 ++++---
 .../management/AdapterWorkerManagementTest.java    |  4 +-
 streampipes-connect/pom.xml                        |  4 +-
 .../org/apache/streampipes/connect/GetNEvents.java |  8 +-
 .../connect/RunningAdapterInstances.java           | 10 +--
 .../apache/streampipes/connect/SendToPipeline.java | 13 +--
 .../streampipes/connect/adapter/Adapter.java       | 42 ++++------
 .../connect/adapter/AdapterRegistry.java           | 12 +--
 .../connect/adapter/format/csv/CsvFormat.java      |  8 +-
 .../connect/adapter/format/csv/CsvParser.java      |  6 +-
 .../adapter/format/geojson/GeoJsonFormat.java      |  8 +-
 .../adapter/format/geojson/GeoJsonParser.java      |  4 +-
 .../connect/adapter/format/image/ImageFormat.java  |  8 +-
 .../connect/adapter/format/image/ImageParser.java  |  8 +-
 .../adapter/format/json/AbstractJsonFormat.java    |  6 +-
 .../adapter/format/json/arraykey/JsonFormat.java   |  4 +-
 .../adapter/format/json/arraykey/JsonParser.java   |  6 +-
 .../format/json/arraynokey/JsonArrayFormat.java    |  4 +-
 .../format/json/arraynokey/JsonArrayParser.java    |  6 +-
 .../format/json/object/JsonObjectFormat.java       |  4 +-
 .../format/json/object/JsonObjectParser.java       |  6 +-
 .../connect/adapter/format/xml/XmlFormat.java      |  8 +-
 .../connect/adapter/format/xml/XmlParser.java      |  4 +-
 .../adapter/model/generic/GenericAdapter.java      | 37 +++++----
 .../model/generic/GenericDataSetAdapter.java       |  9 +-
 .../model/generic/GenericDataStreamAdapter.java    |  3 +-
 .../connect/adapter/model/generic/Parser.java      | 19 +----
 .../connect/adapter/model/generic/Protocol.java    | 37 ++-------
 .../adapter/model/pipeline/AdapterPipeline.java    | 28 ++++---
 .../elements/AddTimestampPipelineElement.java      |  4 +-
 .../elements/AddValuePipelineElement.java          |  4 +-
 .../elements/DuplicateFilterPipelineElement.java   |  4 +-
 .../elements/SendToBrokerAdapterSink.java          |  5 +-
 .../elements/SendToBrokerReplayAdapterSink.java    |  4 +-
 .../elements/SendToJmsAdapterSink.java             |  4 +-
 .../elements/SendToKafkaAdapterSink.java           |  4 +-
 .../elements/SendToMqttAdapterSink.java            |  4 +-
 .../TransformSchemaAdapterPipelineElement.java     |  4 +-
 .../elements/TransformStreamAdapterElement.java    |  4 +-
 .../TransformValueAdapterPipelineElement.java      |  4 +-
 .../format/json/arraykey/JsonParserTest.java       |  4 +-
 .../json/arraynokey/JsonArrayParserTest.java       |  4 +-
 .../adapter/format/json/geojson/GeoJsonTest.java   |  2 +-
 .../format/json/object/JsonObjectParserTest.java   |  4 +-
 .../connect/adapter/format/json/xml/XmlTest.java   |  2 +-
 .../container/base/rest/BaseResourceConfig.java    | 14 +++-
 .../extensions/ExtensionsModelSubmitter.java       |  9 +-
 .../extensions/ExtensionsResourceConfig.java       | 39 ++++-----
 .../PipelineElementContainerResourceConfig.java    | 25 +++---
 streampipes-container/pom.xml                      | 10 +++
 .../BaseExtensionsServiceResourceProvider.java     | 19 ++---
 .../container/init/DeclarersSingleton.java         | 49 +++++++++++
 .../container/init/ExtensionsResourceProvider.java |  9 +-
 .../PipelineElementServiceResourceProvider.java    | 42 +++-------
 .../container/model/SpServiceDefinition.java       | 17 ++++
 .../model/SpServiceDefinitionBuilder.java          | 25 ++++--
 102 files changed, 575 insertions(+), 618 deletions(-)

diff --git a/pom.xml b/pom.xml
index cc77fca..0a0c3d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -922,6 +922,7 @@
         <module>streampipes-service-discovery</module>
         <module>streampipes-service-discovery-consul</module>
         <module>streampipes-service-discovery-api</module>
+        <module>streampipes-connect-api</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-connect-api/pom.xml b/streampipes-connect-api/pom.xml
new file mode 100644
index 0000000..da8cabf
--- /dev/null
+++ b/streampipes-connect-api/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-parent</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.68.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-connect-api</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/Connector.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/Connector.java
similarity index 92%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/Connector.java
rename to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/Connector.java
index 7d04644..694ee9f5 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/Connector.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/Connector.java
@@ -16,6 +16,10 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.model;
+package org.apache.streampipes.connect.api;
+
 public interface Connector {
+
+  String getId();
+
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/EmitBinaryEvent.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/EmitBinaryEvent.java
similarity index 91%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/EmitBinaryEvent.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/EmitBinaryEvent.java
index da76c6c..0fe98ab 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/EmitBinaryEvent.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/EmitBinaryEvent.java
@@ -16,8 +16,8 @@
  *
  */
 
-package org.apache.streampipes.connect;
+package org.apache.streampipes.connect.api;
 
 public interface EmitBinaryEvent {
-    public Boolean emit(byte[] event);
+    Boolean emit(byte[] event);
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java
similarity index 53%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java
index 350b124..4eea9c4 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java
@@ -15,23 +15,28 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.connect.adapter.preprocessing.elements;
+package org.apache.streampipes.connect.api;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
-import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 
-public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTransportProtocol>
-        implements AdapterPipelineElement  {
+public interface IAdapter<T extends AdapterDescription> extends Connector {
 
-    public SendToKafkaAdapterSink(AdapterDescription adapterDescription) {
-        super(adapterDescription, SpKafkaProducer::new, KafkaTransportProtocol.class);
-    }
+  T declareModel();
 
-    @Override
-    public void modifyProtocolForDebugging() {
-        this.protocol.setBrokerHostname("localhost");
-        this.protocol.setKafkaPort(9094);
-    }
+  // Decide which adapter to call
+  void startAdapter() throws AdapterException;
+
+  void stopAdapter() throws AdapterException;
+
+  IAdapter getInstance(T adapterDescription);
+
+  GuessSchema getSchema(T adapterDescription) throws AdapterException, ParseException;
+
+  void changeEventGrounding(TransportProtocol transportProtocol);
+
+  boolean isDebug();
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipeline.java
similarity index 68%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipeline.java
index 52bbd9f..9f53528 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipeline.java
@@ -15,13 +15,20 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.connect.api;
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
-
+import java.util.List;
 import java.util.Map;
 
-public interface AdapterPipelineElement {
+public interface IAdapterPipeline {
+
+  void process(Map<String, Object> event);
+
+  List<IAdapterPipelineElement> getPipelineElements();
+
+  void setPipelineElements(List<IAdapterPipelineElement> pipelineElements);
 
-    Map<String, Object> process(Map<String, Object> event);
+  void changePipelineSink(IAdapterPipelineElement pipelineSink);
 
+  IAdapterPipelineElement getPipelineSink();
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipelineElement.java
similarity index 89%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipelineElement.java
index 52bbd9f..87a6f64 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapterPipelineElement.java
@@ -16,11 +16,11 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
+package org.apache.streampipes.connect.api;
 
 import java.util.Map;
 
-public interface AdapterPipelineElement {
+public interface IAdapterPipelineElement {
 
     Map<String, Object> process(Map<String, Object> event);
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Format.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IFormat.java
similarity index 73%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Format.java
rename to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IFormat.java
index d3723d5..6c4e52b 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Format.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IFormat.java
@@ -16,33 +16,33 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.model.generic;
+package org.apache.streampipes.connect.api;
 
 
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 
 import java.util.Map;
 
-public abstract class Format {
+public interface IFormat {
 
-    public abstract Format getInstance(FormatDescription formatDescription);
+    IFormat getInstance(FormatDescription formatDescription);
 
-    public abstract FormatDescription declareModel();
+    FormatDescription declareModel();
 
-    public abstract String getId();
+    String getId();
 
     /**
      * This method parses a byte[] and transforms the event object into a serialized version of the internal
      * representation
      */
-    public abstract Map<String, Object> parse(byte[] object) throws ParseException;
+    Map<String, Object> parse(byte[] object) throws ParseException;
 
     /**
      * Needed for example for the CSV format in iterative protocols to ensure header is not send again
      * When the reset is not required it can be ignored
      */
-    public void reset() {
+    default void reset() {
 
     }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
similarity index 55%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
index b20ce52..482bf9f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
@@ -15,37 +15,28 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.connect.api;
 
-package org.apache.streampipes.connect;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.model.connect.grounding.FormatDescription;
+import org.apache.streampipes.model.schema.EventSchema;
 
-
-import java.util.ArrayList;
+import java.io.InputStream;
 import java.util.List;
 
-public class GetNEvents implements EmitBinaryEvent {
-
-    private int n;
-    private List<byte[]> events;
-
-    public GetNEvents(int n) {
-        this.n = n;
-        this.events = new ArrayList<>();
-    }
+public interface IParser {
 
-    @Override
-    public Boolean emit(byte[] event) {
+  IParser getInstance(FormatDescription formatDescription);
 
-        events.add(event);
-        this.n = n - 1;
+  void parse(InputStream data, EmitBinaryEvent emitBinaryEvent) throws ParseException;
 
-        if (n == 0) {
-            return false;
-        } else {
-            return true;
-        }
-    }
+  List<byte[]> parseNEvents(InputStream data, int n) throws ParseException;
 
-    public List<byte[]> getEvents() {
-        return events;
-    }
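+  /**
+   * Pass one event to the parser to get the event schema.
+   *
+   * @param oneEvent the raw bytes of the event from which the schema is derived
+   * @return the event schema of the given event
+   */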
+  EventSchema getEventSchema(List<byte[]> oneEvent);
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IProtocol.java
similarity index 50%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IProtocol.java
index c259ee4..7c29109 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IProtocol.java
@@ -15,33 +15,37 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.connect.adapter.format.json;
+package org.apache.streampipes.connect.api;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.EventSchema;
 
+import java.util.List;
 import java.util.Map;
 
-public abstract class AbstractJsonFormat extends Format {
+public interface IProtocol extends Connector {
 
-  @Override
-  public Map<String, Object> parse(byte[] object) throws ParseException {
-    EventSchema resultSchema = new EventSchema();
+  IProtocol getInstance(ProtocolDescription protocolDescription,
+                        IParser parser,
+                        IFormat format);
 
-    JsonDataFormatDefinition jsonDefinition = new JsonDataFormatDefinition();
+  ProtocolDescription declareModel();
 
-    Map<String, Object> result = null;
+  GuessSchema getGuessSchema() throws ParseException;
 
-    try {
-      result = jsonDefinition.toMap(object);
-    } catch (SpRuntimeException e) {
-      throw new ParseException("Could not parse Data: " + e.toString());
-    }
+  List<Map<String, Object>> getNElements(int n) throws ParseException;
 
-    return  result;
-  }
+  void run(IAdapterPipeline adapterPipeline);
 
+  /**
+   * Stops the running protocol. Mainly relevant for streaming protocols.
+   */
+  void stop();
+
+  String getId();
+
+  //TODO remove
+  void setEventSchema(EventSchema eventSchema);
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/AdapterException.java
similarity index 94%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java
copy to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/AdapterException.java
index 63f3ac3..7f60951 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/AdapterException.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.exception;
+package org.apache.streampipes.connect.api.exception;
 
 public class AdapterException extends Exception {
     public AdapterException() {}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/ParseException.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
similarity index 94%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/ParseException.java
rename to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
index 9419f11..2064f29 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/ParseException.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.exception;
+package org.apache.streampipes.connect.api.exception;
 
 public class ParseException extends RuntimeException {
     public ParseException() {}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/WorkerAdapterException.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/WorkerAdapterException.java
similarity index 95%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/WorkerAdapterException.java
rename to streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/WorkerAdapterException.java
index 7cd678b..c067586 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/WorkerAdapterException.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/WorkerAdapterException.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.exception;
+package org.apache.streampipes.connect.api.exception;
 
 import org.apache.streampipes.model.message.Message;
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index c5f2a9d..c5703fc 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.connect.container.master.management;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.connect.adapter.GroundingService;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
 import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.manager.verification.DataStreamVerifier;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
index f24808b..e8bf239 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
index e4724b0..18a21a8 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
@@ -19,8 +19,8 @@
 package org.apache.streampipes.connect.container.master.management;
 
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
@@ -50,11 +50,11 @@ public class DescriptionManagement {
     }
 
     public FormatDescriptionList getFormats() {
-        Map<String, Format> allFormats = AdapterRegistry.getAllFormats();
+        Map<String, IFormat> allFormats = AdapterRegistry.getAllFormats();
 
         FormatDescriptionList result = new FormatDescriptionList();
 
-        for (Format f : allFormats.values()) {
+        for (IFormat f : allFormats.values()) {
            result.getList().add(f.declareModel());
         }
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
index aac8f65..e825369 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
@@ -25,9 +25,9 @@ import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
 import org.apache.http.util.EntityUtils;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.exception.WorkerAdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.message.ErrorMessage;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 81f6d4b..94f9148 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
 import org.apache.streampipes.container.html.JSONGenerator;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java
index 39c636a..a31e30c 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java
@@ -22,7 +22,7 @@ import com.github.jqudt.Unit;
 import com.google.gson.Gson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.unit.UnitDescription;
 import org.apache.streampipes.units.UnitCollector;
 import org.apache.streampipes.units.UnitProvider;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index fa231a9..ed3e99a 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.connect.container.master.management;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 74ccaab..fb91f5f 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.connect.container.master.management;
 
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
index 8f49202..8a9887d 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.connect.container.master.management.Utils;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterTemplateResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterTemplateResource.java
index a601c17..3b2d0bd 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterTemplateResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterTemplateResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterTemplateMasterManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
index 0105f73..7132e09 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.DescriptionManagement;
 import org.apache.streampipes.connect.container.master.management.Utils;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/GuessResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/GuessResource.java
index 3cd744e..340a7b3 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/GuessResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/GuessResource.java
@@ -18,8 +18,8 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.exception.WorkerAdapterException;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
 import org.apache.streampipes.connect.container.master.management.GuessManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
index 1cc2d75..df640df 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.WorkerAdministrationManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
index bd73114..9cfb19b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.SourcesManagement;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.message.Notifications;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/UnitResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/UnitResource.java
index fe4037e..888803d 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/UnitResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/UnitResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.UnitMasterManagement;
 import org.apache.streampipes.model.connect.unit.UnitDescription;
 import org.slf4j.Logger;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WelcomePageMaster.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WelcomePageMaster.java
index e0bc80b..7fa69da 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WelcomePageMaster.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WelcomePageMaster.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.connect.container.master.rest;
 
 
 import org.apache.streampipes.connect.adapter.GroundingService;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.storage.couchdb.utils.CouchDbConfig;
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
index b39252a..19ca5f5 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
 import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
index 1081104..6fc7133 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
index dd9a803..58f82ed 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.connect.container.master.management;
 
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
 import org.apache.streampipes.connect.adapter.format.json.arraykey.JsonFormat;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,7 +44,7 @@ public class DescriptionManagementTest {
 
     @Test
     public void getFormats() {
-        Map<String, Format> allFormats = new HashMap<>();
+        Map<String, IFormat> allFormats = new HashMap<>();
         allFormats.put(JsonFormat.ID, new JsonFormat());
 
         PowerMockito.mockStatic(AdapterRegistry.class);
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index 16a64d2..a11fa93 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagementTest.java
index 0b14141..0c8f8bb 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagementTest.java
@@ -33,7 +33,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.unit.UnitDescription;
 import org.apache.streampipes.units.UnitProvider;
 
@@ -123,4 +123,4 @@ public class UnitMasterManagementTest {
         return unitDescription;
     }
 
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
index 5999b60..ae186f3 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
 import org.junit.Before;
diff --git a/streampipes-connect-container-worker/pom.xml b/streampipes-connect-container-worker/pom.xml
index 4d563dd..5ee292b 100644
--- a/streampipes-connect-container-worker/pom.xml
+++ b/streampipes-connect-container-worker/pom.xml
@@ -41,6 +41,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-container-base</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterDeclarerSingleton.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterDeclarerSingleton.java
deleted file mode 100644
index ea3d872..0000000
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterDeclarerSingleton.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.container.worker.init;
-
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.model.Connector;
-import org.apache.streampipes.connect.adapter.model.generic.GenericDataSetAdapter;
-import org.apache.streampipes.connect.adapter.model.generic.GenericDataStreamAdapter;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-public class AdapterDeclarerSingleton {
-
-    private Map<String, Protocol> allProtocols;
-    private Map<String, Adapter> allAdapters;
-
-    private static AdapterDeclarerSingleton instance;
-
-    public AdapterDeclarerSingleton() {
-        this.allProtocols = new HashMap<>();
-        this.allAdapters = new HashMap<>();
-        this.allAdapters.put(GenericAdapterStreamDescription.ID, new GenericDataStreamAdapter());
-        this.allAdapters.put(GenericAdapterSetDescription.ID, new GenericDataSetAdapter());
-    }
-
-    public static AdapterDeclarerSingleton getInstance() {
-        if (AdapterDeclarerSingleton.instance == null) {
-            AdapterDeclarerSingleton.instance = new AdapterDeclarerSingleton();
-        }
-
-        return AdapterDeclarerSingleton.instance;
-    }
-
-    public AdapterDeclarerSingleton add(Connector c) {
-
-        if (c instanceof Protocol) {
-            this.allProtocols.put(((Protocol) c).getId(), (Protocol) c);
-        } else if (c instanceof Adapter) {
-            this.allAdapters.put(((Adapter) c).getId(), (Adapter) c);
-        }
-
-        return getInstance();
-    }
-
-    public Map<String, Protocol> getAllProtocolsMap() {
-        return this.allProtocols;
-    }
-
-    public Map<String, Adapter> getAllAdaptersMap() {
-        return this.allAdapters;
-    }
-
-
-    public Collection<Protocol> getAllProtocols() {
-        return this.allProtocols.values();
-    }
-
-    public Collection<Adapter> getAllAdapters() {
-        return this.allAdapters.values();
-    }
-
-    public Protocol getProtocol(String id) {
-        return getAllProtocols().stream()
-                .filter(protocol -> protocol.getId().equals(id))
-                .findAny()
-                .orElse(null);
-    }
-
-    public Adapter getAdapter(String id) {
-        return getAllAdapters().stream()
-                .filter(adapter -> adapter.getId().equals(id))
-                .findAny()
-                .orElse(null);
-    }
-
-}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
similarity index 52%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
copy to streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
index b20ce52..2db077a 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
@@ -15,37 +15,26 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.connect.container.worker.init;
 
-package org.apache.streampipes.connect;
+import org.apache.streampipes.connect.container.worker.rest.*;
+import org.apache.streampipes.container.init.ExtensionsResourceProvider;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
-
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class GetNEvents implements EmitBinaryEvent {
-
-    private int n;
-    private List<byte[]> events;
-
-    public GetNEvents(int n) {
-        this.n = n;
-        this.events = new ArrayList<>();
-    }
-
-    @Override
-    public Boolean emit(byte[] event) {
-
-        events.add(event);
-        this.n = n - 1;
-
-        if (n == 0) {
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    public List<byte[]> getEvents() {
-        return events;
-    }
+public class AdapterServiceResourceProvider implements ExtensionsResourceProvider {
+
+  @Override
+  public List<Class<?>> getResourceClasses() {
+    return Arrays.asList(WelcomePageWorker.class,
+            GuessResource.class,
+            RuntimeResolvableResource.class,
+            WorkerResource.class,
+            MultiPartFeature.class,
+            AdapterResource.class,
+            ProtocolResource.class,
+            HttpServerAdapterResource.class);
+  }
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index a0f86a0..6fda314 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -18,21 +18,22 @@
 
 package org.apache.streampipes.connect.container.worker.init;
 
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
+import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
+import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.locales.LabelGenerator;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
-import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -77,13 +78,13 @@ public abstract class AdapterWorkerContainer {
   private ConnectWorkerContainer getContainerDescription(String endpointUrl) {
 
     List<AdapterDescription> adapters = new ArrayList<>();
-    for (Adapter a : AdapterDeclarerSingleton.getInstance().getAllAdapters()) {
+    for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
       AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel(), endpointUrl);
       adapters.add(desc);
     }
 
     List<ProtocolDescription> protocols = new ArrayList<>();
-    for (Protocol p : AdapterDeclarerSingleton.getInstance().getAllProtocols()) {
+    for (IProtocol p : DeclarersSingleton.getInstance().getAllProtocols()) {
       ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel(), endpointUrl);
       protocols.add(desc);
     }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainerResourceConfig.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainerResourceConfig.java
index 2897876..07b0d29 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainerResourceConfig.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainerResourceConfig.java
@@ -16,26 +16,20 @@
  */
 package org.apache.streampipes.connect.container.worker.init;
 
-import org.apache.streampipes.connect.container.worker.rest.*;
-import org.apache.streampipes.rest.shared.serializer.JacksonSerializationProvider;
-import org.glassfish.jersey.media.multipart.MultiPartFeature;
-import org.glassfish.jersey.server.ResourceConfig;
+import org.apache.streampipes.container.base.rest.BaseResourceConfig;
+import org.apache.streampipes.container.init.BaseExtensionsServiceResourceProvider;
 import org.springframework.stereotype.Component;
 
-@Component
-public class AdapterWorkerContainerResourceConfig extends ResourceConfig {
+import java.util.Arrays;
+import java.util.List;
 
-  public AdapterWorkerContainerResourceConfig() {
-    super();
-    register(WelcomePageWorker.class);
-    register(GuessResource.class);
-    register(RuntimeResolvableResource.class);
-    register(WorkerResource.class);
-    register(MultiPartFeature.class);
-    register(AdapterResource.class);
-    register(ProtocolResource.class);
-    register(HttpServerAdapterResource.class);
+@Component
+public class AdapterWorkerContainerResourceConfig extends BaseResourceConfig {
 
-    register(JacksonSerializationProvider.class);
+  @Override
+  public List<List<Class<?>>> getClassesToRegister() {
+    return Arrays.asList(
+            new BaseExtensionsServiceResourceProvider().getResourceClasses(),
+            new AdapterServiceResourceProvider().getResourceClasses());
   }
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 620ab48..0ff1dd0 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -20,12 +20,12 @@ package org.apache.streampipes.connect.container.worker.management;
 
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.connect.RunningAdapterInstances;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.model.generic.GenericAdapter;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
-import org.apache.streampipes.connect.container.worker.init.AdapterDeclarerSingleton;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.worker.utils.AdapterUtils;
+import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.connect.adapter.*;
 import org.slf4j.Logger;
@@ -37,35 +37,35 @@ public class AdapterWorkerManagement {
 
     private static final Logger logger = LoggerFactory.getLogger(AdapterWorkerManagement.class);
 
-    public Collection<Protocol> getAllProtocols() {
-        return AdapterDeclarerSingleton.getInstance().getAllProtocols();
+    public Collection<IProtocol> getAllProtocols() {
+        return DeclarersSingleton.getInstance().getAllProtocols();
     }
 
-    public Protocol getProtocol(String id) {
-        return AdapterDeclarerSingleton.getInstance().getProtocol(id);
+    public IProtocol getProtocol(String id) {
+        return DeclarersSingleton.getInstance().getProtocol(id);
     }
 
-    public Collection<Adapter> getAllAdapters() {
-        return AdapterDeclarerSingleton.getInstance().getAllAdapters();
+    public Collection<IAdapter> getAllAdapters() {
+        return DeclarersSingleton.getInstance().getAllAdapters();
     }
 
-    public Adapter getAdapter(String id) {
-        return AdapterDeclarerSingleton.getInstance().getAdapter(id);
+    public IAdapter<?> getAdapter(String id) {
+        return DeclarersSingleton.getInstance().getAdapter(id);
     }
 
     public void invokeStreamAdapter(AdapterStreamDescription adapterStreamDescription) throws AdapterException {
 
 
 //        Adapter adapter = AdapterDeclarerSingleton.getInstance().getAdapter(adapterStreamDescription.getAppId());
-       Adapter adapter = AdapterUtils.setAdapter(adapterStreamDescription);
+       IAdapter<?> adapter = AdapterUtils.setAdapter(adapterStreamDescription);
 
-        Protocol protocol = null;
+        IProtocol protocol = null;
         if (adapterStreamDescription instanceof GenericAdapterStreamDescription) {
             //TODO Need to check with ElementId?
             //protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterStreamDescription).getProtocolDescription().getElementId());
-            protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterStreamDescription).getProtocolDescription().getAppId());
+            protocol = DeclarersSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterStreamDescription).getProtocolDescription().getAppId());
             if (protocol == null) {
-                protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterStreamDescription).getProtocolDescription().getAppId());
+                protocol = DeclarersSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterStreamDescription).getProtocolDescription().getAppId());
             }
             ((GenericAdapter) adapter).setProtocol(protocol);
         }
@@ -82,12 +82,11 @@ public class AdapterWorkerManagement {
 
     public void invokeSetAdapter (AdapterSetDescription adapterSetDescription) throws AdapterException {
 
-//        Adapter adapter = AdapterDeclarerSingleton.getInstance().getAdapter(adapterSetDescription.getAppId());
-        Adapter adapter = AdapterUtils.setAdapter(adapterSetDescription);
+        IAdapter<?> adapter = AdapterUtils.setAdapter(adapterSetDescription);
 
-        Protocol protocol = null;
+        IProtocol protocol = null;
         if (adapterSetDescription instanceof GenericAdapterSetDescription) {
-            protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterSetDescription).getProtocolDescription().getAppId());
+            protocol = DeclarersSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterSetDescription).getProtocolDescription().getAppId());
             ((GenericAdapter) adapter).setProtocol(protocol);
         }
 
@@ -131,7 +130,7 @@ public class AdapterWorkerManagement {
 
         String adapterUri = adapterDescription.getUri();
 
-        Adapter adapter = RunningAdapterInstances.INSTANCE.removeAdapter(adapterUri);
+        IAdapter<?> adapter = RunningAdapterInstances.INSTANCE.removeAdapter(adapterUri);
 
         if (adapter == null) {
             throw new AdapterException("Adapter with id " + adapterUri + " was not found in this container and cannot be stopped.");
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index cac2312..ce91faa 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -18,15 +18,15 @@
 
 package org.apache.streampipes.connect.container.worker.management;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.container.worker.utils.AdapterUtils;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Optional;
@@ -38,7 +38,7 @@ public class GuessManagement {
     public GuessSchema guessSchema(AdapterDescription adapterDescription) throws AdapterException, ParseException {
 
         LOG.info("Start guessing schema for: " + adapterDescription.getAppId());
-        Adapter adapter = AdapterUtils.setAdapter(adapterDescription);
+        IAdapter adapter = AdapterUtils.setAdapter(adapterDescription);
 
         GuessSchema guessSchema;
         try {
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java
index 9208973..a4252ce 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java
@@ -18,12 +18,12 @@
 
 package org.apache.streampipes.connect.container.worker.management;
 
-import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
-import org.apache.streampipes.connect.container.worker.init.AdapterDeclarerSingleton;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.init.DeclarersSingleton;
 
 import java.util.Map;
 
@@ -33,7 +33,7 @@ public class RuntimeResovable {
 
     public static ResolvesContainerProvidedOptions getRuntimeResolvableFormat(String id) throws IllegalArgumentException {
         id = id.replaceAll("sp:", SP_NS);
-        Map<String, Format> allFormats = AdapterRegistry.getAllFormats();
+        Map<String, IFormat> allFormats = AdapterRegistry.getAllFormats();
 
         if (allFormats.containsKey(id)) {
             return (ResolvesContainerProvidedOptions) allFormats.get(id);
@@ -44,8 +44,8 @@ public class RuntimeResovable {
 
      public static ResolvesContainerProvidedOptions getRuntimeResolvableAdapter(String id) throws IllegalArgumentException {
         id = id.replaceAll("sp:", SP_NS);
-        Map<String, Adapter> allAdapters = AdapterDeclarerSingleton.getInstance().getAllAdaptersMap();
-        Map<String, Protocol> allProtocols =  AdapterDeclarerSingleton.getInstance().getAllProtocolsMap();
+        Map<String, IAdapter> allAdapters = DeclarersSingleton.getInstance().getAllAdaptersMap();
+        Map<String, IProtocol> allProtocols =  DeclarersSingleton.getInstance().getAllProtocolsMap();
 
         if (allAdapters.containsKey(id)) {
             return (ResolvesContainerProvidedOptions) allAdapters.get(id);
@@ -55,5 +55,4 @@ public class RuntimeResovable {
             throw new IllegalArgumentException("Could not find adapter with id " + id);
         }
     }
-
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
index 75527c4..8642e86 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.worker.rest;
 
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.container.worker.management.GuessManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
index ecfe056..7237360 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
@@ -19,8 +19,8 @@
 package org.apache.streampipes.connect.container.worker.rest;
 
 
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.connect.container.worker.management.AdapterWorkerManagement;
 import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
 import org.rendersnake.HtmlCanvas;
@@ -79,11 +79,11 @@ public class WelcomePageWorker extends AbstractSharedRestInterface {
     }
 
     private HtmlCanvas getAllRegisteredProtocols(HtmlCanvas canvas) throws IOException {
-        Collection<Protocol> protocols = this.adapterWorkerManagement.getAllProtocols();
+        Collection<IProtocol> protocols = this.adapterWorkerManagement.getAllProtocols();
 
         canvas.h2().write("Protocols")._h2().ol();
 
-        for (Protocol p : protocols) {
+        for (IProtocol p : protocols) {
             canvas.li().write(p.getId())._li();
         }
 
@@ -93,11 +93,11 @@ public class WelcomePageWorker extends AbstractSharedRestInterface {
     }
 
     private HtmlCanvas getAllRegisteredAdapters(HtmlCanvas canvas) throws IOException {
-        Collection<Adapter> adapters = this.adapterWorkerManagement.getAllAdapters();
+        Collection<IAdapter> adapters = this.adapterWorkerManagement.getAllAdapters();
 
         canvas.h2().write("Adapters")._h2().ol();
 
-        for (Adapter a : adapters) {
+        for (IAdapter a : adapters) {
             canvas.li().write(a.getId())._li();
         }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
index e608caa..1397fd2 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.container.worker.rest;
 
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.worker.management.AdapterWorkerManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
index 3dd9e8d..3ca0ecb 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
@@ -19,17 +19,17 @@
 package org.apache.streampipes.connect.container.worker.utils;
 
 import org.apache.http.client.fluent.Request;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericAdapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericDataSetAdapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericDataStreamAdapter;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
-import org.apache.streampipes.connect.container.worker.init.AdapterDeclarerSingleton;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
+import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -59,28 +59,30 @@ public class AdapterUtils {
         return "http://" +baseUrl + "api/v2/pipelines/" + pipelineId + "/stopAdapter";
     }
 
-    public static Adapter setAdapter(AdapterDescription adapterDescription) {
-        Adapter adapter = null;
+    public static IAdapter setAdapter(AdapterDescription adapterDescription) {
+        IAdapter adapter = null;
 
         if (adapterDescription instanceof GenericAdapterStreamDescription) {
-           adapter = new GenericDataStreamAdapter().getInstance((GenericAdapterStreamDescription) adapterDescription);
+           adapter = (IAdapter<?>) new GenericDataStreamAdapter().getInstance((GenericAdapterStreamDescription) adapterDescription);
         } else if (adapterDescription instanceof GenericAdapterSetDescription) {
-            adapter = new GenericDataSetAdapter().getInstance((GenericAdapterSetDescription) adapterDescription);
+            adapter = (IAdapter<?>) new GenericDataSetAdapter().getInstance((GenericAdapterSetDescription) adapterDescription);
         }
 
-        Protocol protocol = null;
+        IProtocol protocol = null;
         if (adapterDescription instanceof GenericAdapterSetDescription) {
-            protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterDescription).getProtocolDescription().getAppId());
+            protocol = DeclarersSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterDescription).getProtocolDescription().getAppId());
             ((GenericAdapter) adapter).setProtocol(protocol);
         }
 
         if (adapterDescription instanceof GenericAdapterStreamDescription) {
-            protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterDescription).getProtocolDescription().getAppId());
+            protocol = DeclarersSingleton.getInstance().getProtocol(((GenericAdapterStreamDescription) adapterDescription).getProtocolDescription().getAppId());
             ((GenericAdapter) adapter).setProtocol(protocol);
         }
 
         if (adapter == null) {
-            adapter = AdapterDeclarerSingleton.getInstance().getAdapter(adapterDescription.getAppId()).getInstance(adapterDescription);
+            adapter = DeclarersSingleton
+                    .getInstance()
+                    .getAdapter(adapterDescription.getAppId()).getInstance(adapterDescription);
         }
 
         return adapter;
diff --git a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
index 947545a..cf579c3 100644
--- a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
+++ b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
@@ -28,7 +28,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.apache.streampipes.connect.RunningAdapterInstances;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataSetAdapter;
 import org.apache.streampipes.connect.container.worker.utils.Utils;
 import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
@@ -147,4 +147,4 @@ public class AdapterWorkerManagementTest {
         }
 
     }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/pom.xml b/streampipes-connect/pom.xml
index cf12739..22f04ea 100755
--- a/streampipes-connect/pom.xml
+++ b/streampipes-connect/pom.xml
@@ -36,7 +36,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-container</artifactId>
+            <artifactId>streampipes-connect-api</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
         <dependency>
@@ -168,4 +168,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
index b20ce52..d9cffcc 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
@@ -19,6 +19,8 @@
 package org.apache.streampipes.connect;
 
 
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -38,11 +40,7 @@ public class GetNEvents implements EmitBinaryEvent {
         events.add(event);
         this.n = n - 1;
 
-        if (n == 0) {
-            return false;
-        } else {
-            return true;
-        }
+        return n != 0;
     }
 
     public List<byte[]> getEvents() {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
index 03aa4f4..2ac2058 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect;
 
-import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.api.IAdapter;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -26,14 +26,14 @@ import java.util.Map;
 public enum RunningAdapterInstances {
     INSTANCE;
 
-    private final Map<String, Adapter> runningInstances = new HashMap<>();
+    private final Map<String, IAdapter<?>> runningInstances = new HashMap<>();
 
-    public void addAdapter(String id, Adapter adapter) {
+    public void addAdapter(String id, IAdapter<?> adapter) {
         runningInstances.put(id, adapter);
     }
 
-    public Adapter removeAdapter(String id) {
-        Adapter result = runningInstances.get(id);
+    public IAdapter<?> removeAdapter(String id) {
+        IAdapter<?> result = runningInstances.get(id);
         runningInstances.remove(id);
         return result;
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/SendToPipeline.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/SendToPipeline.java
index d6fb210..9a40b25 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/SendToPipeline.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/SendToPipeline.java
@@ -19,31 +19,32 @@
 package org.apache.streampipes.connect;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.IAdapterPipeline;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 
 import java.util.Map;
 
 public class SendToPipeline implements EmitBinaryEvent {
 
-    private Format format;
+    private IFormat format;
 
     private SpKafkaProducer producer;
     private ObjectMapper objectMapper;
 
-    private AdapterPipeline adapterPipeline;
+    private IAdapterPipeline adapterPipeline;
 
     @Deprecated
     // TODO remove
-    public SendToPipeline(Format format, String brokerUrl, String topic) {
+    public SendToPipeline(IFormat format, String brokerUrl, String topic) {
         this.format = format;
 
         producer = new SpKafkaProducer(brokerUrl, topic);
         objectMapper = new ObjectMapper();
     }
 
-    public SendToPipeline(Format format, AdapterPipeline adapterPipeline) {
+    public SendToPipeline(IFormat format, IAdapterPipeline adapterPipeline) {
        this.format = format;
        this.adapterPipeline = adapterPipeline;
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index 28258ec..e6336ca 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,33 +18,30 @@
 
 package org.apache.streampipes.connect.adapter;
 
-import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
-import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.config.backend.SpProtocol;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.Connector;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
+import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
+import org.apache.streampipes.connect.api.IAdapter;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
 import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public abstract class Adapter<T extends AdapterDescription> implements Connector {
+public abstract class Adapter<T extends AdapterDescription> implements IAdapter<T> {
     Logger logger = LoggerFactory.getLogger(Adapter.class);
 
     private boolean debug;
@@ -71,19 +68,7 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
         this(false);
     }
 
-    public abstract T declareModel();
-
-    // Decide which adapter to call
-    public abstract void startAdapter() throws AdapterException;
-
-    public abstract void stopAdapter() throws AdapterException;
-
-    public abstract Adapter getInstance(T adapterDescription);
-
-    public abstract GuessSchema getSchema(T adapterDescription) throws AdapterException, ParseException;
-
-    public abstract String getId();
-
+    @Override
     public void changeEventGrounding(TransportProtocol transportProtocol) {
 
         if (transportProtocol instanceof JmsTransportProtocol) {
@@ -114,7 +99,7 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
 
     private AdapterPipeline getAdapterPipeline(T adapterDescription) {
 
-        List<AdapterPipelineElement> pipelineElements = new ArrayList<>();
+        List<IAdapterPipelineElement> pipelineElements = new ArrayList<>();
 
         // Must be before the schema transformations to ensure that user can move this event property
         AddTimestampRuleDescription timestampTransformationRuleDescription = getTimestampRule(adapterDescription);
@@ -206,6 +191,7 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
         return null;
     }
 
+    @Override
     public boolean isDebug() {
         return debug;
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterRegistry.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterRegistry.java
index 7a70b9b..3447caa 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterRegistry.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterRegistry.java
@@ -32,8 +32,8 @@ import org.apache.streampipes.connect.adapter.format.json.object.JsonObjectForma
 import org.apache.streampipes.connect.adapter.format.json.object.JsonObjectParser;
 import org.apache.streampipes.connect.adapter.format.xml.XmlFormat;
 import org.apache.streampipes.connect.adapter.format.xml.XmlParser;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.model.generic.Parser;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.IParser;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -43,8 +43,8 @@ import java.util.Map;
  */
 public class AdapterRegistry {
 
-  public static Map<String, Format> getAllFormats() {
-    Map<String, Format> allFormats = new HashMap<>();
+  public static Map<String, IFormat> getAllFormats() {
+    Map<String, IFormat> allFormats = new HashMap<>();
 
     allFormats.put(JsonFormat.ID, new JsonFormat());
     allFormats.put(JsonObjectFormat.ID, new JsonObjectFormat());
@@ -57,8 +57,8 @@ public class AdapterRegistry {
     return allFormats;
   }
 
-  public static Map<String, Parser> getAllParsers() {
-    Map<String, Parser> allParsers = new HashMap<>();
+  public static Map<String, IParser> getAllParsers() {
+    Map<String, IParser> allParsers = new HashMap<>();
 
     allParsers.put(JsonFormat.ID, new JsonParser());
     allParsers.put(JsonObjectFormat.ID, new JsonObjectParser());
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvFormat.java
index 78bab13..e0ebe55 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvFormat.java
@@ -20,9 +20,9 @@ package org.apache.streampipes.connect.adapter.format.csv;
 
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.sdk.builder.adapter.FormatDescriptionBuilder;
@@ -32,7 +32,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-public class CsvFormat extends Format {
+public class CsvFormat implements IFormat {
 
     public static String HEADER_NAME = "header";
     public static String DELIMITER_NAME = "delimiter";
@@ -53,7 +53,7 @@ public class CsvFormat extends Format {
     }
 
     @Override
-    public Format getInstance(FormatDescription formatDescription) {
+    public IFormat getInstance(FormatDescription formatDescription) {
         ParameterExtractor extractor = new ParameterExtractor(formatDescription.getConfig());
         String delimiter = extractor.singleValue(DELIMITER_NAME);
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
index b8ba3e2..4f6b873 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
@@ -19,10 +19,10 @@
 package org.apache.streampipes.connect.adapter.format.csv;
 
 
-import org.apache.streampipes.connect.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -197,4 +197,4 @@ public class CsvParser extends Parser {
 
         return result.toArray(new String[0]);
     }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonFormat.java
index 6e232d0..40b4152 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonFormat.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.connect.adapter.format.geojson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.sdk.builder.adapter.FormatDescriptionBuilder;
@@ -31,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class GeoJsonFormat extends Format {
+public class GeoJsonFormat implements IFormat {
 
     public static final String ID = "https://streampipes.org/vocabulary/v1/format/geojson";
     private static final Logger logger = LoggerFactory.getLogger(GeoJsonFormat.class);
@@ -45,7 +45,7 @@ public class GeoJsonFormat extends Format {
     }
 
     @Override
-    public Format getInstance(FormatDescription formatDescription) {
+    public IFormat getInstance(FormatDescription formatDescription) {
        return new GeoJsonFormat();
     }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonParser.java
index d31dde5..292d308 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/geojson/GeoJsonParser.java
@@ -25,10 +25,10 @@ import com.google.gson.Gson;
 import org.geojson.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.schema.*;
 import org.apache.streampipes.vocabulary.SO;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageFormat.java
index 55d9c13..8475092 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageFormat.java
@@ -18,8 +18,8 @@
 
 package org.apache.streampipes.connect.adapter.format.image;
 
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.sdk.builder.adapter.FormatDescriptionBuilder;
 
@@ -27,7 +27,7 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ImageFormat extends Format {
+public class ImageFormat implements IFormat {
 
     public static final String ID = "https://streampipes.org/vocabulary/v1/format/image";
 
@@ -36,7 +36,7 @@ public class ImageFormat extends Format {
     }
 
     @Override
-    public Format getInstance(FormatDescription formatDescription) {
+    public IFormat getInstance(FormatDescription formatDescription) {
         return new ImageFormat();
     }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageParser.java
index 8fe436c..2eaa0bb 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/image/ImageParser.java
@@ -19,18 +19,14 @@
 package org.apache.streampipes.connect.adapter.format.image;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.streampipes.connect.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.vocabulary.XSD;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
-import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.streampipes.sdk.helpers.EpProperties.imageProperty;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java
index c259ee4..eaf48a9 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/AbstractJsonFormat.java
@@ -18,14 +18,14 @@
 package org.apache.streampipes.connect.adapter.format.json;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.Map;
 
-public abstract class AbstractJsonFormat extends Format {
+public abstract class AbstractJsonFormat implements IFormat {
 
   @Override
   public Map<String, Object> parse(byte[] object) throws ParseException {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonFormat.java
index 4e58b2e..f226c06 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonFormat.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.connect.adapter.format.json.arraykey;
 
 
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.adapter.format.json.AbstractJsonFormat;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.sdk.builder.adapter.FormatDescriptionBuilder;
@@ -30,7 +30,7 @@ public class JsonFormat extends AbstractJsonFormat {
     public static final String ID = "https://streampipes.org/vocabulary/v1/format/json/arraykey";
 
     @Override
-    public Format getInstance(FormatDescription formatDescription) {
+    public IFormat getInstance(FormatDescription formatDescription) {
         return new JsonFormat();
     }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParser.java
index 3273ffa..5648525 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParser.java
@@ -22,11 +22,11 @@ package org.apache.streampipes.connect.adapter.format.json.arraykey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
 import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.schema.EventProperty;
@@ -231,4 +231,4 @@ public class JsonParser extends Parser {
 
     return result;
   }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayFormat.java
index f7a46eb..a956832 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayFormat.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.connect.adapter.format.json.arraynokey;
 
 
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.adapter.format.json.AbstractJsonFormat;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.sdk.builder.adapter.FormatDescriptionBuilder;
@@ -29,7 +29,7 @@ public class JsonArrayFormat extends AbstractJsonFormat {
     public static final String ID = "https://streampipes.org/vocabulary/v1/format/json/arraynokey";
 
     @Override
-    public Format getInstance(FormatDescription formatDescription) {
+    public IFormat getInstance(FormatDescription formatDescription) {
         return new JsonArrayFormat();
     }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParser.java
index 7e09adb..74d5195 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParser.java
@@ -22,8 +22,8 @@ package org.apache.streampipes.connect.adapter.format.json.arraynokey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.EmitBinaryEvent;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
@@ -207,4 +207,4 @@ public class JsonArrayParser extends Parser {
 
     return result;
   }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectFormat.java
index d0862b1..9eeb69b 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectFormat.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.adapter.format.json.object;
 
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.adapter.format.json.AbstractJsonFormat;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.sdk.builder.adapter.FormatDescriptionBuilder;
@@ -28,7 +28,7 @@ public class JsonObjectFormat extends AbstractJsonFormat {
   public static final String ID = "https://streampipes.org/vocabulary/v1/format/json/object";
 
   @Override
-  public Format getInstance(FormatDescription formatDescription) {
+  public IFormat getInstance(FormatDescription formatDescription) {
     return new JsonObjectFormat();
   }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
index b4def0d..3419bc9 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
@@ -23,10 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.schema.EventProperty;
@@ -188,4 +188,4 @@ public class JsonObjectParser extends Parser {
 
         return result;
     }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlFormat.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlFormat.java
index 7c6051c..5004085 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlFormat.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlFormat.java
@@ -21,9 +21,9 @@ package org.apache.streampipes.connect.adapter.format.xml;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.adapter.model.generic.Format;
+import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -32,7 +32,7 @@ import org.apache.streampipes.sdk.helpers.Labels;
 
 import java.util.Map;
 
-public class XmlFormat extends Format {
+public class XmlFormat implements IFormat {
 
     public static String TAG_ID = "tag";
     public static final String ID = "https://streampipes.org/vocabulary/v1/format/xml";
@@ -49,7 +49,7 @@ public class XmlFormat extends Format {
     }
 
     @Override
-    public Format getInstance(FormatDescription formatDescription) {
+    public IFormat getInstance(FormatDescription formatDescription) {
         ParameterExtractor extractor = new ParameterExtractor(formatDescription.getConfig());
         String tag = extractor.singleValue(TAG_ID);
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlParser.java
index 2124fb9..4520b67 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/xml/XmlParser.java
@@ -26,8 +26,8 @@ import com.google.gson.Gson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.EmitBinaryEvent;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java
index 3be22d0..36d6284 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java
@@ -18,21 +18,25 @@
 
 package org.apache.streampipes.connect.adapter.model.generic;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.model.connect.adapter.*;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.IParser;
+import org.apache.streampipes.connect.api.IProtocol;
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.EventSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class GenericAdapter<T extends AdapterDescription> extends Adapter<T> {
 
     private static final Logger logger = LoggerFactory.getLogger(Adapter.class);
-    protected Protocol protocol;
+    protected IProtocol protocol;
 
     public GenericAdapter(T adapterDescription) {
         super(adapterDescription);
@@ -48,20 +52,19 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
 
     public abstract GenericAdapterDescription getAdapterDescription();
 
-    public abstract void setProtocol(Protocol protocol);
+    public abstract void setProtocol(IProtocol protocol);
 
     @Override
     public void startAdapter() throws AdapterException {
 
         GenericAdapterDescription adapterDescription = getAdapterDescription();
 
-
-        Parser parser = getParser(adapterDescription);
-        Format format = getFormat(adapterDescription);
+        IParser parser = getParser(adapterDescription);
+        IFormat format = getFormat(adapterDescription);
 
         ProtocolDescription protocolDescription = ((GenericAdapterDescription) adapterDescription).getProtocolDescription();
 
-        Protocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format);
+        IProtocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format);
         this.protocol = protocolInstance;
 
         //TODO remove
@@ -75,25 +78,25 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
 
 
     @Override
-    public GuessSchema getSchema(T adapterDescription) throws AdapterException, ParseException {
-        Parser parser = getParser((GenericAdapterDescription) adapterDescription);
-        Format format = getFormat((GenericAdapterDescription) adapterDescription);
+    public GuessSchema getSchema(AdapterDescription adapterDescription) throws AdapterException, ParseException {
+        IParser parser = getParser((GenericAdapterDescription) adapterDescription);
+        IFormat format = getFormat((GenericAdapterDescription) adapterDescription);
 
         ProtocolDescription protocolDescription = ((GenericAdapterDescription) adapterDescription).getProtocolDescription();
 
-        Protocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format);
+        IProtocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format);
 
         logger.debug("Extract schema with format: " + format.getId() + " and " + protocol.getId());
 
         return protocolInstance.getGuessSchema();
     }
 
-    private Parser getParser(GenericAdapterDescription adapterDescription) throws AdapterException {
+    private IParser getParser(GenericAdapterDescription adapterDescription) throws AdapterException {
          if (adapterDescription.getFormatDescription() == null) throw new AdapterException("Format description of Adapter ist empty");
          return AdapterRegistry.getAllParsers().get(adapterDescription.getFormatDescription().getAppId()).getInstance(adapterDescription.getFormatDescription());
     }
 
-    private Format getFormat(GenericAdapterDescription adapterDescription) {
+    private IFormat getFormat(GenericAdapterDescription adapterDescription) {
         return AdapterRegistry.getAllFormats().get(adapterDescription.getFormatDescription().getAppId()).getInstance(adapterDescription.getFormatDescription());
     }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java
index 39962fe..88c3172 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java
@@ -18,11 +18,12 @@
 
 package org.apache.streampipes.connect.adapter.model.generic;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GenericDataSetAdapter extends GenericAdapter<GenericAdapterSetDescription> {
 
@@ -53,7 +54,7 @@ public class GenericDataSetAdapter extends GenericAdapter<GenericAdapterSetDescr
     }
 
     @Override
-    public Adapter getInstance(GenericAdapterSetDescription adapterDescription) {
+    public Adapter<GenericAdapterSetDescription> getInstance(GenericAdapterSetDescription adapterDescription) {
         return  new GenericDataSetAdapter(adapterDescription);
     }
 
@@ -72,7 +73,7 @@ public class GenericDataSetAdapter extends GenericAdapter<GenericAdapterSetDescr
     }
 
     @Override
-    public void setProtocol(Protocol protocol) {
+    public void setProtocol(IProtocol protocol) {
        this.protocol = protocol;
     }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java
index 28179c1..a2efade 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.adapter.model.generic;
 
+import org.apache.streampipes.connect.api.IProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.Adapter;
@@ -71,7 +72,7 @@ public class GenericDataStreamAdapter extends GenericAdapter<GenericAdapterStrea
     }
 
     @Override
-    public void setProtocol(Protocol protocol) {
+    public void setProtocol(IProtocol protocol) {
        this.protocol = protocol;
     }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Parser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Parser.java
index f807ebb..2c7a72f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Parser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Parser.java
@@ -20,20 +20,15 @@ package org.apache.streampipes.connect.adapter.model.generic;
 
 
 import org.apache.streampipes.connect.GetNEvents;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.model.connect.grounding.FormatDescription;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.connect.EmitBinaryEvent;
+import org.apache.streampipes.connect.api.IParser;
+import org.apache.streampipes.connect.api.exception.ParseException;
 
 import java.io.InputStream;
 import java.util.List;
 
-public abstract class Parser {
-
-    public abstract Parser getInstance(FormatDescription formatDescription);
-
-    public abstract void parse(InputStream data, EmitBinaryEvent emitBinaryEvent) throws ParseException;
+public abstract class Parser implements IParser {
 
+    @Override
     public List<byte[]> parseNEvents(InputStream data, int n) throws ParseException {
         GetNEvents gne = new GetNEvents(n);
 
@@ -49,10 +44,4 @@ public abstract class Parser {
         return gne.getEvents();
     }
 
-    /**
-     * Pass one event to Parser to get the event schema
-     * @param oneEvent
-     * @return
-     */
-    public abstract EventSchema getEventSchema(List<byte[]> oneEvent);
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Protocol.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Protocol.java
index ed12a22..1e7384c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Protocol.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/Protocol.java
@@ -18,21 +18,16 @@
 
 package org.apache.streampipes.connect.adapter.model.generic;
 
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.Connector;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.IParser;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.model.schema.EventSchema;
 
-import java.util.List;
-import java.util.Map;
 
+public abstract class Protocol implements IProtocol {
 
-public abstract class Protocol implements Connector {
-
-    protected Parser parser;
-    protected Format format;
+    protected IParser parser;
+    protected IFormat format;
 
     //TODO remove
     protected EventSchema eventSchema;
@@ -41,29 +36,13 @@ public abstract class Protocol implements Connector {
 
     }
 
-    public Protocol(Parser parser, Format format) {
+    public Protocol(IParser parser, IFormat format) {
         this.parser = parser;
         this.format = format;
     }
 
-    public abstract Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format);
-
-    public abstract ProtocolDescription declareModel();
-
-    public abstract GuessSchema getGuessSchema() throws ParseException;
-
-    public abstract List<Map<String, Object>> getNElements(int n) throws ParseException;
-
-    public abstract void run(AdapterPipeline adapterPipeline);
-
-    /*
-       Stops the running protocol. Mainly relevant for streaming protocols
-     */
-    public abstract void stop();
-
-    public abstract String getId();
-
     //TODO remove
+    @Override
     public void setEventSchema(EventSchema eventSchema) {
         this.eventSchema = eventSchema;
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
index 43a537e..98613c8 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -18,24 +18,28 @@
 
 package org.apache.streampipes.connect.adapter.model.pipeline;
 
+import org.apache.streampipes.connect.api.IAdapterPipeline;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
+
 import java.util.List;
 import java.util.Map;
 
-public class AdapterPipeline {
+public class AdapterPipeline implements IAdapterPipeline {
 
-    private List<AdapterPipelineElement> pipelineElements;
-    private AdapterPipelineElement pipelineSink;
+    private List<IAdapterPipelineElement> pipelineElements;
+    private IAdapterPipelineElement pipelineSink;
 
 
-    public AdapterPipeline(List<AdapterPipelineElement> pipelineElements) {
+    public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements) {
         this.pipelineElements = pipelineElements;
     }
 
-    public AdapterPipeline(List<AdapterPipelineElement> pipelineElements, AdapterPipelineElement pipelineSink) {
+    public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
         this.pipelineElements = pipelineElements;
         this.pipelineSink = pipelineSink;
     }
 
+    @Override
     public void process(Map<String, Object> event) {
 
         // TODO remove, just for performance tests
@@ -44,7 +48,7 @@ public class AdapterPipeline {
         }
 
 
-        for (AdapterPipelineElement pipelineElement : pipelineElements) {
+        for (IAdapterPipelineElement pipelineElement : pipelineElements) {
             event = pipelineElement.process(event);
         }
         if (pipelineSink != null) {
@@ -53,19 +57,23 @@ public class AdapterPipeline {
 
     }
 
-    public List<AdapterPipelineElement> getPipelineElements() {
+    @Override
+    public List<IAdapterPipelineElement> getPipelineElements() {
         return pipelineElements;
     }
 
-    public void setPipelineElements(List<AdapterPipelineElement> pipelineElements) {
+    @Override
+    public void setPipelineElements(List<IAdapterPipelineElement> pipelineElements) {
         this.pipelineElements = pipelineElements;
     }
 
-    public void changePipelineSink(AdapterPipelineElement pipelineSink) {
+    @Override
+    public void changePipelineSink(IAdapterPipelineElement pipelineSink) {
         this.pipelineSink = pipelineSink;
     }
 
-    public AdapterPipelineElement getPipelineSink() {
+    @Override
+    public IAdapterPipelineElement getPipelineSink() {
         return pipelineSink;
     }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddTimestampPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddTimestampPipelineElement.java
index 95b921d..8241906 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddTimestampPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddTimestampPipelineElement.java
@@ -18,11 +18,11 @@
 
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 
 import java.util.Map;
 
-public class AddTimestampPipelineElement implements AdapterPipelineElement {
+public class AddTimestampPipelineElement implements IAdapterPipelineElement {
 
     private String runtimeKey;
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddValuePipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddValuePipelineElement.java
index 26dc9fa..e2f66ee 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddValuePipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/AddValuePipelineElement.java
@@ -18,11 +18,11 @@
 
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 
 import java.util.Map;
 
-public class AddValuePipelineElement implements AdapterPipelineElement {
+public class AddValuePipelineElement implements IAdapterPipelineElement {
 
     private String runtimeKey;
     private String value;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
index 7691e8b..ce223f2 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,7 +28,7 @@ import java.util.Map;
  *  If the same event is sent multiple times the timer is always reseted to cover polling of rest endpoints
  *  User can configure how long events are stored in cache, it should be minimum 2x the polling intervall
  */
-public class DuplicateFilterPipelineElement implements AdapterPipelineElement {
+public class DuplicateFilterPipelineElement implements IAdapterPipelineElement {
 
     /**
      * Lifetime of events
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
index 4333111..9c3b41c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -18,7 +18,7 @@
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.messaging.EventProducer;
@@ -26,11 +26,10 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.grounding.TransportFormat;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 
-import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
-public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> implements AdapterPipelineElement {
+public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> implements IAdapterPipelineElement {
 
   protected AdapterDescription adapterDescription;
   protected SpDataFormatDefinition dataFormatDefinition;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
index b6fcf5f..4993efb 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
@@ -18,13 +18,13 @@
 
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.preprocessing.Util;
 
 import java.util.List;
 import java.util.Map;
 
-public class SendToBrokerReplayAdapterSink implements AdapterPipelineElement {
+public class SendToBrokerReplayAdapterSink implements IAdapterPipelineElement {
 
     private final SendToBrokerAdapterSink sendToBrokerAdapterSink;
     private long lastEventTimestamp;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
index 8b57211..e3cc47c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
@@ -17,13 +17,13 @@
  */
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 
 public class SendToJmsAdapterSink extends SendToBrokerAdapterSink<JmsTransportProtocol>
-        implements AdapterPipelineElement {
+        implements IAdapterPipelineElement {
 
     public SendToJmsAdapterSink(AdapterDescription adapterDescription) {
         super(adapterDescription, ActiveMQPublisher::new, JmsTransportProtocol.class);
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
index 350b124..24f607b 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
@@ -17,13 +17,13 @@
  */
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTransportProtocol>
-        implements AdapterPipelineElement  {
+        implements IAdapterPipelineElement {
 
     public SendToKafkaAdapterSink(AdapterDescription adapterDescription) {
         super(adapterDescription, SpKafkaProducer::new, KafkaTransportProtocol.class);
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
index d1e9048..a5cf942 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
@@ -17,13 +17,13 @@
  */
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.messaging.mqtt.MqttPublisher;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 
 public class SendToMqttAdapterSink extends SendToBrokerAdapterSink<MqttTransportProtocol>
-        implements AdapterPipelineElement {
+        implements IAdapterPipelineElement {
 
     public SendToMqttAdapterSink(AdapterDescription adapterDescription) {
         super(adapterDescription, MqttPublisher::new, MqttTransportProtocol.class);
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformSchemaAdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformSchemaAdapterPipelineElement.java
index cd860d0..735933f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformSchemaAdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformSchemaAdapterPipelineElement.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.preprocessing.Util;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.*;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.schema.*;
@@ -31,7 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class TransformSchemaAdapterPipelineElement implements AdapterPipelineElement {
+public class TransformSchemaAdapterPipelineElement implements IAdapterPipelineElement {
 
     private SchemaEventTransformer eventTransformer;
     Logger logger = LoggerFactory.getLogger(TransformSchemaAdapterPipelineElement.class);
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformStreamAdapterElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformStreamAdapterElement.java
index 499c3e0..aaacb6a 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformStreamAdapterElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformStreamAdapterElement.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.TransformationRule;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.EventRateTransformationRule;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.StreamEventTransformer;
@@ -32,7 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class TransformStreamAdapterElement implements AdapterPipelineElement {
+public class TransformStreamAdapterElement implements IAdapterPipelineElement {
 
     private StreamEventTransformer eventTransformer;
     Logger logger = LoggerFactory.getLogger(TransformStreamAdapterElement.class);
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
index 6b4d6a6..137966c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.connect.adapter.preprocessing.elements;
 import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.preprocessing.Util;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.value.*;
 import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
@@ -33,7 +33,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class TransformValueAdapterPipelineElement implements AdapterPipelineElement {
+public class TransformValueAdapterPipelineElement implements IAdapterPipelineElement {
 
     private ValueEventTransformer eventTransformer;
     private Logger logger = LoggerFactory.getLogger(TransformValueAdapterPipelineElement.class);
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParserTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParserTest.java
index 7f3fd59..50f6f24 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParserTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraykey/JsonParserTest.java
@@ -24,7 +24,7 @@ import static org.apache.streampipes.connect.adapter.TestUtils.*;
 import com.google.gson.JsonObject;
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -98,4 +98,4 @@ public class JsonParserTest {
 
     return null;
   }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParserTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParserTest.java
index 4e80dbe..6c6498c 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParserTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/arraynokey/JsonArrayParserTest.java
@@ -25,7 +25,7 @@ import static org.apache.streampipes.connect.adapter.TestUtils.makeJsonObject;
 import com.google.gson.JsonArray;
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -97,4 +97,4 @@ public class JsonArrayParserTest {
     return null;
   }
 
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/geojson/GeoJsonTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/geojson/GeoJsonTest.java
index 20cbdc9..8eeaf8b 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/geojson/GeoJsonTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/geojson/GeoJsonTest.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.connect.adapter.format.json.geojson;
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
 import org.apache.streampipes.connect.adapter.format.geojson.GeoJsonParser;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.schema.EventSchema;
 
 import java.io.IOException;
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParserTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParserTest.java
index 0e3b632..a322a87 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParserTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParserTest.java
@@ -24,7 +24,7 @@ import static org.apache.streampipes.connect.adapter.TestUtils.makeJsonObject;
 import com.google.gson.JsonObject;
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -73,4 +73,4 @@ public class JsonObjectParserTest {
 
         return null;
     }
-}
\ No newline at end of file
+}
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/xml/XmlTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/xml/XmlTest.java
index 80ffee0..5127145 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/xml/XmlTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/format/json/xml/XmlTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.format.xml.XmlParser;
 import org.apache.streampipes.model.schema.EventSchema;
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/rest/BaseResourceConfig.java
similarity index 68%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
rename to streampipes-container-base/src/main/java/org/apache/streampipes/container/base/rest/BaseResourceConfig.java
index 52bbd9f..bcc4847 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/rest/BaseResourceConfig.java
@@ -15,13 +15,19 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.container.base.rest;
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.glassfish.jersey.server.ResourceConfig;
 
-import java.util.Map;
+import java.util.List;
 
-public interface AdapterPipelineElement {
+public abstract class BaseResourceConfig extends ResourceConfig {
 
-    Map<String, Object> process(Map<String, Object> event);
+  public BaseResourceConfig() {
+    getClassesToRegister()
+            .forEach(set -> set.forEach(this::register));
+  }
+
+  public abstract List<List<Class<?>>> getClassesToRegister();
 
 }
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 15f7eb7..45491e1 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -17,10 +17,9 @@
  */
 package org.apache.streampipes.container.extensions;
 
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.model.generic.Protocol;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
-import org.apache.streampipes.connect.container.worker.init.AdapterDeclarerSingleton;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.init.ModelSubmitter;
 import org.apache.streampipes.container.init.RunningInstances;
@@ -106,13 +105,13 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
 
     private ConnectWorkerContainer getContainerDescription(String endpointUrl) {
         List<AdapterDescription> adapters = new ArrayList<>();
-        for (Adapter a : AdapterDeclarerSingleton.getInstance().getAllAdapters()) {
+        for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
             AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel(), endpointUrl);
             adapters.add(desc);
         }
 
         List<ProtocolDescription> protocols = new ArrayList<>();
-        for (Protocol p : AdapterDeclarerSingleton.getInstance().getAllProtocols()) {
+        for (IProtocol p : DeclarersSingleton.getInstance().getAllProtocols()) {
             ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel(), endpointUrl);
             protocols.add(desc);
         }
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
index cd27658..9a91132 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
@@ -17,32 +17,23 @@
  */
 package org.apache.streampipes.container.extensions;
 
-import org.apache.streampipes.connect.container.worker.rest.*;
-import org.apache.streampipes.container.api.*;
-import org.apache.streampipes.rest.shared.serializer.*;
-import org.glassfish.jersey.media.multipart.MultiPartFeature;
-import org.glassfish.jersey.server.ResourceConfig;
+import org.apache.streampipes.connect.container.worker.init.AdapterServiceResourceProvider;
+import org.apache.streampipes.container.base.rest.BaseResourceConfig;
+import org.apache.streampipes.container.init.BaseExtensionsServiceResourceProvider;
+import org.apache.streampipes.container.init.PipelineElementServiceResourceProvider;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
+import java.util.List;
+
 @Component
-public class ExtensionsResourceConfig extends ResourceConfig {
-    public ExtensionsResourceConfig() {
-        register(DataSinkPipelineElementResource.class);
-        register(DataProcessorPipelineElementResource.class);
-        register(DataStreamPipelineElementResource.class);
-        register(WelcomePage.class);
-        register(PipelineTemplateResource.class);
+public class ExtensionsResourceConfig extends BaseResourceConfig {
 
-        //register(WelcomePageWorker.class);
-        register(GuessResource.class);
-        register(RuntimeResolvableResource.class);
-        register(WorkerResource.class);
-        register(MultiPartFeature.class);
-        register(AdapterResource.class);
-        register(ProtocolResource.class);
-        register(GsonWithIdProvider.class);
-        register(GsonWithoutIdProvider.class);
-        register(GsonClientModelProvider.class);
-        register(JacksonSerializationProvider.class);
-    }
+  @Override
+  public List<List<Class<?>>> getClassesToRegister() {
+    return Arrays.asList(
+            new BaseExtensionsServiceResourceProvider().getResourceClasses(),
+            new AdapterServiceResourceProvider().getResourceClasses(),
+            new PipelineElementServiceResourceProvider().getResourceClasses());
+  }
 }
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
index 1ef0379..96023ff 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
@@ -17,21 +17,22 @@
  */
 package org.apache.streampipes.container.standalone.init;
 
-import org.apache.streampipes.container.api.*;
-import org.apache.streampipes.rest.shared.serializer.JacksonSerializationProvider;
-import org.glassfish.jersey.server.ResourceConfig;
+import org.apache.streampipes.container.base.rest.BaseResourceConfig;
+import org.apache.streampipes.container.init.BaseExtensionsServiceResourceProvider;
+import org.apache.streampipes.container.init.PipelineElementServiceResourceProvider;
 import org.springframework.stereotype.Component;
 
-@Component
-public class PipelineElementContainerResourceConfig extends ResourceConfig {
+import java.util.Arrays;
+import java.util.List;
 
-  public PipelineElementContainerResourceConfig() {
-    register(DataSinkPipelineElementResource.class);
-    register(DataProcessorPipelineElementResource.class);
-    register(DataStreamPipelineElementResource.class);
-    register(WelcomePage.class);
-    register(PipelineTemplateResource.class);
+@Component
+public class PipelineElementContainerResourceConfig extends BaseResourceConfig {
 
-    register(JacksonSerializationProvider.class);
+  @Override // one class list per provider: base extensions first, then pipeline elements
+  public List<List<Class<?>>> getClassesToRegister() {
+    return Arrays.asList(
+            new BaseExtensionsServiceResourceProvider().getResourceClasses(),
+            new PipelineElementServiceResourceProvider().getResourceClasses()
+    );
   }
 }
diff --git a/streampipes-container/pom.xml b/streampipes-container/pom.xml
index 42526cd..e021554 100644
--- a/streampipes-container/pom.xml
+++ b/streampipes-container/pom.xml
@@ -32,6 +32,11 @@
 		<!-- StreamPipes dependencies -->
 		<dependency>
 			<groupId>org.apache.streampipes</groupId>
+			<artifactId>streampipes-connect-api</artifactId>
+			<version>0.68.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.streampipes</groupId>
 			<artifactId>streampipes-dataformat</artifactId>
 			<version>0.68.0-SNAPSHOT</version>
 		</dependency>
@@ -60,6 +65,11 @@
 			<artifactId>streampipes-rest-shared</artifactId>
 			<version>0.68.0-SNAPSHOT</version>
 		</dependency>
+<!--		<dependency>-->
+<!--			<groupId>org.apache.streampipes</groupId>-->
+<!--			<artifactId>streampipes-connect-container-worker</artifactId>-->
+<!--			<version>0.68.0-SNAPSHOT</version>-->
+<!--		</dependency>-->
 
 
 		<!-- External dependencies -->
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/BaseExtensionsServiceResourceProvider.java
similarity index 66%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java
rename to streampipes-container/src/main/java/org/apache/streampipes/container/init/BaseExtensionsServiceResourceProvider.java
index 63f3ac3..16ffe67 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/BaseExtensionsServiceResourceProvider.java
@@ -15,18 +15,17 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.container.init;
 
-package org.apache.streampipes.connect.adapter.exception;
+import org.apache.streampipes.rest.shared.serializer.JacksonSerializationProvider;
 
-public class AdapterException extends Exception {
-    public AdapterException() {}
+import java.util.Collections;
+import java.util.List;
 
-    public AdapterException(String message)
-    {
-        super(message);
-    }
+public class BaseExtensionsServiceResourceProvider implements ExtensionsResourceProvider {
 
-    public AdapterException(String message, Throwable cause) {
-        super(message, cause);
-    }
+  @Override // contributes only the Jackson serialization provider, as an immutable one-element list
+  public List<Class<?>> getResourceClasses() {
+    return Collections.singletonList(JacksonSerializationProvider.class);
+  }
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
index 1b908e8..1ec7d1c 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
@@ -18,6 +18,9 @@
 
 package org.apache.streampipes.container.init;
 
+import org.apache.streampipes.connect.api.Connector;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.container.declarer.*;
 import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.dataformat.SpDataFormatFactory;
@@ -49,6 +52,9 @@ public class DeclarersSingleton {
   private Map<String, TransportProtocol> supportedProtocols;
   private Map<String, TransportFormat> supportedFormats;
 
+  private Map<String, IProtocol> allProtocols;
+  private Map<String, IAdapter> allAdapters;
+
   private String serviceId;
 
   private int port;
@@ -63,6 +69,8 @@ public class DeclarersSingleton {
     this.pipelineTemplateDeclarers = new HashMap<>();
     this.supportedProtocols = new HashMap<>();
     this.supportedFormats = new HashMap<>();
+    this.allProtocols = new HashMap<>();
+    this.allAdapters = new HashMap<>();
     this.route = "/";
   }
 
@@ -209,6 +217,67 @@ public class DeclarersSingleton {
             .collect(Collectors.toList());
   }
 
+  /**
+   * Registers a connector under the id it reports via {@code getId()}.
+   *
+   * <p>The {@code IProtocol} check runs first, so a connector implementing both
+   * interfaces is stored as a protocol only; a connector implementing neither is
+   * silently ignored. Registering an id twice replaces the earlier entry.
+   *
+   * @param c connector to register
+   * @return the value of {@code getInstance()}, allowing calls to be chained
+   */
+  public DeclarersSingleton add(Connector c) {
+    if (c instanceof IProtocol) {
+      this.allProtocols.put(c.getId(), (IProtocol) c);
+    } else if (c instanceof IAdapter) {
+      this.allAdapters.put(c.getId(), (IAdapter<?>) c);
+    }
+    return getInstance();
+  }
+
+  /** Returns the backing protocol registry itself (not a copy), keyed by id. */
+  public Map<String, IProtocol> getAllProtocolsMap() {
+    return this.allProtocols;
+  }
+
+  /** Returns the backing adapter registry itself (not a copy), keyed by id. */
+  public Map<String, IAdapter> getAllAdaptersMap() {
+    return this.allAdapters;
+  }
+
+  /** Returns a live view of all registered protocols. */
+  public Collection<IProtocol> getAllProtocols() {
+    return this.allProtocols.values();
+  }
+
+  /** Returns a live view of all registered adapters. */
+  public Collection<IAdapter> getAllAdapters() {
+    return this.allAdapters.values();
+  }
+
+  /**
+   * Looks up a protocol by id.
+   *
+   * @param id id the protocol was registered under
+   * @return the matching protocol, or {@code null} if none is registered
+   */
+  public IProtocol getProtocol(String id) {
+    // The registry is keyed by id, so a direct lookup replaces a scan over all values.
+    return this.allProtocols.get(id);
+  }
+
+  /**
+   * Looks up an adapter by id.
+   *
+   * @param id id the adapter was registered under
+   * @return the matching adapter, or {@code null} if none is registered
+   */
+  public IAdapter getAdapter(String id) {
+    // Same keyed lookup as getProtocol.
+    return this.allAdapters.get(id);
+  }
+
   public void setPort(int port) {
     this.port = port;
   }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/EmitBinaryEvent.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/ExtensionsResourceProvider.java
similarity index 83%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/EmitBinaryEvent.java
rename to streampipes-container/src/main/java/org/apache/streampipes/container/init/ExtensionsResourceProvider.java
index da76c6c..696e709 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/EmitBinaryEvent.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/ExtensionsResourceProvider.java
@@ -15,9 +15,12 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.container.init;
 
-package org.apache.streampipes.connect;
+import java.util.List;
+
+public interface ExtensionsResourceProvider { // supplies classes that resource configs collect
+  /** Returns the classes contributed by this provider. */
+  List<Class<?>> getResourceClasses();
 
-public interface EmitBinaryEvent {
-    public Boolean emit(byte[] event);
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/PipelineElementServiceResourceProvider.java
similarity index 59%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
copy to streampipes-container/src/main/java/org/apache/streampipes/container/init/PipelineElementServiceResourceProvider.java
index b20ce52..f6572ab 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/GetNEvents.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/PipelineElementServiceResourceProvider.java
@@ -15,37 +15,21 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.container.init;
 
-package org.apache.streampipes.connect;
+import org.apache.streampipes.container.api.*;
 
-
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class GetNEvents implements EmitBinaryEvent {
-
-    private int n;
-    private List<byte[]> events;
-
-    public GetNEvents(int n) {
-        this.n = n;
-        this.events = new ArrayList<>();
-    }
-
-    @Override
-    public Boolean emit(byte[] event) {
-
-        events.add(event);
-        this.n = n - 1;
-
-        if (n == 0) {
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    public List<byte[]> getEvents() {
-        return events;
-    }
+public class PipelineElementServiceResourceProvider implements ExtensionsResourceProvider {
+  @Override // the same five classes the removed resource-config constructors registered inline
+  public List<Class<?>> getResourceClasses() {
+    return Arrays.asList(
+            DataSinkPipelineElementResource.class,
+            DataProcessorPipelineElementResource.class,
+            DataStreamPipelineElementResource.class,
+            WelcomePage.class,
+            PipelineTemplateResource.class);
+  }
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
index 35fffe8..d9e108b 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
@@ -17,12 +17,16 @@
  */
 package org.apache.streampipes.container.model;
 
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.dataformat.SpDataFormatFactory;
 import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SpServiceDefinition {
 
@@ -35,10 +39,15 @@ public class SpServiceDefinition {
   private List<SpDataFormatFactory> dataFormatFactories;
   private List<SpProtocolDefinitionFactory<?>> protocolDefinitionFactories;
 
+  private Map<String, IProtocol> adapterProtocols;
+  private Map<String, IAdapter<?>> specificAdapters;
+
   public SpServiceDefinition() {
     this.declarers = new ArrayList<>();
     this.dataFormatFactories = new ArrayList<>();
     this.protocolDefinitionFactories = new ArrayList<>();
+    this.adapterProtocols = new HashMap<>();
+    this.specificAdapters = new HashMap<>();
   }
 
   public String getServiceId() {
@@ -97,6 +106,36 @@ public class SpServiceDefinition {
     this.dataFormatFactories.addAll(factories);
   }
 
+  /**
+   * Registers an adapter protocol under its id; a later protocol with the same id
+   * replaces the earlier one.
+   *
+   * @param protocol protocol to register
+   */
+  public void addAdapterProtocol(IProtocol protocol) {
+    this.adapterProtocols.put(protocol.getId(), protocol);
+  }
+
+  /**
+   * Registers a specific adapter under its id; a later adapter with the same id
+   * replaces the earlier one.
+   *
+   * @param adapter adapter to register
+   */
+  public void addSpecificAdapter(IAdapter<?> adapter) {
+    this.specificAdapters.put(adapter.getId(), adapter);
+  }
+
+  /** Returns the registered adapter protocols keyed by id (the backing map, not a copy). */
+  public Map<String, IProtocol> getAdapterProtocols() {
+    return adapterProtocols;
+  }
+
+  /** Returns the registered specific adapters keyed by id (the backing map, not a copy). */
+  public Map<String, IAdapter<?>> getSpecificAdapters() {
+    return specificAdapters;
+  }
+
   public List<SpDataFormatFactory> getDataFormatFactories() {
     return dataFormatFactories;
   }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
index de8eb90..204d88a 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
@@ -19,6 +19,9 @@ package org.apache.streampipes.container.model;
 
 import org.apache.streampipes.config.SpConfig;
 import org.apache.streampipes.config.consul.ConsulSpConfig;
+import org.apache.streampipes.connect.api.Connector;
+import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.dataformat.SpDataFormatFactory;
 import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
@@ -79,15 +82,37 @@ public class SpServiceDefinitionBuilder {
     return this;
   }
 
-//  public SpServiceDefinitionBuilder registerAdapter(Connector protocolOrAdapter) {
-//
-//    return this;
-//  }
-//
-//  public SpServiceDefinitionBuilder registerAdapters(Connector... protocolOrAdapter) {
-//
-//    return this;
-//  }
+  /**
+   * Registers a single connector with the service definition.
+   *
+   * <p>An {@code IProtocol} is added as an adapter protocol; otherwise an {@code IAdapter}
+   * is added as a specific adapter. Any other {@code Connector} is silently ignored.
+   *
+   * @param protocolOrAdapter connector to register
+   * @return this builder
+   */
+  public SpServiceDefinitionBuilder registerAdapter(Connector protocolOrAdapter) {
+    if (protocolOrAdapter instanceof IProtocol) {
+      this.serviceDefinition.addAdapterProtocol((IProtocol) protocolOrAdapter);
+    } else if (protocolOrAdapter instanceof IAdapter<?>) {
+      this.serviceDefinition.addSpecificAdapter((IAdapter<?>) protocolOrAdapter);
+    }
+    return this;
+  }
+
+  /**
+   * Registers several connectors, in argument order, via {@link #registerAdapter(Connector)}.
+   *
+   * @param protocolOrAdapter connectors to register
+   * @return this builder
+   */
+  public SpServiceDefinitionBuilder registerAdapters(Connector... protocolOrAdapter) {
+    // Iterate the varargs array directly; no intermediate List wrapper is needed.
+    for (Connector connector : protocolOrAdapter) {
+      registerAdapter(connector);
+    }
+    return this;
+  }
 
   public SpServiceDefinitionBuilder registerMessagingFormat(SpDataFormatFactory dataFormatDefinition) {
     this.serviceDefinition.addDataFormatFactory(dataFormatDefinition);