You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/20 09:47:12 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-319] Add tag registration and service deregistration to all services

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit b7413302e3eaba6f447ef23bc57475634c0d1542
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Jun 20 11:46:47 2021 +0200

    [STREAMPIPES-319] Add tag registration and service deregistration to all services
---
 .../backend/StreamPipesBackendApplication.java     | 28 +++++++++++++-------
 .../worker/init/AdapterWorkerContainer.java        |  6 +++--
 .../worker/init/ConnectWorkerTagProvider.java      |  8 ++++--
 .../container/base/StreamPipesServiceBase.java     | 20 ++++++++++-----
 .../extensions/ExtensionsModelSubmitter.java       |  7 +++--
 .../init/PipelineElementServiceTagProvider.java    | 10 +++++---
 .../standalone/init/StandaloneModelSubmitter.java  |  4 ++-
 .../container/model/SpServiceDefinition.java       | 15 ++++++++---
 .../model/SpServiceDefinitionBuilder.java          | 10 ++++----
 .../container/util/ServiceDefinitionUtil.java      | 30 +++++++++++++++++-----
 .../svcdiscovery/api/ISpServiceDiscovery.java      | 23 +++--------------
 .../api/model/DefaultSpServiceGroups.java          |  6 ++---
 .../api/model/DefaultSpServiceTags.java            |  4 +--
 .../svcdiscovery/api/model/SpServiceTag.java       | 27 ++++++++++++++-----
 .../svcdiscovery/api/model/SpServiceTagPrefix.java | 21 ++++++++++++---
 .../consul/SpConsulServiceDiscovery.java           | 21 ++++++++-------
 .../base/StreamPipesExtensionsServiceBase.java     |  3 +--
 17 files changed, 153 insertions(+), 90 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index e8af7a7..43ad4a8 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -28,7 +28,10 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.rest.notifications.NotificationListener;
 import org.apache.streampipes.storage.api.IPipelineStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.SpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -42,10 +45,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.servlet.ServletContextListener;
 import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -73,8 +73,8 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
       StreamPipesBackendApplication application = new StreamPipesBackendApplication();
       try {
         application.startStreamPipesService(StreamPipesBackendApplication.class,
-                "core",
-                "core",
+                DefaultSpServiceGroups.CORE,
+                AUTO_GENERATED_SERVICE_ID,
                 8030);
       } catch (UnknownHostException e) {
         LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
@@ -126,6 +126,8 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
       }
     });
 
+    deregisterService(AUTO_GENERATED_SERVICE_ID);
+
     LOG.info("Thanks for using Apache StreamPipes - see you next time!");
   }
 
@@ -233,7 +235,15 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
   }
 
   @Override
-  protected List<String> getServiceTags() {
-    return Arrays.asList(SpServiceTags.CORE, SpServiceTags.CONNECT_MASTER);
+  protected List<SpServiceTag> getServiceTags() {
+    return Arrays.asList(
+            createSysTag(DefaultSpServiceTags.CORE),
+            createSysTag(DefaultSpServiceTags.CONNECT_MASTER),
+            createSysTag(DefaultSpServiceTags.STREAMPIPES_CLIENT)
+    );
+  }
+
+  private SpServiceTag createSysTag(String value) {
+    return SpServiceTag.create(SpServiceTagPrefix.SYSTEM, value);
   }
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index 0829deb..b6b3f6a 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -18,8 +18,10 @@
 
 package org.apache.streampipes.connect.container.worker.init;
 
+import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -41,7 +43,7 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
   }
 
   @Override
