You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/19 16:33:10 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-386] Add base service
for extensions
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 4b1216311f3ead07e05c3cdf8f82eb923db1ac54
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jun 19 18:32:50 2021 +0200
[STREAMPIPES-386] Add base service for extensions
---
pom.xml | 1 +
.../backend/StreamPipesBackendApplication.java | 15 ++--
.../container/base/StreamPipesServiceBase.java | 27 ++++----
streampipes-container-extensions/pom.xml | 15 ++--
.../extensions/ExtensionsModelSubmitter.java | 4 +-
streampipes-container-standalone/pom.xml | 4 +-
.../standalone/init/StandaloneModelSubmitter.java | 79 ++++++++--------------
.../container/init/DeclarersSingleton.java | 9 +++
.../container/util/ServiceDefinitionUtil.java | 3 +-
.../streampipes/svcdiscovery/SpServiceGroups.java | 9 ++-
streampipes-service-extensions-base/pom.xml | 32 +++++++++
.../base/StreamPipesExtensionsServiceBase.java | 78 +++++++++++++++++++++
12 files changed, 185 insertions(+), 91 deletions(-)
diff --git a/pom.xml b/pom.xml
index 0a0c3d9..a6f561f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -923,6 +923,7 @@
<module>streampipes-service-discovery-consul</module>
<module>streampipes-service-discovery-api</module>
<module>streampipes-connect-api</module>
+ <module>streampipes-service-extensions-base</module>
</modules>
<profiles>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 6c2ee97..e8af7a7 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -41,6 +41,7 @@ import org.springframework.context.annotation.Import;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContextListener;
+import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -70,7 +71,14 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
public static void main(String[] args) {
StreamPipesBackendApplication application = new StreamPipesBackendApplication();
- application.startStreamPipesService(StreamPipesBackendApplication.class, "core", "core");
+ try {
+ application.startStreamPipesService(StreamPipesBackendApplication.class,
+ "core",
+ "core",
+ 8030);
+ } catch (UnknownHostException e) {
+ LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
+ }
}
@PostConstruct
@@ -225,11 +233,6 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
}
@Override
- protected Integer getDefaultPort() {
- return 8030;
- }
-
- @Override
protected List<String> getServiceTags() {
return Arrays.asList(SpServiceTags.CORE, SpServiceTags.CONNECT_MASTER);
}
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index 33ab0fd..d51a1ee 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -33,29 +33,28 @@ public abstract class StreamPipesServiceBase {
protected void startStreamPipesService(Class<?> serviceClass,
String serviceGroup,
- String serviceName) {
- try {
- registerService(serviceGroup, serviceName);
- runApplication(serviceClass);
- } catch (UnknownHostException e) {
- LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
- }
+ String serviceName,
+ Integer defaultPort) throws UnknownHostException {
+ registerService(serviceGroup, serviceName, defaultPort);
+ runApplication(serviceClass, defaultPort);
}
- private void runApplication(Class<?> serviceClass) {
+ private void runApplication(Class<?> serviceClass,
+ Integer defaultPort) {
SpringApplication app = new SpringApplication(serviceClass);
- app.setDefaultProperties(Collections.singletonMap("server.port", getPort()));
+ app.setDefaultProperties(Collections.singletonMap("server.port", getPort(defaultPort)));
app.run();
}
private void registerService(String serviceGroup,
- String serviceName) throws UnknownHostException {
+ String serviceName,
+ Integer defaultPort) throws UnknownHostException {
SpServiceDiscovery
.getServiceDiscovery()
.registerService(serviceGroup,
serviceName,
getHostname(),
- getPort(),
+ getPort(defaultPort),
getServiceTags());
}
@@ -63,12 +62,10 @@ public abstract class StreamPipesServiceBase {
return Networking.getHostname();
}
- protected Integer getPort() {
- return Networking.getPort(getDefaultPort());
+ protected Integer getPort(Integer defaultPort) {
+ return Networking.getPort(defaultPort);
}
- protected abstract Integer getDefaultPort();
-
protected abstract List<String> getServiceTags();
diff --git a/streampipes-container-extensions/pom.xml b/streampipes-container-extensions/pom.xml
index f0def38..0df5985 100644
--- a/streampipes-container-extensions/pom.xml
+++ b/streampipes-container-extensions/pom.xml
@@ -33,12 +33,17 @@
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-connect-container-worker</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-container</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-container-base</artifactId>
+ <artifactId>streampipes-service-extensions-base</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
@@ -47,12 +52,6 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-connect-container-worker</artifactId>
- <version>0.68.0-SNAPSHOT</version>
- </dependency>
-
<!-- Test dependencies -->
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 45491e1..1bb2b9c 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.connect.api.IAdapter;
import org.apache.streampipes.connect.api.IProtocol;
import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.ModelSubmitter;
+import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.container.init.RunningInstances;
import org.apache.streampipes.container.locales.LabelGenerator;
import org.apache.streampipes.container.model.ExtensionsConfig;
@@ -53,7 +53,7 @@ import java.util.List;
@Configuration
@EnableAutoConfiguration
@Import({ ExtensionsResourceConfig.class })
-public abstract class ExtensionsModelSubmitter extends ModelSubmitter<ExtensionsConfig> {
+public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServiceBase<ExtensionsConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(ExtensionsModelSubmitter.class.getCanonicalName());
diff --git a/streampipes-container-standalone/pom.xml b/streampipes-container-standalone/pom.xml
index 8b2abdc..0064633 100644
--- a/streampipes-container-standalone/pom.xml
+++ b/streampipes-container-standalone/pom.xml
@@ -36,7 +36,7 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-container-base</artifactId>
+ <artifactId>streampipes-service-extensions-base</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
@@ -48,4 +48,4 @@
<!-- Test dependencies -->
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 16a869a..52ff8e4 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -19,90 +19,51 @@
package org.apache.streampipes.container.standalone.init;
-import org.apache.streampipes.commons.networking.Networking;
+import org.apache.streampipes.container.declarer.Declarer;
import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.ModelSubmitter;
import org.apache.streampipes.container.init.RunningInstances;
import org.apache.streampipes.container.model.PeConfig;
-import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.container.util.ServiceDefinitionUtil;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
import org.apache.streampipes.svcdiscovery.SpServiceTags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
-import javax.annotation.PreDestroy;
import java.net.UnknownHostException;
-import java.util.Collections;
+import java.util.Collection;
import java.util.List;
@Configuration
@EnableAutoConfiguration
@Import({ PipelineElementContainerResourceConfig.class })
-public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig> {
+public abstract class StandaloneModelSubmitter extends StreamPipesExtensionsServiceBase<PeConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(StandaloneModelSubmitter.class.getCanonicalName());
- public void init() {
- SpServiceDefinition serviceDef = provideServiceDefinition();
- init(serviceDef);
- }
-
- public void init(SpServiceDefinition serviceDef) {
- try {
- String host = Networking.getHostname();
- Integer port = Networking.getPort(serviceDef.getDefaultPort());
- List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(serviceDef.getDeclarers());
- serviceTags.add(SpServiceTags.PE);
-
- DeclarersSingleton.getInstance().populate(host, port, serviceDef);
-
- SpServiceDiscovery
- .getServiceDiscovery()
- .registerPeService("pe/" +serviceDef.getServiceId(), host, port, serviceTags);
-
- startService(port);
- } catch (UnknownHostException e) {
- LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
- }
- }
-
@Deprecated
public void init(PeConfig peConfig) {
-
DeclarersSingleton.getInstance()
.setHostName(peConfig.getHost());
DeclarersSingleton.getInstance()
.setPort(peConfig.getPort());
- startService(peConfig.getPort());
-
- SpServiceDiscovery.getServiceDiscovery().registerPeService(
- peConfig.getId(),
- peConfig.getHost(),
- peConfig.getPort()
- );
- }
-
- private void startService(Integer port) {
- SpringApplication app = new SpringApplication(StandaloneModelSubmitter.class);
- app.setDefaultProperties(Collections.singletonMap("server.port", port));
- app.run();
- }
-
- public SpServiceDefinition provideServiceDefinition() {
- return null;
+ try {
+ startExtensionsService(this.getClass(),
+ peConfig.getId(),
+ DeclarersSingleton.getInstance().getPort());
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
}
- @PreDestroy
+ @Override
public void onExit() {
LOG.info("Shutting down StreamPipes pipeline element container...");
- Integer runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+ int runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
while (runningInstancesCount > 0) {
LOG.info("Waiting for {} running pipeline elements to be stopped...", runningInstancesCount);
@@ -114,4 +75,18 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
}
}
+
+ @Override
+ protected List<String> getServiceTags() {
+ Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
+ List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
+ serviceTags.add(SpServiceTags.PE);
+
+ return serviceTags;
+ }
+
+ @Override
+ public void afterServiceRegistered() {
+
+ }
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
index 1ec7d1c..0ff7bcd 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
@@ -81,6 +81,7 @@ public class DeclarersSingleton {
return DeclarersSingleton.instance;
}
+
public void populate(String host, Integer port, SpServiceDefinition serviceDef) {
this.setHostName(host);
this.setPort(port);
@@ -96,6 +97,10 @@ public class DeclarersSingleton {
}
}
+ @Deprecated
+ /**
+ * @Deprecated Use ServiceDefinitionBuilder instead
+ */
public DeclarersSingleton add(Declarer<?> d) {
if (d instanceof SemanticEventProcessingAgentDeclarer) {
addEpaDeclarer((SemanticEventProcessingAgentDeclarer) d);
@@ -262,6 +267,10 @@ public class DeclarersSingleton {
this.port = port;
}
+ public int getPort() {
+ return this.port;
+ }
+
public void setHostName(String host) {
this.hostName = host;
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
index 101cc1c..7e8af0a 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
@@ -19,12 +19,13 @@ package org.apache.streampipes.container.util;
import org.apache.streampipes.container.declarer.Declarer;
+import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
public class ServiceDefinitionUtil {
- public static List<String> extractAppIds(List<Declarer<?>> declarers) {
+ public static List<String> extractAppIds(Collection<Declarer<?>> declarers) {
return declarers
.stream()
.map(d -> d.declareModel().getAppId())
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
similarity index 82%
rename from streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
rename to streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
index b2734b6..c6c5b02 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.svcdiscovery;
-package org.apache.streampipes.container.init;
-
-public abstract class ModelSubmitter<T> {
-
- public abstract void init(T conf);
+public class SpServiceGroups {
+ public static final String CORE = "core";
+ public static final String EXTENSIONS = "ext";
}
diff --git a/streampipes-service-extensions-base/pom.xml b/streampipes-service-extensions-base/pom.xml
new file mode 100644
index 0000000..6038ab4
--- /dev/null
+++ b/streampipes-service-extensions-base/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streampipes-parent</artifactId>
+ <groupId>org.apache.streampipes</groupId>
+ <version>0.68.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streampipes-service-extensions-base</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-container</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-container-base</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+</project>
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
new file mode 100644
index 0000000..4226820
--- /dev/null
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.extensions.base;
+
+import org.apache.streampipes.container.base.StreamPipesServiceBase;
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.svcdiscovery.SpServiceGroups;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PreDestroy;
+import java.net.UnknownHostException;
+
+public abstract class StreamPipesExtensionsServiceBase<T> extends StreamPipesServiceBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
+
+ public void init() {
+ SpServiceDefinition serviceDef = provideServiceDefinition();
+ init(serviceDef);
+ }
+
+ public void init(SpServiceDefinition serviceDef) {
+ try {
+ String host = getHostname();
+ Integer port = getPort(serviceDef.getDefaultPort());
+ DeclarersSingleton.getInstance().populate(host, port, serviceDef);
+
+ String serviceId = serviceDef.getServiceId();
+
+ startExtensionsService(this.getClass(), serviceId, serviceDef.getDefaultPort());
+ } catch (UnknownHostException e) {
+ LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
+ }
+ }
+
+ public SpServiceDefinition provideServiceDefinition() {
+ return null;
+ }
+
+ @Deprecated
+ public abstract void init(T conf);
+
+ public abstract void afterServiceRegistered();
+
+ public void startExtensionsService(Class<?> serviceClass,
+ String serviceName,
+ Integer defaultPort) throws UnknownHostException {
+ this.startStreamPipesService(
+ serviceClass,
+ SpServiceGroups.EXTENSIONS,
+ serviceName,
+ defaultPort
+ );
+ this.afterServiceRegistered();
+ }
+
+ @PreDestroy
+ public abstract void onExit();
+
+}