You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/01 21:41:33 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-263] add ExtensionsModelSubmitter for bundled extensions module

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5cec898  [STREAMPIPES-263] add ExtensionsModelSubmitter for bundled extensions module
     new cc29299  Merge branch 'dev' into bundle
5cec898 is described below

commit 5cec898b37a6e3d114137f836a7653aec982b933
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Dec 1 22:39:54 2020 +0100

    [STREAMPIPES-263] add ExtensionsModelSubmitter for bundled extensions module
---
 pom.xml                                            |   1 +
 streampipes-container-extensions/pom.xml           |  58 ++++++++
 .../extensions/ExtensionsModelSubmitter.java       | 161 +++++++++++++++++++++
 .../extensions/ExtensionsResourceConfig.java       |  49 +++++++
 .../src/main/resources/banner.txt                  |   6 +
 .../standalone/init/StandaloneModelSubmitter.java  |   2 +-
 .../streampipes/container/init/ModelSubmitter.java |   6 +-
 .../ExtensionsConfig.java}                         |  13 +-
 8 files changed, 286 insertions(+), 10 deletions(-)

diff --git a/pom.xml b/pom.xml
index d2d2117..91f9c9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -892,6 +892,7 @@
         <module>streampipes-messaging-mqtt</module>
         <module>streampipes-maven-plugin</module>
         <module>streampipes-model-shared</module>
