You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/01 21:41:33 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-263] add
ExtensionsModelSubmitter for bundled extensions module
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 5cec898 [STREAMPIPES-263] add ExtensionsModelSubmitter for bundled extensions module
new cc29299 Merge branch 'dev' into bundle
5cec898 is described below
commit 5cec898b37a6e3d114137f836a7653aec982b933
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Dec 1 22:39:54 2020 +0100
[STREAMPIPES-263] add ExtensionsModelSubmitter for bundled extensions module
---
pom.xml | 1 +
streampipes-container-extensions/pom.xml | 58 ++++++++
.../extensions/ExtensionsModelSubmitter.java | 161 +++++++++++++++++++++
.../extensions/ExtensionsResourceConfig.java | 49 +++++++
.../src/main/resources/banner.txt | 6 +
.../standalone/init/StandaloneModelSubmitter.java | 2 +-
.../streampipes/container/init/ModelSubmitter.java | 6 +-
.../ExtensionsConfig.java} | 13 +-
8 files changed, 286 insertions(+), 10 deletions(-)
diff --git a/pom.xml b/pom.xml
index d2d2117..91f9c9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -892,6 +892,7 @@
<module>streampipes-messaging-mqtt</module>
<module>streampipes-maven-plugin</module>
<module>streampipes-model-shared</module>
+ <module>streampipes-container-extensions</module>
</modules>
<profiles>
diff --git a/streampipes-container-extensions/pom.xml b/streampipes-container-extensions/pom.xml
new file mode 100644
index 0000000..f0def38
--- /dev/null
+++ b/streampipes-container-extensions/pom.xml
@@ -0,0 +1,58 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streampipes-parent</artifactId>
+ <groupId>org.apache.streampipes</groupId>
+ <version>0.68.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streampipes-container-extensions</artifactId>
+
+ <dependencies>
+ <!-- StreamPipes dependencies -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-container</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-container-base</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jdk8</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-connect-container-worker</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
new file mode 100644
index 0000000..1921518
--- /dev/null
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.extensions;
+
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.model.generic.Protocol;
+import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
+import org.apache.streampipes.connect.init.AdapterDeclarerSingleton;
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.init.ModelSubmitter;
+import org.apache.streampipes.container.init.RunningInstances;
+import org.apache.streampipes.container.locales.LabelGenerator;
+import org.apache.streampipes.container.model.ExtensionsConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
+import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
+import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
+import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
+import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Configuration
+@EnableAutoConfiguration
+@Import({ ExtensionsResourceConfig.class })
+public abstract class ExtensionsModelSubmitter extends ModelSubmitter<ExtensionsConfig> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ExtensionsModelSubmitter.class.getCanonicalName());
+
+ public void init(ExtensionsConfig conf) {
+ DeclarersSingleton.getInstance().setHostName(conf.getHost());
+ DeclarersSingleton.getInstance().setPort(conf.getPort());
+
+ DeclarersSingleton.getInstance().registerDataFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory());
+
+ DeclarersSingleton.getInstance().registerProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpMqttProtocolFactory(),
+ new SpJmsProtocolFactory());
+
+ LOG.info("Starting StreamPipes Extensions Bundle");
+ SpringApplication app = new SpringApplication(ExtensionsModelSubmitter.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", conf.getPort()));
+ app.run();
+
+ String backendUrl = "http://" + conf.getBackendHost() + ":" + conf.getBackendPort() + "/streampipes-backend";
+ String adapterUrl = "http://" + conf.getHost() + ":" + conf.getPort() + "/";
+
+ boolean connected = false;
+ while (!connected) {
+ LOG.info("Trying to connect to master in backend: " + backendUrl);
+ connected = MasterRestClient.register(backendUrl, getContainerDescription(adapterUrl));
+
+ if (!connected) {
+ LOG.info("Retrying in 5 seconds");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ ConsulUtil.registerPeService(
+ conf.getId(),
+ conf.getHost(),
+ conf.getPort()
+ );
+ }
+
+ private ConnectWorkerContainer getContainerDescription(String endpointUrl) {
+ List<AdapterDescription> adapters = new ArrayList<>();
+ for (Adapter a : AdapterDeclarerSingleton.getInstance().getAllAdapters()) {
+ AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel(), endpointUrl);
+ adapters.add(desc);
+ }
+
+ List<ProtocolDescription> protocols = new ArrayList<>();
+ for (Protocol p : AdapterDeclarerSingleton.getInstance().getAllProtocols()) {
+ ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel(), endpointUrl);
+ protocols.add(desc);
+ }
+
+ ConnectWorkerContainer result = new ConnectWorkerContainer(endpointUrl, protocols, adapters);
+ return result;
+ }
+
+ private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity, String endpointUrl) {
+ if (!(entity instanceof GenericAdapterDescription)) {
+ if (entity instanceof ProtocolDescription) {
+ entity.setElementId(endpointUrl + "protocol/" + entity.getElementId());
+ } else if (entity instanceof AdapterDescription) {
+ entity.setElementId(endpointUrl + "adapter/" + entity.getElementId());
+ }
+ }
+
+ // TODO remove after full internationalization support has been implemented
+ if (entity.isIncludesLocales()) {
+ LabelGenerator lg = new LabelGenerator(entity);
+ try {
+ entity = lg.generateLabels();
+ } catch (IOException e) {
+ LOG.error("Could not load labels for: " + entity.getAppId());
+ }
+ }
+ return entity;
+ }
+
+ @PreDestroy
+ public void onExit() {
+ LOG.info("Shutting down StreamPipes extensions container...");
+ int runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+
+ while (runningInstancesCount > 0) {
+ LOG.info("Waiting for {} running pipeline elements to be stopped...", runningInstancesCount);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.error("Could not pause current thread...");
+ }
+ runningInstancesCount = RunningInstances.INSTANCE.getRunningInstancesCount();
+ }
+ }
+
+}
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
new file mode 100644
index 0000000..ec7e364
--- /dev/null
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.extensions;
+
+import org.apache.streampipes.connect.container.worker.rest.*;
+import org.apache.streampipes.container.api.*;
+import org.apache.streampipes.rest.shared.serializer.*;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ExtensionsResourceConfig extends ResourceConfig {
+ public ExtensionsResourceConfig() {
+ register(SecElement.class);
+ register(SepaElement.class);
+ register(SepElement.class);
+ register(WelcomePage.class);
+ register(PipelineTemplateElement.class);
+
+ //register(WelcomePageWorker.class);
+ register(GuessResource.class);
+ register(RuntimeResolvableResource.class);
+ register(WorkerResource.class);
+ register(MultiPartFeature.class);
+ register(AdapterResource.class);
+ register(ProtocolResource.class);
+ register(GsonWithIdProvider.class);
+ register(GsonWithoutIdProvider.class);
+ register(GsonClientModelProvider.class);
+ register(JsonLdProvider.class);
+ register(JacksonSerializationProvider.class);
+ }
+}
diff --git a/streampipes-container-extensions/src/main/resources/banner.txt b/streampipes-container-extensions/src/main/resources/banner.txt
new file mode 100644
index 0000000..584e843
--- /dev/null
+++ b/streampipes-container-extensions/src/main/resources/banner.txt
@@ -0,0 +1,6 @@
+ _______ __ ______ __
+| __| |_.----.-----.---.-.--------.| __ \__|.-----.-----.-----.
+|__ | _| _| -__| _ | || __/ || _ | -__|__ --|
+|_______|____|__| |_____|___._|__|__|__||___| |__|| __|_____|_____|
+ |__|
+** StreamPipes Extensions Container **
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index bc0bbce..52ceba0 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -38,7 +38,7 @@ import javax.annotation.PreDestroy;
@Configuration
@EnableAutoConfiguration
@Import({ PipelineElementContainerResourceConfig.class })
-public abstract class StandaloneModelSubmitter extends ModelSubmitter {
+public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(StandaloneModelSubmitter.class.getCanonicalName());
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
index 3d14128..b2734b6 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
@@ -18,10 +18,8 @@
package org.apache.streampipes.container.init;
-import org.apache.streampipes.container.model.PeConfig;
+public abstract class ModelSubmitter<T> {
-public abstract class ModelSubmitter {
-
- public abstract void init(PeConfig peConfig);
+ public abstract void init(T conf);
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/ExtensionsConfig.java
similarity index 78%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
copy to streampipes-container/src/main/java/org/apache/streampipes/container/model/ExtensionsConfig.java
index 3d14128..56a6dc6 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/ModelSubmitter.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/ExtensionsConfig.java
@@ -16,12 +16,15 @@
*
*/
-package org.apache.streampipes.container.init;
+package org.apache.streampipes.container.model;
-import org.apache.streampipes.container.model.PeConfig;
+public interface ExtensionsConfig {
-public abstract class ModelSubmitter {
-
- public abstract void init(PeConfig peConfig);
+ String getId();
+ String getHost();
+ int getPort();
+ String getName();
+ String getBackendHost();
+ int getBackendPort();
}