-  protected List<String> getServiceTags() {
+  protected List<SpServiceTag> getServiceTags() {
     return new ConnectWorkerTagProvider().extractServiceTags();
   }
 
@@ -52,6 +54,6 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
 
   @Override
   public void onExit() {
-
+    deregisterService(DeclarersSingleton.getInstance().getServiceId());
   }
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
index 0f6f81b..d261c89 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
@@ -21,6 +21,9 @@ import org.apache.streampipes.connect.api.IAdapter;
 import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.util.ServiceDefinitionUtil;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -28,12 +31,13 @@ import java.util.List;
 
 public class ConnectWorkerTagProvider {
 
-  public List<String> extractServiceTags() {
-    List<String> tags = new ArrayList<>();
+  public List<SpServiceTag> extractServiceTags() {
+    List<SpServiceTag> tags = new ArrayList<>();
     Collection<IAdapter> adapters = DeclarersSingleton.getInstance().getAllAdapters();
     Collection<IProtocol> protocols = DeclarersSingleton.getInstance().getAllProtocols();
     tags.addAll(ServiceDefinitionUtil.extractAppIdsFromAdapters(adapters));
     tags.addAll(ServiceDefinitionUtil.extractAppIdsFromProtocols(protocols));
+    tags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.CONNECT_WORKER));
 
     return tags;
   }
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index d51a1ee..e915775 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.container.base;
 
 import org.apache.streampipes.commons.networking.Networking;
 import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
@@ -26,17 +27,20 @@ import org.springframework.boot.SpringApplication;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 public abstract class StreamPipesServiceBase {
 
   private static final Logger LOG = LoggerFactory.getLogger(StreamPipesServiceBase.class);
 
+  protected static final String AUTO_GENERATED_SERVICE_ID = UUID.randomUUID().toString();
+
   protected void startStreamPipesService(Class<?> serviceClass,
                                          String serviceGroup,
-                                         String serviceName,
+                                         String serviceId,
                                          Integer defaultPort) throws UnknownHostException {
-      registerService(serviceGroup, serviceName, defaultPort);
-      runApplication(serviceClass, defaultPort);
+    registerService(serviceGroup, serviceId, defaultPort);
+    runApplication(serviceClass, defaultPort);
   }
 
   private void runApplication(Class<?> serviceClass,
@@ -47,12 +51,12 @@ public abstract class StreamPipesServiceBase {
   }
 
   private void registerService(String serviceGroup,
-                               String serviceName,
+                               String serviceId,
                                Integer defaultPort) throws UnknownHostException {
     SpServiceDiscovery
             .getServiceDiscovery()
             .registerService(serviceGroup,
-                    serviceName,
+                    serviceId,
                     getHostname(),
                     getPort(defaultPort),
                     getServiceTags());
@@ -66,7 +70,11 @@ public abstract class StreamPipesServiceBase {
     return Networking.getPort(defaultPort);
   }
 
-  protected abstract List<String> getServiceTags();
+  protected abstract List<SpServiceTag> getServiceTags();
 
+  protected void deregisterService(String serviceId) {
+    LOG.info("Deregistering service (id={})...", serviceId);
+    SpServiceDiscovery.getServiceDiscovery().deregisterService(serviceId);
+  }
 
 }
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 33182ef..ac9f572 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -19,10 +19,12 @@ package org.apache.streampipes.container.extensions;
 
 import org.apache.streampipes.connect.container.worker.init.ConnectWorkerRegistrationService;
 import org.apache.streampipes.connect.container.worker.init.ConnectWorkerTagProvider;
+import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.container.standalone.init.PipelineElementServiceShutdownHandler;
 import org.apache.streampipes.container.standalone.init.PipelineElementServiceTagProvider;
 import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -43,6 +45,7 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
     @PreDestroy
     public void onExit() {
         new PipelineElementServiceShutdownHandler().onShutdown();
+        deregisterService(DeclarersSingleton.getInstance().getServiceId());
     }
 
     @Override
@@ -51,8 +54,8 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
     }
 
     @Override
-    protected List<String> getServiceTags() {
-        List<String> serviceTags = new PipelineElementServiceTagProvider().extractServiceTags();
+    protected List<SpServiceTag> getServiceTags() {
+        List<SpServiceTag> serviceTags = new PipelineElementServiceTagProvider().extractServiceTags();
         serviceTags.addAll(new ConnectWorkerTagProvider().extractServiceTags());
 
         return serviceTags;
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
index 9e78bcc..44ff9ba 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
@@ -20,17 +20,19 @@ package org.apache.streampipes.container.standalone.init;
 import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.util.ServiceDefinitionUtil;
-import org.apache.streampipes.svcdiscovery.SpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
 import java.util.Collection;
 import java.util.List;
 
 public class PipelineElementServiceTagProvider {
 
-  public List<String> extractServiceTags() {
+  public List<SpServiceTag> extractServiceTags() {
     Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
-    List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
-    serviceTags.add(SpServiceTags.PE);
+    List<SpServiceTag> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
+    serviceTags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.PE));
 
     return serviceTags;
   }
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 3fa7d54..547aef9 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.model.PeConfig;
 import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -59,10 +60,11 @@ public abstract class StandaloneModelSubmitter extends StreamPipesExtensionsServ
     @Override
     public void onExit() {
         new PipelineElementServiceShutdownHandler().onShutdown();
+        deregisterService(DeclarersSingleton.getInstance().getServiceId());
     }
 
     @Override
-    protected List<String> getServiceTags() {
+    protected List<SpServiceTag> getServiceTags() {
         return new PipelineElementServiceTagProvider().extractServiceTags();
     }
 
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
index 6bbf0ea..b45d4d7 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
@@ -23,13 +23,11 @@ import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.dataformat.SpDataFormatFactory;
 import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class SpServiceDefinition {
 
+  private String serviceGroup;
   private String serviceId;
   private String serviceName;
   private String serviceDescription;
@@ -43,6 +41,7 @@ public class SpServiceDefinition {
   private Map<String, IAdapter> specificAdapters;
 
   public SpServiceDefinition() {
+    this.serviceId = UUID.randomUUID().toString();
     this.declarers = new ArrayList<>();
     this.dataFormatFactories = new ArrayList<>();
     this.protocolDefinitionFactories = new ArrayList<>();
@@ -50,6 +49,14 @@ public class SpServiceDefinition {
     this.specificAdapters = new HashMap<>();
   }
 
+  public String getServiceGroup() {
+    return serviceGroup;
+  }
+
+  public void setServiceGroup(String serviceGroup) {
+    this.serviceGroup = serviceGroup;
+  }
+
   public String getServiceId() {
     return serviceId;
   }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
index 8528fbc..eff9d86 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
@@ -33,23 +33,23 @@ public class SpServiceDefinitionBuilder {
   private SpServiceDefinition serviceDefinition;
   private SpConfig config;
 
-  public static SpServiceDefinitionBuilder create(String serviceId,
+  public static SpServiceDefinitionBuilder create(String serviceGroup,
                                                   String serviceName,
                                                   String serviceDescription,
                                                   Integer defaultPort) {
-    return new SpServiceDefinitionBuilder(serviceId, serviceName, serviceDescription, defaultPort);
+    return new SpServiceDefinitionBuilder(serviceGroup, serviceName, serviceDescription, defaultPort);
   }
 
-  private SpServiceDefinitionBuilder(String serviceId,
+  private SpServiceDefinitionBuilder(String serviceGroup,
                                      String serviceName,
                                      String serviceDescription,
                                      Integer defaultPort) {
     this.serviceDefinition = new SpServiceDefinition();
-    this.serviceDefinition.setServiceId(serviceId);
+    this.serviceDefinition.setServiceGroup(serviceGroup);
     this.serviceDefinition.setServiceName(serviceName);
     this.serviceDefinition.setServiceDescription(serviceDescription);
     this.serviceDefinition.setDefaultPort(defaultPort);
-    this.config = new ConsulSpConfig(serviceId);
+    this.config = new ConsulSpConfig(serviceGroup);
   }
 
   public SpServiceDefinitionBuilder withHostname(String hostname) {
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
index 9729283..6851381 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
@@ -20,6 +20,12 @@ package org.apache.streampipes.container.util;
 import org.apache.streampipes.connect.api.IAdapter;
 import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.container.declarer.Declarer;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
 import java.util.Collection;
 import java.util.List;
@@ -27,24 +33,36 @@ import java.util.stream.Collectors;
 
 public class ServiceDefinitionUtil {
 
-  public static List<String> extractAppIds(Collection<Declarer<?>> declarers) {
+  public static List<SpServiceTag> extractAppIds(Collection<Declarer<?>> declarers) {
     return declarers
             .stream()
-            .map(d -> d.declareModel().getAppId())
+            .map(d -> SpServiceTag.create(getPrefix(d.declareModel()), d.declareModel().getAppId()))
             .collect(Collectors.toList());
   }
 
-  public static List<String> extractAppIdsFromAdapters(Collection<IAdapter> adapters) {
+  private static SpServiceTagPrefix getPrefix(NamedStreamPipesEntity entity) {
+    if (entity instanceof SpDataStream) {
+      return SpServiceTagPrefix.DATA_STREAM;
+    } else if (entity instanceof DataProcessorDescription) {
+      return SpServiceTagPrefix.DATA_PROCESSOR;
+    } else if (entity instanceof DataSinkDescription) {
+      return SpServiceTagPrefix.DATA_SINK;
+    } else {
+      throw new RuntimeException("Could not find service tag for entity " + entity.getClass().getSimpleName());
+    }
+  }
+
+  public static List<SpServiceTag> extractAppIdsFromAdapters(Collection<IAdapter> adapters) {
     return adapters
             .stream()
-            .map(d -> d.declareModel().getAppId())
+            .map(d -> SpServiceTag.create(SpServiceTagPrefix.ADAPTER, d.declareModel().getAppId()))
             .collect(Collectors.toList());
   }
 
-  public static List<String> extractAppIdsFromProtocols(Collection<IProtocol> protocols) {
+  public static List<SpServiceTag> extractAppIdsFromProtocols(Collection<IProtocol> protocols) {
     return protocols
             .stream()
-            .map(p -> p.declareModel().getAppId())
+            .map(p -> SpServiceTag.create(SpServiceTagPrefix.PROTOCOL, p.declareModel().getAppId()))
             .collect(Collectors.toList());
   }
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
index d455e60..2d6630e 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.svcdiscovery.api;
 
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+
 import java.util.List;
 import java.util.Map;
 
@@ -31,26 +33,7 @@ public interface ISpServiceDiscovery {
    * @param port      port of service endpoint
    * @param tags      tags of service
    */
-  void registerService(String svcGroup, String svcId, String host, int port, List<String> tags);
-
-  /**
-   * Method to register a new pipeline element service endpoint.
-   *
-   * @param svcId unique service id
-   * @param host  host address of pipeline element service endpoint
-   * @param port  port of pipeline element service endpoint
-   */
-  void registerPeService(String svcId, String host, int port);
-
-  /**
-   * Method to register a new pipeline element service endpoint.
-   *
-   * @param svcId unique service id
-   * @param host  host address of pipeline element service endpoint
-   * @param port  port of pipeline element service endpoint
-   * @param tags  tags of service
-   */
-  void registerPeService(String svcId, String host, int port, List<String> tags);
+  void registerService(String svcGroup, String svcId, String host, int port, List<SpServiceTag> tags);
 
   /**
    * Get active pipeline element service endpoints
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
similarity index 87%
copy from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
copy to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
index c6c5b02..d263f07 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
 
-public class SpServiceGroups {
+public class DefaultSpServiceGroups {
 
   public static final String CORE = "core";
-  public static final String EXTENSIONS = "ext";
+
 }
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
similarity index 92%
copy from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
copy to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
index 4520bd9..093df8c 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
 
-public class SpServiceTags {
+public class DefaultSpServiceTags {
 
   public static final String CORE = "core";
   public static final String PE = "pe";
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
similarity index 58%
rename from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
index 4520bd9..e003366 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
@@ -15,13 +15,26 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
 
-public class SpServiceTags {
+public class SpServiceTag {
 
-  public static final String CORE = "core";
-  public static final String PE = "pe";
-  public static final String CONNECT_MASTER = "connect-master";
-  public static final String CONNECT_WORKER = "connect-worker";
-  public static final String STREAMPIPES_CLIENT = "streampipes-client";
+  private final SpServiceTagPrefix prefix;
+  private final String value;
+
+  private static final String COLON = ":";
+
+  public static SpServiceTag create(SpServiceTagPrefix prefix,
+                                    String value) {
+    return new SpServiceTag(prefix, value);
+  }
+
+  private SpServiceTag(SpServiceTagPrefix prefix, String value) {
+    this.prefix = prefix;
+    this.value = value;
+  }
+
+  public String asString() {
+    return prefix.asString() + COLON + value;
+  }
 }
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
similarity index 67%
rename from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
index c6c5b02..12dde94 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
@@ -15,10 +15,23 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
 
-public class SpServiceGroups {
+public enum SpServiceTagPrefix {
+  SYSTEM("sys"),
+  ADAPTER("adapter"),
+  PROTOCOL("protocol"),
+  DATA_STREAM("dstream"),
+  DATA_PROCESSOR("dprocessor"),
+  DATA_SINK("dsink");
 
-  public static final String CORE = "core";
-  public static final String EXTENSIONS = "ext";
+  private String prefix;
+
+  SpServiceTagPrefix(String prefix) {
+    this.prefix = prefix;
+  }
+
+  public String asString() {
+    return this.prefix;
+  }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index d595bb3..105d6da 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -27,12 +27,14 @@ import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.StringEntity;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.apache.streampipes.svcdiscovery.consul.model.ConsulServiceRegistrationBody;
 import org.apache.streampipes.svcdiscovery.consul.model.HealthCheckConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 
 public class SpConsulServiceDiscovery extends AbstractConsulService implements ISpServiceDiscovery {
@@ -46,12 +48,16 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   private static final String PE_SVC_TAG = "pe";
 
   @Override
-  public void registerService(String svcGroup, String svcId, String host, int port, List<String> tags) {
+  public void registerService(String svcGroup,
+                              String svcId,
+                              String host,
+                              int port,
+                              List<SpServiceTag> tags) {
     boolean connected = false;
 
     while (!connected) {
       LOG.info("Trying to register service at Consul with svcGroup={}, svcId={} host={}, port={}. ", svcGroup, svcId, host, port);
-      ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, tags);
+      ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, asString(tags));
       connected = registerServiceHttpClient(svcRegistration);
 
       if (!connected) {
@@ -66,15 +72,8 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
     LOG.info("Successfully registered service at Consul: " + svcId);
   }
 
-  @Override
-  public void registerPeService(String svcId, String host, int port) {
-    registerService(PE_SVC_TAG, svcId, host, port, Collections.singletonList(PE_SVC_TAG));
-  }
-
-  @Override
-  public void registerPeService(String svcId, String host, int port, List<String> tags) {
-    tags.add(PE_SVC_TAG);
-    registerService(PE_SVC_TAG, svcId, host, port, tags);
+  private List<String> asString(List<SpServiceTag> tags) {
+    return tags.stream().map(SpServiceTag::asString).collect(Collectors.toList());
   }
 
   @Override
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
index 78570b3..f9adc7e 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -21,7 +21,6 @@ package org.apache.streampipes.service.extensions.base;
 import org.apache.streampipes.container.base.StreamPipesServiceBase;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.model.SpServiceDefinition;
-import org.apache.streampipes.svcdiscovery.SpServiceGroups;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +58,7 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
                                        SpServiceDefinition serviceDef) throws UnknownHostException {
         this.startStreamPipesService(
                 serviceClass,
-                SpServiceGroups.EXTENSIONS,
+                serviceDef.getServiceGroup(),
                 serviceDef.getServiceId(),
                 serviceDef.getDefaultPort()
         );