You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/20 09:47:12 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-319] Add tag registration and service deregistration to all services
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit b7413302e3eaba6f447ef23bc57475634c0d1542
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Jun 20 11:46:47 2021 +0200
[STREAMPIPES-319] Add tag registration and service deregistration to all services
---
.../backend/StreamPipesBackendApplication.java | 28 +++++++++++++-------
.../worker/init/AdapterWorkerContainer.java | 6 +++--
.../worker/init/ConnectWorkerTagProvider.java | 8 ++++--
.../container/base/StreamPipesServiceBase.java | 20 ++++++++++-----
.../extensions/ExtensionsModelSubmitter.java | 7 +++--
.../init/PipelineElementServiceTagProvider.java | 10 +++++---
.../standalone/init/StandaloneModelSubmitter.java | 4 ++-
.../container/model/SpServiceDefinition.java | 15 ++++++++---
.../model/SpServiceDefinitionBuilder.java | 10 ++++----
.../container/util/ServiceDefinitionUtil.java | 30 +++++++++++++++++-----
.../svcdiscovery/api/ISpServiceDiscovery.java | 23 +++--------------
.../api/model/DefaultSpServiceGroups.java | 6 ++---
.../api/model/DefaultSpServiceTags.java | 4 +--
.../svcdiscovery/api/model/SpServiceTag.java | 27 ++++++++++++++-----
.../svcdiscovery/api/model/SpServiceTagPrefix.java | 21 ++++++++++++---
.../consul/SpConsulServiceDiscovery.java | 21 ++++++++-------
.../base/StreamPipesExtensionsServiceBase.java | 3 +--
17 files changed, 153 insertions(+), 90 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index e8af7a7..43ad4a8 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -28,7 +28,10 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.rest.notifications.NotificationListener;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.SpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -42,10 +45,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContextListener;
import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -73,8 +73,8 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
StreamPipesBackendApplication application = new StreamPipesBackendApplication();
try {
application.startStreamPipesService(StreamPipesBackendApplication.class,
- "core",
- "core",
+ DefaultSpServiceGroups.CORE,
+ AUTO_GENERATED_SERVICE_ID,
8030);
} catch (UnknownHostException e) {
LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
@@ -126,6 +126,8 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
}
});
+ deregisterService(AUTO_GENERATED_SERVICE_ID);
+
LOG.info("Thanks for using Apache StreamPipes - see you next time!");
}
@@ -233,7 +235,15 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
}
@Override
- protected List<String> getServiceTags() {
- return Arrays.asList(SpServiceTags.CORE, SpServiceTags.CONNECT_MASTER);
+ protected List<SpServiceTag> getServiceTags() {
+ return Arrays.asList(
+ createSysTag(DefaultSpServiceTags.CORE),
+ createSysTag(DefaultSpServiceTags.CONNECT_MASTER),
+ createSysTag(DefaultSpServiceTags.STREAMPIPES_CLIENT)
+ );
+ }
+
+ private SpServiceTag createSysTag(String value) {
+ return SpServiceTag.create(SpServiceTagPrefix.SYSTEM, value);
}
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index 0829deb..b6b3f6a 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -18,8 +18,10 @@
package org.apache.streampipes.connect.container.worker.init;
+import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -41,7 +43,7 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
}
@Override
- protected List<String> getServiceTags() {
+ protected List<SpServiceTag> getServiceTags() {
return new ConnectWorkerTagProvider().extractServiceTags();
}
@@ -52,6 +54,6 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
@Override
public void onExit() {
-
+ deregisterService(DeclarersSingleton.getInstance().getServiceId());
}
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
index 0f6f81b..d261c89 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
@@ -21,6 +21,9 @@ import org.apache.streampipes.connect.api.IAdapter;
import org.apache.streampipes.connect.api.IProtocol;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.util.ServiceDefinitionUtil;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import java.util.ArrayList;
import java.util.Collection;
@@ -28,12 +31,13 @@ import java.util.List;
public class ConnectWorkerTagProvider {
- public List<String> extractServiceTags() {
- List<String> tags = new ArrayList<>();
+ public List<SpServiceTag> extractServiceTags() {
+ List<SpServiceTag> tags = new ArrayList<>();
Collection<IAdapter> adapters = DeclarersSingleton.getInstance().getAllAdapters();
Collection<IProtocol> protocols = DeclarersSingleton.getInstance().getAllProtocols();
tags.addAll(ServiceDefinitionUtil.extractAppIdsFromAdapters(adapters));
tags.addAll(ServiceDefinitionUtil.extractAppIdsFromProtocols(protocols));
+ tags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.CONNECT_WORKER));
return tags;
}
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index d51a1ee..e915775 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.container.base;
import org.apache.streampipes.commons.networking.Networking;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -26,17 +27,20 @@ import org.springframework.boot.SpringApplication;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
public abstract class StreamPipesServiceBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamPipesServiceBase.class);
+ protected static final String AUTO_GENERATED_SERVICE_ID = UUID.randomUUID().toString();
+
protected void startStreamPipesService(Class<?> serviceClass,
String serviceGroup,
- String serviceName,
+ String serviceId,
Integer defaultPort) throws UnknownHostException {
- registerService(serviceGroup, serviceName, defaultPort);
- runApplication(serviceClass, defaultPort);
+ registerService(serviceGroup, serviceId, defaultPort);
+ runApplication(serviceClass, defaultPort);
}
private void runApplication(Class<?> serviceClass,
@@ -47,12 +51,12 @@ public abstract class StreamPipesServiceBase {
}
private void registerService(String serviceGroup,
- String serviceName,
+ String serviceId,
Integer defaultPort) throws UnknownHostException {
SpServiceDiscovery
.getServiceDiscovery()
.registerService(serviceGroup,
- serviceName,
+ serviceId,
getHostname(),
getPort(defaultPort),
getServiceTags());
@@ -66,7 +70,11 @@ public abstract class StreamPipesServiceBase {
return Networking.getPort(defaultPort);
}
- protected abstract List<String> getServiceTags();
+ protected abstract List<SpServiceTag> getServiceTags();
+ protected void deregisterService(String serviceId) {
+ LOG.info("Deregistering service (id={})...", serviceId);
+ SpServiceDiscovery.getServiceDiscovery().deregisterService(serviceId);
+ }
}
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 33182ef..ac9f572 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -19,10 +19,12 @@ package org.apache.streampipes.container.extensions;
import org.apache.streampipes.connect.container.worker.init.ConnectWorkerRegistrationService;
import org.apache.streampipes.connect.container.worker.init.ConnectWorkerTagProvider;
+import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.container.standalone.init.PipelineElementServiceShutdownHandler;
import org.apache.streampipes.container.standalone.init.PipelineElementServiceTagProvider;
import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -43,6 +45,7 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
@PreDestroy
public void onExit() {
new PipelineElementServiceShutdownHandler().onShutdown();
+ deregisterService(DeclarersSingleton.getInstance().getServiceId());
}
@Override
@@ -51,8 +54,8 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
}
@Override
- protected List<String> getServiceTags() {
- List<String> serviceTags = new PipelineElementServiceTagProvider().extractServiceTags();
+ protected List<SpServiceTag> getServiceTags() {
+ List<SpServiceTag> serviceTags = new PipelineElementServiceTagProvider().extractServiceTags();
serviceTags.addAll(new ConnectWorkerTagProvider().extractServiceTags());
return serviceTags;
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
index 9e78bcc..44ff9ba 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
@@ -20,17 +20,19 @@ package org.apache.streampipes.container.standalone.init;
import org.apache.streampipes.container.declarer.Declarer;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.util.ServiceDefinitionUtil;
-import org.apache.streampipes.svcdiscovery.SpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import java.util.Collection;
import java.util.List;
public class PipelineElementServiceTagProvider {
- public List<String> extractServiceTags() {
+ public List<SpServiceTag> extractServiceTags() {
Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
- List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
- serviceTags.add(SpServiceTags.PE);
+ List<SpServiceTag> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
+ serviceTags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.PE));
return serviceTags;
}
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 3fa7d54..547aef9 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.model.PeConfig;
import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -59,10 +60,11 @@ public abstract class StandaloneModelSubmitter extends StreamPipesExtensionsServ
@Override
public void onExit() {
new PipelineElementServiceShutdownHandler().onShutdown();
+ deregisterService(DeclarersSingleton.getInstance().getServiceId());
}
@Override
- protected List<String> getServiceTags() {
+ protected List<SpServiceTag> getServiceTags() {
return new PipelineElementServiceTagProvider().extractServiceTags();
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
index 6bbf0ea..b45d4d7 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinition.java
@@ -23,13 +23,11 @@ import org.apache.streampipes.container.declarer.Declarer;
import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class SpServiceDefinition {
+ private String serviceGroup;
private String serviceId;
private String serviceName;
private String serviceDescription;
@@ -43,6 +41,7 @@ public class SpServiceDefinition {
private Map<String, IAdapter> specificAdapters;
public SpServiceDefinition() {
+ this.serviceId = UUID.randomUUID().toString();
this.declarers = new ArrayList<>();
this.dataFormatFactories = new ArrayList<>();
this.protocolDefinitionFactories = new ArrayList<>();
@@ -50,6 +49,14 @@ public class SpServiceDefinition {
this.specificAdapters = new HashMap<>();
}
+ public String getServiceGroup() {
+ return serviceGroup;
+ }
+
+ public void setServiceGroup(String serviceGroup) {
+ this.serviceGroup = serviceGroup;
+ }
+
public String getServiceId() {
return serviceId;
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
index 8528fbc..eff9d86 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/SpServiceDefinitionBuilder.java
@@ -33,23 +33,23 @@ public class SpServiceDefinitionBuilder {
private SpServiceDefinition serviceDefinition;
private SpConfig config;
- public static SpServiceDefinitionBuilder create(String serviceId,
+ public static SpServiceDefinitionBuilder create(String serviceGroup,
String serviceName,
String serviceDescription,
Integer defaultPort) {
- return new SpServiceDefinitionBuilder(serviceId, serviceName, serviceDescription, defaultPort);
+ return new SpServiceDefinitionBuilder(serviceGroup, serviceName, serviceDescription, defaultPort);
}
- private SpServiceDefinitionBuilder(String serviceId,
+ private SpServiceDefinitionBuilder(String serviceGroup,
String serviceName,
String serviceDescription,
Integer defaultPort) {
this.serviceDefinition = new SpServiceDefinition();
- this.serviceDefinition.setServiceId(serviceId);
+ this.serviceDefinition.setServiceGroup(serviceGroup);
this.serviceDefinition.setServiceName(serviceName);
this.serviceDefinition.setServiceDescription(serviceDescription);
this.serviceDefinition.setDefaultPort(defaultPort);
- this.config = new ConsulSpConfig(serviceId);
+ this.config = new ConsulSpConfig(serviceGroup);
}
public SpServiceDefinitionBuilder withHostname(String hostname) {
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
index 9729283..6851381 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
@@ -20,6 +20,12 @@ package org.apache.streampipes.container.util;
import org.apache.streampipes.connect.api.IAdapter;
import org.apache.streampipes.connect.api.IProtocol;
import org.apache.streampipes.container.declarer.Declarer;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import java.util.Collection;
import java.util.List;
@@ -27,24 +33,36 @@ import java.util.stream.Collectors;
public class ServiceDefinitionUtil {
- public static List<String> extractAppIds(Collection<Declarer<?>> declarers) {
+ public static List<SpServiceTag> extractAppIds(Collection<Declarer<?>> declarers) {
return declarers
.stream()
- .map(d -> d.declareModel().getAppId())
+ .map(d -> SpServiceTag.create(getPrefix(d.declareModel()), d.declareModel().getAppId()))
.collect(Collectors.toList());
}
- public static List<String> extractAppIdsFromAdapters(Collection<IAdapter> adapters) {
+ private static SpServiceTagPrefix getPrefix(NamedStreamPipesEntity entity) {
+ if (entity instanceof SpDataStream) {
+ return SpServiceTagPrefix.DATA_STREAM;
+ } else if (entity instanceof DataProcessorDescription) {
+ return SpServiceTagPrefix.DATA_PROCESSOR;
+ } else if (entity instanceof DataSinkDescription) {
+ return SpServiceTagPrefix.DATA_SINK;
+ } else {
+ throw new RuntimeException("Could not find service tag for entity " + entity.getClass().getSimpleName());
+ }
+ }
+
+ public static List<SpServiceTag> extractAppIdsFromAdapters(Collection<IAdapter> adapters) {
return adapters
.stream()
- .map(d -> d.declareModel().getAppId())
+ .map(d -> SpServiceTag.create(SpServiceTagPrefix.ADAPTER, d.declareModel().getAppId()))
.collect(Collectors.toList());
}
- public static List<String> extractAppIdsFromProtocols(Collection<IProtocol> protocols) {
+ public static List<SpServiceTag> extractAppIdsFromProtocols(Collection<IProtocol> protocols) {
return protocols
.stream()
- .map(p -> p.declareModel().getAppId())
+ .map(p -> SpServiceTag.create(SpServiceTagPrefix.PROTOCOL, p.declareModel().getAppId()))
.collect(Collectors.toList());
}
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
index d455e60..2d6630e 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.svcdiscovery.api;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+
import java.util.List;
import java.util.Map;
@@ -31,26 +33,7 @@ public interface ISpServiceDiscovery {
* @param port port of service endpoint
* @param tags tags of service
*/
- void registerService(String svcGroup, String svcId, String host, int port, List<String> tags);
-
- /**
- * Method to register a new pipeline element service endpoint.
- *
- * @param svcId unique service id
- * @param host host address of pipeline element service endpoint
- * @param port port of pipeline element service endpoint
- */
- void registerPeService(String svcId, String host, int port);
-
- /**
- * Method to register a new pipeline element service endpoint.
- *
- * @param svcId unique service id
- * @param host host address of pipeline element service endpoint
- * @param port port of pipeline element service endpoint
- * @param tags tags of service
- */
- void registerPeService(String svcId, String host, int port, List<String> tags);
+ void registerService(String svcGroup, String svcId, String host, int port, List<SpServiceTag> tags);
/**
* Get active pipeline element service endpoints
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
similarity index 87%
copy from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
copy to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
index c6c5b02..d263f07 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
@@ -15,10 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
-public class SpServiceGroups {
+public class DefaultSpServiceGroups {
public static final String CORE = "core";
- public static final String EXTENSIONS = "ext";
+
}
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
similarity index 92%
copy from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
copy to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
index 4520bd9..093df8c 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
@@ -15,9 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
-public class SpServiceTags {
+public class DefaultSpServiceTags {
public static final String CORE = "core";
public static final String PE = "pe";
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
similarity index 58%
rename from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
index 4520bd9..e003366 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
@@ -15,13 +15,26 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
-public class SpServiceTags {
+public class SpServiceTag {
- public static final String CORE = "core";
- public static final String PE = "pe";
- public static final String CONNECT_MASTER = "connect-master";
- public static final String CONNECT_WORKER = "connect-worker";
- public static final String STREAMPIPES_CLIENT = "streampipes-client";
+ private final SpServiceTagPrefix prefix;
+ private final String value;
+
+ private static final String COLON = ":";
+
+ public static SpServiceTag create(SpServiceTagPrefix prefix,
+ String value) {
+ return new SpServiceTag(prefix, value);
+ }
+
+ private SpServiceTag(SpServiceTagPrefix prefix, String value) {
+ this.prefix = prefix;
+ this.value = value;
+ }
+
+ public String asString() {
+ return prefix.asString() + COLON + value;
+ }
}
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
similarity index 67%
rename from streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
index c6c5b02..12dde94 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
@@ -15,10 +15,23 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery;
+package org.apache.streampipes.svcdiscovery.api.model;
-public class SpServiceGroups {
+public enum SpServiceTagPrefix {
+ SYSTEM("sys"),
+ ADAPTER("adapter"),
+ PROTOCOL("protocol"),
+ DATA_STREAM("dstream"),
+ DATA_PROCESSOR("dprocessor"),
+ DATA_SINK("dsink");
- public static final String CORE = "core";
- public static final String EXTENSIONS = "ext";
+ private String prefix;
+
+ SpServiceTagPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public String asString() {
+ return this.prefix;
+ }
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index d595bb3..105d6da 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -27,12 +27,14 @@ import org.apache.http.client.fluent.Request;
import org.apache.http.entity.StringEntity;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.apache.streampipes.svcdiscovery.consul.model.ConsulServiceRegistrationBody;
import org.apache.streampipes.svcdiscovery.consul.model.HealthCheckConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
+import java.util.stream.Collectors;
public class SpConsulServiceDiscovery extends AbstractConsulService implements ISpServiceDiscovery {
@@ -46,12 +48,16 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
private static final String PE_SVC_TAG = "pe";
@Override
- public void registerService(String svcGroup, String svcId, String host, int port, List<String> tags) {
+ public void registerService(String svcGroup,
+ String svcId,
+ String host,
+ int port,
+ List<SpServiceTag> tags) {
boolean connected = false;
while (!connected) {
LOG.info("Trying to register service at Consul with svcGroup={}, svcId={} host={}, port={}. ", svcGroup, svcId, host, port);
- ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, tags);
+ ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, asString(tags));
connected = registerServiceHttpClient(svcRegistration);
if (!connected) {
@@ -66,15 +72,8 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
LOG.info("Successfully registered service at Consul: " + svcId);
}
- @Override
- public void registerPeService(String svcId, String host, int port) {
- registerService(PE_SVC_TAG, svcId, host, port, Collections.singletonList(PE_SVC_TAG));
- }
-
- @Override
- public void registerPeService(String svcId, String host, int port, List<String> tags) {
- tags.add(PE_SVC_TAG);
- registerService(PE_SVC_TAG, svcId, host, port, tags);
+ private List<String> asString(List<SpServiceTag> tags) {
+ return tags.stream().map(SpServiceTag::asString).collect(Collectors.toList());
}
@Override
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
index 78570b3..f9adc7e 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -21,7 +21,6 @@ package org.apache.streampipes.service.extensions.base;
import org.apache.streampipes.container.base.StreamPipesServiceBase;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.model.SpServiceDefinition;
-import org.apache.streampipes.svcdiscovery.SpServiceGroups;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +58,7 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
SpServiceDefinition serviceDef) throws UnknownHostException {
this.startStreamPipesService(
serviceClass,
- SpServiceGroups.EXTENSIONS,
+ serviceDef.getServiceGroup(),
serviceDef.getServiceId(),
serviceDef.getDefaultPort()
);