+        <module>streampipes-container-extensions</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-container-extensions/pom.xml b/streampipes-container-extensions/pom.xml
new file mode 100644
index 0000000..f0def38
--- /dev/null
+++ b/streampipes-container-extensions/pom.xml
@@ -0,0 +1,58 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-parent</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.68.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-container-extensions</artifactId>
+
+    <dependencies>
+        <!-- StreamPipes dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container-base</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+
+        <!-- External dependencies -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jdk8</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-connect-container-worker</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
new file mode 100644
index 0000000..1921518
--- /dev/null
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.extensions;
+
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.model.generic.Protocol;
+import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
+import org.apache.streampipes.connect.init.AdapterDeclarerSingleton;
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.init.ModelSubmitter;
+import org.apache.streampipes.container.init.RunningInstances;
+import org.apache.streampipes.container.locales.LabelGenerator;
+import org.apache.streampipes.container.model.ExtensionsConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
+import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
+import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
+import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
+import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Configuration
+@EnableAutoConfiguration
+@Import({ ExtensionsResourceConfig.class })
+public abstract class ExtensionsModelSubmitter extends ModelSubmitter<ExtensionsConfig> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ExtensionsModelSubmitter.class.getCanonicalName());
+
+    public void init(ExtensionsConfig conf) {
+        DeclarersSingleton.getInstance().setHostName(conf.getHost());
+        DeclarersSingleton.getInstance().setPort(conf.getPort());
+
+        DeclarersSingleton.getInstance().registerDataFormats(
+                new JsonDataFormatFactory(),
+                new CborDataFormatFactory(),
+                new SmileDataFormatFactory(),
+                new FstDataFormatFactory());
+
+        DeclarersSingleton.getInstance().registerProtocols(
+                new SpKafkaProtocolFactory(),
+                new SpMqttProtocolFactory(),
+                new SpJmsProtocolFactory());
+
+        LOG.info("Starting StreamPipes Extensions Bundle");
+        SpringApplication app = new SpringApplication(ExtensionsModelSubmitter.class);
+        app.setDefaultProperties(Collections.singletonMap("server.port", conf.getPort()));
+        app.run();
+
+        String backendUrl = "http://" + conf.getBackendHost() + ":" + conf.getBackendPort() + "/streampipes-backend";
+        String adapterUrl = "http://" + conf.getHost() + ":" + conf.getPort() + "/";
+
+        boolean connected = false;
+        while (!connected) {
+            LOG.info("Trying to connect to master in backend: " + backendUrl);
+            connected = MasterRestClient.register(backendUrl, getContainerDescription(adapterUrl));
+
+            if (!connected) {
+                LOG.info("Retrying in 5 seconds");
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        ConsulUtil.registerPeService(
+                conf.getId(),
+                conf.getHost(),
+                conf.getPort()
+        );
+    }
+
+    private ConnectWorkerContainer getContainerDescription(String endpointUrl) {
+        List<AdapterDescription> adapters = new ArrayList<>();
+        for (Adapter a : AdapterDeclarerSingleton.getInstance().getAllAdapters()) {
+            AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel(), endpointUrl);
+            adapters.add(desc);
+        }
+
+        List<ProtocolDescription> protocols = new ArrayList<>();
+        for (Protocol p : AdapterDeclarerSingleton.getInstance().getAllProtocols()) {
+            ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel(), endpointUrl);
+            protocols.add(desc);
+        }
+
+        ConnectWorkerContainer result = new ConnectWorkerContainer(endpointUrl, protocols, adapters);
+        return result;
+    }
+
+    private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity, String endpointUrl) {
+        if (!(entity instanceof GenericAdapterDescription)) {
+            if (entity instanceof  ProtocolDescription) {
+                entity.setElementId(endpointUrl +  "protocol/" + entity.getElementId());
+            } else if (entity instanceof  AdapterDescription) {
+                entity.setElementId(endpointUrl + "adapter/" + entity.getElementId());
+            }
+        }
+
+        // TODO remove after full internationalization support has been implemented
+        if (entity.isIncludesLocales()) {
+            LabelGenerator lg = new LabelGenerator(entity);
+            try {
+                entity = lg.generateLabels();
+            } catch (IOException e) {
+                LOG.error("Could not load labels for: " + entity.getAppId());
+            }
+        }
+        return entity;
+    }
+
+    @PreDestroy
+    public void onExit() {
+        LOG.info("Shutting down StreamPipes extensions container...");
+        int runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+
+        while (runningInstancesCount > 0) {
+            LOG.info("Waiting for {} running pipeline elements to be stopped...", runningInstancesCount);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LOG.error("Could not pause current thread...");
+            }
+            runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+        }
+    }
+
+}
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
new file mode 100644
index 0000000..ec7e364
--- /dev/null
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.extensions;
+
+import org.apache.streampipes.connect.container.worker.rest.*;
+import org.apache.streampipes.container.api.*;
+import org.apache.streampipes.rest.shared.serializer.*;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ExtensionsResourceConfig extends ResourceConfig {
+    public ExtensionsResourceConfig() {
+        register(SecElement.class);
+        register(SepaElement.class);
+        register(SepElement.class);
+        register(WelcomePage.class);
+        register(PipelineTemplateElement.class);
+
+        //register(WelcomePageWorker.class);
+        register(GuessResource.class);
+        register(RuntimeResolvableResource.class);
+        register(WorkerResource.class);
+        register(MultiPartFeature.class);
+        register(AdapterResource.class);
+        register(ProtocolResource.class);
+        register(GsonWithIdProvider.class);
+        register(GsonWithoutIdProvider.class);
+        register(GsonClientModelProvider.class);
+        register(JsonLdProvider.class);
+        register(JacksonSerializationProvider.class);
+    }
+}
diff --git a/streampipes-container-extensions/src/main/resources/banner.txt b/streampipes-container-extensions/src/main/resources/banner.txt
new file mode 100644
index 0000000..584e843
--- /dev/null
+++ b/streampipes-container-extensions/src/main/resources/banner.txt
@@ -0,0 +1,6 @@
+ _______ __                              ______ __
+|     __|  |_.----.-----.---.-.--------.|   __ \__|.-----.-----.-----.
+|__     |   _|   _|  -__|  _  |        ||    __/  ||  _  |  -__|__ --|
+|_______|____|__| |_____|___._|__|__|__||___|  |__||   __|_____|_____|
+                                                   |__|
+** StreamPipes Extensions Container **
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index bc0bbce..52ceba0 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -38,7 +38,7 @@ import javax.annotation.PreDestroy;
 @Configuration
 @EnableAutoConfiguration
 @Import({ PipelineElementContainerResourceConfig.class })
-public abstract class StandaloneModelSubmitter extends ModelSubmitter {
+public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig> {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StandaloneModelSubmitter.class.getCanonicalName());
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
index 3d14128..b2734b6 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
@@ -18,10 +18,8 @@
 
 package org.apache.streampipes.container.init;
 
-import org.apache.streampipes.container.model.PeConfig;
+public abstract class ModelSubmitter<T> {
 
-public abstract class ModelSubmitter {
-
-    public abstract void init(PeConfig peConfig);
+    public abstract void init(T conf);
 
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/ExtensionsConfig.java
similarity index 78%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
copy to streampipes-container/src/main/java/org/apache/streampipes/container/model/ExtensionsConfig.java
index 3d14128..56a6dc6 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/ExtensionsConfig.java
@@ -16,12 +16,15 @@
  *
  */
 
-package org.apache.streampipes.container.init;
+package org.apache.streampipes.container.model;
 
-import org.apache.streampipes.container.model.PeConfig;
+public interface ExtensionsConfig {
 
-public abstract class ModelSubmitter {
-
-    public abstract void init(PeConfig peConfig);
+    String getId();
+    String getHost();
+    int getPort();
+    String getName();
+    String getBackendHost();
+    int getBackendPort();
 
 }