You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/19 16:33:10 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-386] Add base service for extensions

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 4b1216311f3ead07e05c3cdf8f82eb923db1ac54
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jun 19 18:32:50 2021 +0200

    [STREAMPIPES-386] Add base service for extensions
---
 pom.xml                                            |  1 +
 .../backend/StreamPipesBackendApplication.java     | 15 ++--
 .../container/base/StreamPipesServiceBase.java     | 27 ++++----
 streampipes-container-extensions/pom.xml           | 15 ++--
 .../extensions/ExtensionsModelSubmitter.java       |  4 +-
 streampipes-container-standalone/pom.xml           |  4 +-
 .../standalone/init/StandaloneModelSubmitter.java  | 79 ++++++++--------------
 .../container/init/DeclarersSingleton.java         |  9 +++
 .../container/util/ServiceDefinitionUtil.java      |  3 +-
 .../streampipes/svcdiscovery/SpServiceGroups.java  |  9 ++-
 streampipes-service-extensions-base/pom.xml        | 32 +++++++++
 .../base/StreamPipesExtensionsServiceBase.java     | 78 +++++++++++++++++++++
 12 files changed, 185 insertions(+), 91 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0a0c3d9..a6f561f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -923,6 +923,7 @@
         <module>streampipes-service-discovery-consul</module>
         <module>streampipes-service-discovery-api</module>
         <module>streampipes-connect-api</module>
+        <module>streampipes-service-extensions-base</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 6c2ee97..e8af7a7 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -41,6 +41,7 @@ import org.springframework.context.annotation.Import;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.servlet.ServletContextListener;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -70,7 +71,14 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
 
   public static void main(String[] args) {
       StreamPipesBackendApplication application = new StreamPipesBackendApplication();
-      application.startStreamPipesService(StreamPipesBackendApplication.class, "core", "core");
+      try {
+        application.startStreamPipesService(StreamPipesBackendApplication.class,
+                "core",
+                "core",
+                8030);
+      } catch (UnknownHostException e) {
+        LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
+      }
   }
 
   @PostConstruct
@@ -225,11 +233,6 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
   }
 
   @Override
-  protected Integer getDefaultPort() {
-    return 8030;
-  }
-
-  @Override
   protected List<String> getServiceTags() {
     return Arrays.asList(SpServiceTags.CORE, SpServiceTags.CONNECT_MASTER);
   }
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index 33ab0fd..d51a1ee 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -33,29 +33,28 @@ public abstract class StreamPipesServiceBase {
 
   protected void startStreamPipesService(Class<?> serviceClass,
                                          String serviceGroup,
-                                         String serviceName) {
-    try {
-      registerService(serviceGroup, serviceName);
-      runApplication(serviceClass);
-    } catch (UnknownHostException e) {
-      LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
-    }
+                                         String serviceName,
+                                         Integer defaultPort) throws UnknownHostException {
+      registerService(serviceGroup, serviceName, defaultPort);
+      runApplication(serviceClass, defaultPort);
   }
 
-  private void runApplication(Class<?> serviceClass) {
+  private void runApplication(Class<?> serviceClass,
+                              Integer defaultPort) {
     SpringApplication app = new SpringApplication(serviceClass);
-    app.setDefaultProperties(Collections.singletonMap("server.port", getPort()));
+    app.setDefaultProperties(Collections.singletonMap("server.port", getPort(defaultPort)));
     app.run();
   }
 
   private void registerService(String serviceGroup,
-                               String serviceName) throws UnknownHostException {
+                               String serviceName,
+                               Integer defaultPort) throws UnknownHostException {
     SpServiceDiscovery
             .getServiceDiscovery()
             .registerService(serviceGroup,
                     serviceName,
                     getHostname(),
-                    getPort(),
+                    getPort(defaultPort),
                     getServiceTags());
   }
 
@@ -63,12 +62,10 @@ public abstract class StreamPipesServiceBase {
     return Networking.getHostname();
   }
 
-  protected Integer getPort() {
-    return Networking.getPort(getDefaultPort());
+  protected Integer getPort(Integer defaultPort) {
+    return Networking.getPort(defaultPort);
   }
 
-  protected abstract Integer getDefaultPort();
-
   protected abstract List<String> getServiceTags();
 
 
diff --git a/streampipes-container-extensions/pom.xml b/streampipes-container-extensions/pom.xml
index f0def38..0df5985 100644
--- a/streampipes-container-extensions/pom.xml
+++ b/streampipes-container-extensions/pom.xml
@@ -33,12 +33,17 @@
         <!-- StreamPipes dependencies -->
         <dependency>
             <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-connect-container-worker</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-container</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-container-base</artifactId>
+            <artifactId>streampipes-service-extensions-base</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
 
@@ -47,12 +52,6 @@
             <groupId>com.fasterxml.jackson.datatype</groupId>
             <artifactId>jackson-datatype-jdk8</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-connect-container-worker</artifactId>
-            <version>0.68.0-SNAPSHOT</version>
-        </dependency>
-
         <!-- Test dependencies -->
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 45491e1..1bb2b9c 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.connect.api.IAdapter;
 import org.apache.streampipes.connect.api.IProtocol;
 import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
 import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.ModelSubmitter;
+import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
 import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.locales.LabelGenerator;
 import org.apache.streampipes.container.model.ExtensionsConfig;
@@ -53,7 +53,7 @@ import java.util.List;
 @Configuration
 @EnableAutoConfiguration
 @Import({ ExtensionsResourceConfig.class })
-public abstract class ExtensionsModelSubmitter extends ModelSubmitter<ExtensionsConfig> {
+public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServiceBase<ExtensionsConfig> {
     private static final Logger LOG =
             LoggerFactory.getLogger(ExtensionsModelSubmitter.class.getCanonicalName());
 
diff --git a/streampipes-container-standalone/pom.xml b/streampipes-container-standalone/pom.xml
index 8b2abdc..0064633 100644
--- a/streampipes-container-standalone/pom.xml
+++ b/streampipes-container-standalone/pom.xml
@@ -36,7 +36,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-container-base</artifactId>
+            <artifactId>streampipes-service-extensions-base</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
 
@@ -48,4 +48,4 @@
 
         <!-- Test dependencies -->
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 16a869a..52ff8e4 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -19,90 +19,51 @@
 package org.apache.streampipes.container.standalone.init;
 
 
-import org.apache.streampipes.commons.networking.Networking;
+import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.ModelSubmitter;
 import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.model.PeConfig;
-import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.container.util.ServiceDefinitionUtil;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.service.extensions.base.StreamPipesExtensionsServiceBase;
 import org.apache.streampipes.svcdiscovery.SpServiceTags;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 
-import javax.annotation.PreDestroy;
 import java.net.UnknownHostException;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.List;
 
 @Configuration
 @EnableAutoConfiguration
 @Import({ PipelineElementContainerResourceConfig.class })
-public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig> {
+public abstract class StandaloneModelSubmitter extends StreamPipesExtensionsServiceBase<PeConfig> {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StandaloneModelSubmitter.class.getCanonicalName());
 
-    public void init() {
-        SpServiceDefinition serviceDef = provideServiceDefinition();
-        init(serviceDef);
-    }
-
-    public void init(SpServiceDefinition serviceDef) {
-        try {
-            String host = Networking.getHostname();
-            Integer port = Networking.getPort(serviceDef.getDefaultPort());
-            List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(serviceDef.getDeclarers());
-            serviceTags.add(SpServiceTags.PE);
-
-            DeclarersSingleton.getInstance().populate(host, port, serviceDef);
-
-            SpServiceDiscovery
-                    .getServiceDiscovery()
-                    .registerPeService("pe/" +serviceDef.getServiceId(), host, port, serviceTags);
-
-            startService(port);
-        } catch (UnknownHostException e) {
-            LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
-        }
-    }
-
     @Deprecated
     public void init(PeConfig peConfig) {
-
         DeclarersSingleton.getInstance()
                 .setHostName(peConfig.getHost());
         DeclarersSingleton.getInstance()
                 .setPort(peConfig.getPort());
 
-        startService(peConfig.getPort());
-
-        SpServiceDiscovery.getServiceDiscovery().registerPeService(
-                peConfig.getId(),
-                peConfig.getHost(),
-                peConfig.getPort()
-        );
-    }
-
-    private void startService(Integer port) {
-        SpringApplication app = new SpringApplication(StandaloneModelSubmitter.class);
-        app.setDefaultProperties(Collections.singletonMap("server.port", port));
-        app.run();
-    }
-
-    public SpServiceDefinition provideServiceDefinition() {
-        return null;
+        try {
+            startExtensionsService(this.getClass(),
+                    peConfig.getId(),
+                    DeclarersSingleton.getInstance().getPort());
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
     }
 
-    @PreDestroy
+    @Override
     public void onExit() {
         LOG.info("Shutting down StreamPipes pipeline element container...");
-        Integer runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+        int runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
 
         while (runningInstancesCount > 0) {
             LOG.info("Waiting for {} running pipeline elements to be stopped...", runningInstancesCount);
@@ -114,4 +75,18 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
             runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
         }
     }
+
+    @Override
+    protected List<String> getServiceTags() {
+        Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
+        List<String> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
+        serviceTags.add(SpServiceTags.PE);
+
+        return serviceTags;
+    }
+
+    @Override
+    public void afterServiceRegistered() {
+
+    }
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
index 1ec7d1c..0ff7bcd 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
@@ -81,6 +81,7 @@ public class DeclarersSingleton {
     return DeclarersSingleton.instance;
   }
 
+
   public void populate(String host, Integer port, SpServiceDefinition serviceDef) {
     this.setHostName(host);
     this.setPort(port);
@@ -96,6 +97,10 @@ public class DeclarersSingleton {
     }
   }
 
+  @Deprecated
+  /**
+   * @deprecated Use ServiceDefinitionBuilder instead
+   */
   public DeclarersSingleton add(Declarer<?> d) {
     if (d instanceof SemanticEventProcessingAgentDeclarer) {
       addEpaDeclarer((SemanticEventProcessingAgentDeclarer) d);
@@ -262,6 +267,10 @@ public class DeclarersSingleton {
     this.port = port;
   }
 
+  public int getPort() {
+    return this.port;
+  }
+
   public void setHostName(String host) {
     this.hostName = host;
   }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
index 101cc1c..7e8af0a 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ServiceDefinitionUtil.java
@@ -19,12 +19,13 @@ package org.apache.streampipes.container.util;
 
 import org.apache.streampipes.container.declarer.Declarer;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class ServiceDefinitionUtil {
 
-  public static List<String> extractAppIds(List<Declarer<?>> declarers) {
+  public static List<String> extractAppIds(Collection<Declarer<?>> declarers) {
     return declarers
             .stream()
             .map(d -> d.declareModel().getAppId())
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
similarity index 82%
rename from streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
rename to streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
index b2734b6..c6c5b02 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceGroups.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.svcdiscovery;
 
-package org.apache.streampipes.container.init;
-
-public abstract class ModelSubmitter<T> {
-
-    public abstract void init(T conf);
+public class SpServiceGroups {
 
+  public static final String CORE = "core";
+  public static final String EXTENSIONS = "ext";
 }
diff --git a/streampipes-service-extensions-base/pom.xml b/streampipes-service-extensions-base/pom.xml
new file mode 100644
index 0000000..6038ab4
--- /dev/null
+++ b/streampipes-service-extensions-base/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-parent</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.68.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-service-extensions-base</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container-base</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
new file mode 100644
index 0000000..4226820
--- /dev/null
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.extensions.base;
+
+import org.apache.streampipes.container.base.StreamPipesServiceBase;
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.svcdiscovery.SpServiceGroups;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PreDestroy;
+import java.net.UnknownHostException;
+
+public abstract class StreamPipesExtensionsServiceBase<T> extends StreamPipesServiceBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
+
+    public void init() {
+        SpServiceDefinition serviceDef = provideServiceDefinition();
+        init(serviceDef);
+    }
+
+    public void init(SpServiceDefinition serviceDef) {
+        try {
+        String host = getHostname();
+        Integer port = getPort(serviceDef.getDefaultPort());
+        DeclarersSingleton.getInstance().populate(host, port, serviceDef);
+
+        String serviceId = serviceDef.getServiceId();
+
+        startExtensionsService(this.getClass(), serviceId, serviceDef.getDefaultPort());
+        } catch (UnknownHostException e) {
+            LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
+        }
+    }
+
+    public SpServiceDefinition provideServiceDefinition() {
+        return null;
+    }
+
+    @Deprecated
+    public abstract void init(T conf);
+
+    public abstract void afterServiceRegistered();
+
+    public void startExtensionsService(Class<?> serviceClass,
+                                       String serviceName,
+                                       Integer defaultPort) throws UnknownHostException {
+        this.startStreamPipesService(
+                serviceClass,
+                SpServiceGroups.EXTENSIONS,
+                serviceName,
+                defaultPort
+        );
+        this.afterServiceRegistered();
+    }
+
+    @PreDestroy
+    public abstract void onExit();
+
+}