You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/19 21:45:43 UTC
[incubator-streampipes] branch edge-extensions updated: [WIP]
initial support for data stream relays from edge nodes
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 9f4168b [WIP] initial support for data stream relays from edge nodes
9f4168b is described below
commit 9f4168b79d7500b0435ed611a1d9733eb7d6743e
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sat Dec 19 22:45:20 2020 +0100
[WIP] initial support for data stream relays from edge nodes
---
.../extensions/ExtensionsModelSubmitter.java | 12 +-
.../streampipes/model/SpDataStreamRelay.java | 1 +
.../model/SpDataStreamRelayContainer.java | 139 ++++++++
.../model/graph/DataProcessorInvocation.java | 2 +
.../streampipes/model/pipeline/Pipeline.java | 10 +
.../management/pe/InvocableElementManager.java | 2 +
.../{EventRelayManager.java => EventRelay.java} | 6 +-
.../management/relay/RunningRelayInstances.java | 23 +-
.../relay/bridges/MultiBrokerBridge.java | 1 +
.../container/rest/DataStreamRelayResource.java | 80 +++++
.../container/rest/DebugRelayResource.java | 57 ---
.../container/rest/InfoStatusResource.java | 4 +-
.../container/rest/InvocableEntityResource.java | 57 ++-
.../rest/NodeControllerResourceConfig.java | 4 +-
.../manager/execution/http/GraphSubmitter.java | 12 +-
.../manager/execution/http/HttpRequestBuilder.java | 25 +-
.../manager/execution/http/PipelineExecutor.java | 146 +++++---
.../http/StreamRelayEndpointUrlGenerator.java | 55 +++
.../manager/matching/InvocationGraphBuilder.java | 381 +++++++++++++--------
.../jsonld/CustomAnnotationProvider.java | 3 +-
.../serializers/jsonld/JsonLdTransformer.java | 3 +-
.../apache/streampipes/vocabulary/StreamPipes.java | 2 +
ui/src/app/core-model/gen/streampipes-model.ts | 4 +-
.../save-pipeline/save-pipeline.component.html | 46 ---
.../save-pipeline/save-pipeline.component.ts | 2 +
25 files changed, 729 insertions(+), 348 deletions(-)
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 82983c7..0c28bb6 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -64,6 +64,10 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
private static final String SLASH = "/";
private static final String COLON = ":";
+ private static final String NODE_CONTROLLER_ID = "SP_NODE_CONTROLLER_ID";
+ private static final String NODE_CONTROLLER_CONTAINER_HOST = "SP_NODE_CONTROLLER_CONTAINER_HOST";
+ private static final String NODE_CONTROLLER_CONTAINER_PORT = "SP_NODE_CONTROLLER_CONTAINER_PORT";
+
public void init(ExtensionsConfig conf) {
DeclarersSingleton.getInstance().setHostName(conf.getHost());
DeclarersSingleton.getInstance().setPort(conf.getPort());
@@ -89,7 +93,7 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
String adapterUrl = PROTOCOL + conf.getHost() + COLON + conf.getPort() + SLASH;
// check wether pipeline element is managed by node controller
- if (System.getenv("SP_NODE_CONTROLLER_ID") != null) {
+ if (System.getenv(NODE_CONTROLLER_ID) != null) {
// secondary
// register pipeline element service via node controller
NodeControllerUtil.register(
@@ -142,9 +146,9 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
private ConnectWorkerContainer getContainerDescription(String endpointUrl, boolean runsOnEdgeNode) {
if (runsOnEdgeNode) {
- String deploymentTargetNodeId = System.getenv("SP_NODE_CONTROLLER_ID");
- String deploymentTargetNodeHostname = System.getenv("SP_NODE_CONTROLLER_CONTAINER_HOST");
- int deploymentTargetNodePort = Integer.parseInt(System.getenv("SP_NODE_CONTROLLER_CONTAINER_PORT"));
+ String deploymentTargetNodeId = System.getenv(NODE_CONTROLLER_ID);
+ String deploymentTargetNodeHostname = System.getenv(NODE_CONTROLLER_CONTAINER_HOST);
+ int deploymentTargetNodePort = Integer.parseInt(System.getenv(NODE_CONTROLLER_CONTAINER_PORT));
List<AdapterDescription> adapters = new ArrayList<>();
for (Adapter a : AdapterDeclarerSingleton.getInstance().getAllAdapters()) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
index 4486dac..addbd55 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
@@ -59,6 +59,7 @@ public class SpDataStreamRelay extends NamedStreamPipesEntity {
}
public SpDataStreamRelay(EventGrounding eventGrounding) {
+ super(prefix + RandomStringUtils.randomAlphabetic(6));
this.eventGrounding = eventGrounding;
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
new file mode 100644
index 0000000..cb1cd40
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model;
+
+import io.fogsy.empire.annotations.RdfProperty;
+import io.fogsy.empire.annotations.RdfsClass;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.vocabulary.StreamPipes;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.OneToOne;
+import java.util.ArrayList;
+import java.util.List;
+
+
+@RdfsClass(StreamPipes.DATA_STREAM_RELAY_CONTAINER)
+@Entity
+public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
+
+ private static final long serialVersionUID = -4675162465357705480L;
+
+ private static final String prefix = "urn:apache.org:relaystreamcontainer:";
+
+ @OneToOne(fetch = FetchType.EAGER,
+ cascade = {CascadeType.PERSIST, CascadeType.MERGE})
+ @RdfProperty(StreamPipes.HAS_GROUNDING)
+ protected EventGrounding inputGrounding;
+
+ @OneToOne(fetch = FetchType.EAGER,
+ cascade = {CascadeType.PERSIST, CascadeType.MERGE})
+ @RdfProperty(StreamPipes.HAS_EVENT_RELAY)
+ private List<SpDataStreamRelay> outputStreamRelays;
+
+ @RdfProperty(StreamPipes.HAS_EVENT_RELAY_STRATEGY)
+ private String eventRelayStrategy;
+
+ @RdfProperty(StreamPipes.DATA_STREAM_RELAY_RUNNING_INSTANCE_ID)
+ private String runningStreamRelayInstanceId;
+
+ @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_ID)
+ private String deploymentTargetNodeId;
+
+ @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_HOSTNAME)
+ private String deploymentTargetNodeHostname;
+
+ @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_PORT)
+ private Integer deploymentTargetNodePort;
+
+ public SpDataStreamRelayContainer() {
+ super(prefix + RandomStringUtils.randomAlphabetic(6));
+ this.outputStreamRelays = new ArrayList<>();
+ }
+
+ public SpDataStreamRelayContainer(String elementId, List<SpDataStreamRelay> outputStreamRelays,
+ String eventRelayStrategy) {
+ super(elementId);
+ this.outputStreamRelays = outputStreamRelays;
+ this.eventRelayStrategy = eventRelayStrategy;
+ }
+
+ public SpDataStreamRelayContainer(NamedStreamPipesEntity other) {
+ super(other);
+ }
+
+ public EventGrounding getInputGrounding() {
+ return inputGrounding;
+ }
+
+ public void setInputGrounding(EventGrounding inputGrounding) {
+ this.inputGrounding = inputGrounding;
+ }
+
+ public List<SpDataStreamRelay> getOutputStreamRelays() {
+ return outputStreamRelays;
+ }
+
+ public void setOutputStreamRelays(List<SpDataStreamRelay> outputStreamRelays) {
+ this.outputStreamRelays = outputStreamRelays;
+ }
+
+ public String getRunningStreamRelayInstanceId() {
+ return runningStreamRelayInstanceId;
+ }
+
+ public void setRunningStreamRelayInstanceId(String runningStreamRelayInstanceId) {
+ this.runningStreamRelayInstanceId = runningStreamRelayInstanceId;
+ }
+
+ public String getEventRelayStrategy() {
+ return eventRelayStrategy;
+ }
+
+ public void setEventRelayStrategy(String eventRelayStrategy) {
+ this.eventRelayStrategy = eventRelayStrategy;
+ }
+
+ public String getDeploymentTargetNodeId() {
+ return deploymentTargetNodeId;
+ }
+
+ public void setDeploymentTargetNodeId(String deploymentTargetNodeId) {
+ this.deploymentTargetNodeId = deploymentTargetNodeId;
+ }
+
+ public String getDeploymentTargetNodeHostname() {
+ return deploymentTargetNodeHostname;
+ }
+
+ public void setDeploymentTargetNodeHostname(String deploymentTargetNodeHostname) {
+ this.deploymentTargetNodeHostname = deploymentTargetNodeHostname;
+ }
+
+ public Integer getDeploymentTargetNodePort() {
+ return deploymentTargetNodePort;
+ }
+
+ public void setDeploymentTargetNodePort(Integer deploymentTargetNodePort) {
+ this.deploymentTargetNodePort = deploymentTargetNodePort;
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index bd168c9..016f3c7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -174,6 +174,8 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
public boolean addOutputStreamRelay(SpDataStreamRelay spDataStreamRelay) { return outputStreamRelays.add(spDataStreamRelay); }
+ public boolean removeOutputStreamRelay(SpDataStreamRelay spDataStreamRelay) { return outputStreamRelays.remove(spDataStreamRelay); }
+
public List<SpDataStreamRelay> getOutputStreamRelays() {
return outputStreamRelays;
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
index 5de1173..58774ca 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
@@ -44,6 +44,7 @@ public class Pipeline extends ElementComposition {
private boolean publicElement;
private String createdByUser;
+ private String eventRelayStrategy;
private List<String> pipelineCategories;
@@ -133,6 +134,14 @@ public class Pipeline extends ElementComposition {
this.createdAt = createdAt;
}
+ public String getEventRelayStrategy() {
+ return eventRelayStrategy;
+ }
+
+ public void setEventRelayStrategy(String eventRelayStrategy) {
+ this.eventRelayStrategy = eventRelayStrategy;
+ }
+
public Pipeline clone() {
Pipeline pipeline = new Pipeline();
pipeline.setName(name);
@@ -145,6 +154,7 @@ public class Pipeline extends ElementComposition {
pipeline.setCreatedAt(createdAt);
pipeline.setPipelineId(pipelineId);
pipeline.setRev(rev);
+ pipeline.setEventRelayStrategy(eventRelayStrategy);
return pipeline;
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 854e4a6..3b57df8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -69,6 +69,8 @@ public class InvocableElementManager implements ElementLifeCyle {
NodeInfoStorage.getInstance()
.retrieveNodeInfo()
.setSupportedPipelineElementAppIds(registration.getSupportedPipelineElementAppIds());
+
+ LOG.info("Successfully registered pipeline element container");
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
similarity index 86%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
index 84882bf..7c52867 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
@@ -21,15 +21,15 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
-public class EventRelayManager extends BaseEventRelay {
+public class EventRelay extends BaseEventRelay {
private static final String DEFAULT_EVENT_RELAY_STRATEGY = "buffer";
- public EventRelayManager(TransportProtocol source, TransportProtocol target) {
+ public EventRelay(TransportProtocol source, TransportProtocol target) {
super(source, target, DEFAULT_EVENT_RELAY_STRATEGY);
}
- public EventRelayManager(TransportProtocol source, TransportProtocol target, String relayStrategy) {
+ public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy) {
super(source, target, relayStrategy);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index a3d1649..e95ab57 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -19,21 +19,18 @@ package org.apache.streampipes.node.controller.container.management.relay;
import org.apache.streampipes.node.controller.container.management.IRunningInstances;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
-public enum RunningRelayInstances implements IRunningInstances<EventRelayManager> {
+public enum RunningRelayInstances implements IRunningInstances<Map<String,EventRelay>> {
INSTANCE;
- private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
+ private final Map<String, Map<String, EventRelay>> runningInstances = new HashMap<>();
// TODO: persist active relays to support failure handling
@Override
- public void add(String id, EventRelayManager eventRelayManager) {
- runningInstances.put(id, eventRelayManager);
+ public void add(String id, Map<String,EventRelay> eventRelayMap) {
+ runningInstances.put(id, eventRelayMap);
}
@Override
@@ -42,7 +39,7 @@ public enum RunningRelayInstances implements IRunningInstances<EventRelayManager
}
@Override
- public EventRelayManager get(String id) {
+ public Map<String,EventRelay> get(String id) {
return isRunning(id) ? runningInstances.get(id) : null;
}
@@ -51,7 +48,11 @@ public enum RunningRelayInstances implements IRunningInstances<EventRelayManager
runningInstances.remove(id);
}
- public List<EventRelayManager> getRunningInstances() {
- return new ArrayList<>(runningInstances.values());
+ public List<EventRelay> getRunningInstances() {
+ return runningInstances.values().stream()
+ .map(Map::values)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ //return new ArrayList<>(runningInstances.values().forEach(e -> e.values()));
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
index 3bc1af6..cc7c0c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.EventRelay;
+import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DataStreamRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DataStreamRelayResource.java
new file mode 100644
index 0000000..7de261a
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DataStreamRelayResource.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.rest;
+
+import org.apache.streampipes.container.transform.Transformer;
+import org.apache.streampipes.container.util.Util;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
+import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.HashMap;
+import java.util.Map;
+
+@Path("/api/v2/node/stream/relay")
+public class DataStreamRelayResource extends AbstractNodeContainerResource {
+ private static final Logger LOG = LoggerFactory.getLogger(DataStreamRelayResource.class.getCanonicalName());
+
+ @POST
+ @Path("/invoke")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public String invokeRelaySourceDataStream(SpDataStreamRelayContainer graph) {
+
+ String strategy = graph.getEventRelayStrategy();
+ String runningInstanceId = graph.getRunningStreamRelayInstanceId();
+ TransportProtocol source = graph.getInputGrounding().getTransportProtocol();
+
+ Map<String, EventRelay> eventRelayMap = new HashMap<>();
+
+ graph.getOutputStreamRelays().forEach(r -> {
+ TransportProtocol target = r.getEventGrounding().getTransportProtocol();
+
+ EventRelay eventRelay = new EventRelay(source, target, strategy);
+ eventRelay.start();
+ eventRelayMap.put(r.getElementId(), eventRelay);
+ });
+
+ RunningRelayInstances.INSTANCE.add(graph.getRunningStreamRelayInstanceId(), eventRelayMap);
+
+ return Util.toResponseString(new Response(runningInstanceId,true,""));
+ }
+
+ @DELETE
+ @Path("/detach/{runningInstanceId}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public String detachRelaySourceDataStream(@PathParam("runningInstanceId") String runningInstanceId) {
+
+ Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(runningInstanceId);
+ if (relay != null) {
+ relay.values().forEach(EventRelay::stop);
+ }
+
+ RunningRelayInstances.INSTANCE.remove(runningInstanceId);
+
+ return Util.toResponseString(new Response(runningInstanceId, true, ""));
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
deleted file mode 100644
index 8f99a8f..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.rest;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
-import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
-
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-@Path("/api/v2/relay")
-public class DebugRelayResource extends AbstractNodeContainerResource {
-
- // TODO: Debug-only.
- @POST
- @Path("/start")
- public Response debugRelayEventStream(String msg) throws SpRuntimeException {
- // TODO implement
-
-// System.out.println(msg);
-// EventRelayManager eventRelayManager = new EventRelayManager();
-// eventRelayManager.start();
-// RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayTopic(), eventRelayManager);
-
- return ok();
- }
-
- @POST
- @Path("/stop")
- public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
- // TODO implement
-
-// System.out.println(msg);
-// EventRelayManager eventRelayManager = RunningRelayInstances.INSTANCE.get("org.apache.streampipes.flowrate01");
-// assert eventRelayManager != null;
-// eventRelayManager.stop();
-
- return ok();
- }
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
index 30d2459..276b8fa 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
@@ -19,7 +19,7 @@ package org.apache.streampipes.node.controller.container.rest;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
import org.apache.streampipes.node.controller.container.management.resources.ResourceManager;
@@ -57,7 +57,7 @@ public class InfoStatusResource extends AbstractNodeContainerResource{
List<RelayMetrics> metricsList = RunningRelayInstances.INSTANCE.getRunningInstances()
.stream()
- .map(EventRelayManager::getRelayMetrics)
+ .map(EventRelay::getRelayMetrics)
.collect(Collectors.toList());
try {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
index c899701..5f13a8d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
@@ -19,20 +19,17 @@ package org.apache.streampipes.node.controller.container.rest;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.container.transform.Transformer;
-import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.SpDataStreamRelay;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
-import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +37,12 @@ import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
@Path("/api/v2/node/container")
-public class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
+public class InvocableEntityResource extends AbstractNodeContainerResource {
private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
@GET
@@ -61,19 +60,8 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
@POST
@Path("/register")
- public void register(String body) {
- try {
- InvocableRegistration invocableRegistration = JacksonSerializer
- .getObjectMapper()
- .readValue(body, InvocableRegistration.class);
-
- // register pipeline elements at consul and node controller
- InvocableElementManager.getInstance().register(invocableRegistration);
- LOG.info("Sucessfully registered pipeline element container");
-
- } catch (IOException e) {
- LOG.error("Could not register pipeline element container - " + e.toString());
- }
+ public void register(InvocableRegistration registration) {
+ InvocableElementManager.getInstance().register(registration);
}
@POST
@@ -83,30 +71,29 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
public String invoke(@PathParam("identifier") String identifier,
@PathParam("elementId") String elementId, String payload) {
- // TODO implement
String endpoint;
InvocableStreamPipesEntity graph;
try {
if (identifier.equals("sepa")) {
graph = Transformer.fromJsonLd(DataProcessorInvocation.class, payload);
-
endpoint = graph.getBelongsTo();
-
TransportProtocol source = ((DataProcessorInvocation) graph)
.getOutputStream()
.getEventGrounding()
.getTransportProtocol();
- String relayStrategy = ((DataProcessorInvocation) graph).getEventRelayStrategy();
+ String strategy = ((DataProcessorInvocation) graph).getEventRelayStrategy();
+ List<SpDataStreamRelay> dataStreamRelays = ((DataProcessorInvocation) graph).getOutputStreamRelays();
- ((DataProcessorInvocation) graph).getOutputStreamRelays().forEach(r -> {
+ Map<String, EventRelay> eventRelayMap = new HashMap<>();
+ dataStreamRelays.forEach(r -> {
TransportProtocol target = r.getEventGrounding().getTransportProtocol();
- EventRelayManager relayManager = new EventRelayManager(source, target, relayStrategy);
- relayManager.start();
- RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(),
- relayManager);
+ EventRelay eventRelay = new EventRelay(source, target, strategy);
+ eventRelay.start();
+ eventRelayMap.put(r.getElementId(), eventRelay);
});
+ RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
org.apache.streampipes.model.Response resp = InvocableElementManager.getInstance().invoke(endpoint,
payload);
@@ -123,7 +110,14 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
graph = Transformer.fromJsonLd(DataSinkInvocation.class, payload);
endpoint = graph.getBelongsTo();
- InvocableElementManager.getInstance().invoke(endpoint, payload);
+ org.apache.streampipes.model.Response resp = InvocableElementManager.getInstance()
+ .invoke(endpoint, payload);
+
+ if (resp.isSuccess()) {
+ RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
+ }
+
+ return resp.toString();
}
} catch (IOException e) {
@@ -141,15 +135,13 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
// TODO store host and port locally to retrieve by runningInstanceId
-
String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
String resp = InvocableElementManager.getInstance().detach(endpoint + "/" + runningInstanceId);
// Stop relay for invocable if existing
- // TODO: maybe use unique identifier for retrieving relay
- EventRelayManager relay = RunningRelayInstances.INSTANCE.get(runningInstanceId);
+ Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(runningInstanceId);
if (relay != null) {
- relay.stop();
+ relay.values().forEach(EventRelay::stop);
}
RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -165,5 +157,4 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
InvocableElementManager.getInstance().unregister();
return ok(DockerContainerOrchestrator.getInstance().remove(container));
}
-
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index a18816c..8e1e117 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -27,8 +27,6 @@ public class NodeControllerResourceConfig extends ResourceConfig {
register(HealthCheckResource.class);
register(InfoStatusResource.class);
register(InvocableEntityResource.class);
-
- // TODO remove later - only for local relay tests
- register(DebugRelayResource.class);
+ register(DataStreamRelayResource.class);
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 173e0b0..edbb551 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.manager.execution.http;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
public class GraphSubmitter {
@@ -37,18 +39,23 @@ public class GraphSubmitter {
private final List<SpDataSet> dataSets;
private final String pipelineId;
private final String pipelineName;
+ private final List<SpDataStreamRelayContainer> streamRelays;
public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
- List<SpDataSet> dataSets) {
+ List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> streamRelays) {
this.graphs = graphs;
this.pipelineId = pipelineId;
this.pipelineName = pipelineName;
this.dataSets = dataSets;
+ this.streamRelays = streamRelays;
}
public PipelineOperationStatus invokeGraphs() {
PipelineOperationStatus status = initPipelineOperationStatus();
+ if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
+ streamRelays.forEach(streamRelay -> invoke(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+ }
graphs.forEach(graph -> invoke(new InvocableEntityUrlGenerator(graph), graph, status));
// only invoke datasets when following pipeline elements are started
if (allInvocableEntitiesRunning(status)) {
@@ -67,6 +74,9 @@ public class GraphSubmitter {
graphs.forEach(graph -> detach(new InvocableEntityUrlGenerator(graph), graph, status));
dataSets.forEach(dataset -> detach(new DataSetEntityUrlGenerator(dataset), dataset, status));
+ if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
+ streamRelays.forEach(streamRelay -> detach(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+ }
return verifyPipelineOperationStatus(
status,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 6ab296a..f6706eb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -18,14 +18,17 @@
package org.apache.streampipes.manager.execution.http;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
import org.apache.streampipes.commons.Utils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +50,18 @@ public class HttpRequestBuilder {
}
public PipelineElementStatus invoke() {
- LOG.info("Invoking element: " + endpointUrl);
try {
- String jsonLd = jsonLd();
+ String json;
+ if (payload instanceof InvocableStreamPipesEntity) {
+ LOG.info("Invoking pipeline element: " + endpointUrl);
+ json = jsonLd();
+ } else {
+ LOG.info("Invoking data stream relay: " + endpointUrl);
+ json = jackson();
+ }
Response httpResp = Request
.Post(endpointUrl)
- .bodyString(jsonLd, ContentType.APPLICATION_JSON)
+ .bodyString(json, ContentType.APPLICATION_JSON)
.connectTimeout(CONNECT_TIMEOUT)
.execute();
return handleResponse(httpResp);
@@ -63,6 +72,12 @@ public class HttpRequestBuilder {
}
public PipelineElementStatus detach() {
+ if (payload instanceof InvocableStreamPipesEntity) {
+ LOG.info("Detaching pipeline element: " + endpointUrl);
+ } else {
+ LOG.info("Detaching data stream relay: " + endpointUrl);
+ }
+
try {
Response httpResp = Request
.Delete(endpointUrl)
@@ -88,4 +103,8 @@ public class HttpRequestBuilder {
private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), response.getOptionalMessage());
}
+
+ private String jackson() throws JsonProcessingException {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ce16327..9172398 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,6 +18,10 @@
package org.apache.streampipes.manager.execution.http;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.grounding.EventGrounding;
import org.lightcouch.DocumentConflictException;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
import org.apache.streampipes.manager.execution.status.SepMonitoringManager;
@@ -36,10 +40,7 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.user.management.encryption.CredentialsManager;
import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
public class PipelineExecutor {
@@ -76,15 +77,24 @@ public class PipelineExecutor {
graphs.forEach(g -> g.setStreamRequirements(Arrays.asList()));
- PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), decryptedGraphs, dataSets)
- .invokeGraphs();
+ List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(decryptedGraphs);
+
+ PipelineOperationStatus status = new GraphSubmitter(
+ pipeline.getPipelineId(),
+ pipeline.getName(),
+ decryptedGraphs,
+ dataSets,
+ dataStreamRelayContainers).invokeGraphs();
if (status.isSuccess()) {
storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
- PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
+ PipelineStatusManager.addPipelineStatus(
+ pipeline.getPipelineId(),
+ new PipelineStatusMessage(pipeline.getPipelineId(),
+ System.currentTimeMillis(),
+ PipelineStatusMessageType.PIPELINE_STARTED.title(),
+ PipelineStatusMessageType.PIPELINE_STARTED.description()));
if (monitor) {
SepMonitoringManager.addObserver(pipeline.getPipelineId());
@@ -97,40 +107,18 @@ public class PipelineExecutor {
return status;
}
- private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
- List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
- graphs.stream().map(g -> {
- if (g instanceof DataProcessorInvocation) {
- return new DataProcessorInvocation((DataProcessorInvocation) g);
- } else {
- return new DataSinkInvocation((DataSinkInvocation) g);
- }
- }).forEach(g -> {
- g.getStaticProperties()
- .stream()
- .filter(SecretStaticProperty.class::isInstance)
- .forEach(sp -> {
- try {
- String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
- ((SecretStaticProperty) sp).getValue());
- ((SecretStaticProperty) sp).setValue(decrypted);
- ((SecretStaticProperty) sp).setEncrypted(false);
- } catch (GeneralSecurityException e) {
- e.printStackTrace();
- }
- });
- decryptedGraphs.add(g);
- });
- return decryptedGraphs;
- }
-
public PipelineOperationStatus stopPipeline() {
List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
- PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), graphs, dataSets)
- .detachGraphs();
+ List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(graphs);
+
+ PipelineOperationStatus status = new GraphSubmitter(
+ pipeline.getPipelineId(),
+ pipeline.getName(),
+ graphs,
+ dataSets,
+ dataStreamRelayContainers).detachGraphs();
if (status.isSuccess()) {
if (visualize) {
@@ -158,6 +146,82 @@ public class PipelineExecutor {
return status;
}
+ private String extractTopic(EventGrounding eg) {
+ return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+ }
+
+ private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
+ Set<String> topicSet = new HashSet<>();
+ List<SpDataStreamRelayContainer> dsRelayContainers = new ArrayList<>();
+ SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
+ List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+
+ graphs.stream()
+ .filter(g -> pipeline.getStreams().stream()
+ .filter(ds -> topicSet.add(extractTopic(ds.getEventGrounding())))
+ .anyMatch(ds -> (!matchingDeploymentTargets(ds, g) && connected(ds, g))))
+ .forEach(g -> pipeline.getStreams()
+ .forEach(ds -> {
+ dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
+ // TODO: retrieve relay strategy from somewhere, e.g. make it accessible on pipeline level
+ dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
+ dsRelayContainer.setName(ds.getName() + " (Data Stream Relay)");
+ dsRelayContainer.setInputGrounding(new EventGrounding(ds.getEventGrounding()));
+ dsRelayContainer.setDeploymentTargetNodeId(ds.getDeploymentTargetNodeId());
+ dsRelayContainer.setDeploymentTargetNodeHostname(ds.getDeploymentTargetNodeHostname());
+ dsRelayContainer.setDeploymentTargetNodePort(ds.getDeploymentTargetNodePort());
+
+ dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(g.getInputStreams()
+ .get(getIndex(ds.getDOM(), g))
+ .getEventGrounding())));
+ })
+ );
+
+ dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
+ dsRelayContainers.add(dsRelayContainer);
+
+ return dsRelayContainers;
+ }
+
+ private boolean matchingDeploymentTargets(SpDataStream source, InvocableStreamPipesEntity target) {
+ return source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+ }
+
+ private boolean connected(SpDataStream source, InvocableStreamPipesEntity target) {
+ int index = getIndex(source.getDOM(), target);
+ if (index != -1) {
+ return target.getConnectedTo().get(index).equals(source.getDOM());
+ }
+ return false;
+ }
+
+ private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+ List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
+ graphs.stream().map(g -> {
+ if (g instanceof DataProcessorInvocation) {
+ return new DataProcessorInvocation((DataProcessorInvocation) g);
+ } else {
+ return new DataSinkInvocation((DataSinkInvocation) g);
+ }
+ }).forEach(g -> {
+ g.getStaticProperties()
+ .stream()
+ .filter(SecretStaticProperty.class::isInstance)
+ .forEach(sp -> {
+ try {
+ String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
+ ((SecretStaticProperty) sp).getValue());
+ ((SecretStaticProperty) sp).setValue(decrypted);
+ ((SecretStaticProperty) sp).setEncrypted(false);
+ } catch (GeneralSecurityException e) {
+ e.printStackTrace();
+ }
+ });
+ decryptedGraphs.add(g);
+ });
+ return decryptedGraphs;
+ }
+
private void setPipelineStarted(Pipeline pipeline) {
pipeline.setRunning(true);
pipeline.setStartedAt(new Date().getTime());
@@ -183,4 +247,8 @@ public class PipelineExecutor {
return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
}
+ private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
+ return targetElement.getConnectedTo().indexOf(sourceDomId);
+ }
+
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
new file mode 100644
index 0000000..e62ff41
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
+
+public class StreamRelayEndpointUrlGenerator extends EndpointUrlGenerator<SpDataStreamRelayContainer> {
+
+ private static final String BASE_ROUTE = "api/v2/node/stream/relay";
+ private static final String INVOKE_ROUTE = "/invoke";
+ private static final String DETACH_ROUTE = "/detach";
+
+ private final SpDataStreamRelayContainer streamRelay;
+
+ public StreamRelayEndpointUrlGenerator(SpDataStreamRelayContainer streamRelay) {
+ super(streamRelay);
+ this.streamRelay = streamRelay;
+ }
+
+ @Override
+ public String generateInvokeEndpoint() {
+ return generateEndpoint(INVOKE_ROUTE);
+ }
+
+ @Override
+ public String generateDetachEndpoint() {
+ return generateEndpoint(DETACH_ROUTE)
+ + SLASH
+ + streamRelay.getRunningStreamRelayInstanceId();
+ }
+
+ private String generateEndpoint(String path) {
+ return HTTP_PROTOCOL
+ + streamRelay.getDeploymentTargetNodeHostname()
+ + COLON
+ + streamRelay.getDeploymentTargetNodePort()
+ + SLASH
+ + BASE_ROUTE
+ + path;
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 2375543..e4c61ac 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -18,7 +18,9 @@
package org.apache.streampipes.manager.matching;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpEdgeNodeProtocol;
import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.container.util.ConsulUtil;
import org.apache.streampipes.manager.data.PipelineGraph;
@@ -30,6 +32,7 @@ import org.apache.streampipes.model.SpDataStreamRelay;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
import org.apache.streampipes.model.output.OutputStrategy;
@@ -41,6 +44,7 @@ import java.util.stream.Collectors;
public class InvocationGraphBuilder {
+ private static final String DEFAULT_TAG = "default";
private final PipelineGraph pipelineGraph;
private final String pipelineId;
private Integer uniquePeIndex = 0;
@@ -62,7 +66,6 @@ public class InvocationGraphBuilder {
}
private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
-
EventGrounding inputGrounding = new GroundingBuilder(source, targets).getEventGrounding();
// set output stream event grounding for source data processors
@@ -83,163 +86,117 @@ public class InvocationGraphBuilder {
// set input stream event grounding for target element data processors and sinks
targets.forEach(t -> {
// check if source and target share same node
- if (source instanceof InvocableStreamPipesEntity) {
- if (((InvocableStreamPipesEntity) source).getDeploymentTargetNodeId() != null ||
- t.getDeploymentTargetNodeId() != null) {
+ if (source instanceof InvocableStreamPipesEntity && deploymentTargetNotNull(source, t)) {
- if (matchingDeploymentTarget((InvocableStreamPipesEntity) source, t)) {
- // both PE on same node - share grounding
+ if (matchingDeploymentTargets(source, t)) {
+ // both processor on same node - share grounding
t.getInputStreams()
.get(getIndex(source.getDOM(), t))
.setEventGrounding(inputGrounding);
- } else {
- // check if target runs on cloud or edge node
- if (t.getDeploymentTargetNodeId().equals("default")) {
- // target runs on cloud node: use central cloud broker, e.g. kafka
- // TODO: set event relay to true
- // TODO: add cloud broker to List<EventRelays>
- if (source instanceof DataProcessorInvocation) {
-
- String relayTopic = inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
-
- if (relayNotExists(relayTopic, source)) {
- // TODO: use prioritized cloud transport protocol instead of kafka
- SpProtocol prioritizedProtocol =
- BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
-
- EventGrounding relayEventGrounding = new EventGrounding();
-
- if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
- JmsTransportProtocol tp = new JmsTransportProtocol(
- BackendConfig.INSTANCE.getJmsHost(),
- BackendConfig.INSTANCE.getJmsPort(),
- relayTopic);
- relayEventGrounding.setTransportProtocol(tp);
- }
- else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) {
- KafkaTransportProtocol tp = new KafkaTransportProtocol(
- BackendConfig.INSTANCE.getKafkaHost(),
- BackendConfig.INSTANCE.getKafkaPort(),
- relayTopic,
- BackendConfig.INSTANCE.getZookeeperHost(),
- BackendConfig.INSTANCE.getZookeeperPort());
- relayEventGrounding.setTransportProtocol(tp);
- }
- else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)){
- MqttTransportProtocol tp = new MqttTransportProtocol(
- BackendConfig.INSTANCE.getMqttHost(),
- BackendConfig.INSTANCE.getMqttPort(),
- relayTopic);
- relayEventGrounding.setTransportProtocol(tp);
- }
-
- relayEventGrounding.setTransportFormats(inputGrounding.getTransportFormats());
-
- // TODO: when modifying pipelines new relay are added to old ones. Should initialize new ArrayList
- // graphExists()
- if(!graphExists(t.getDOM())) {
- ((DataProcessorInvocation) source)
- .addOutputStreamRelay(new SpDataStreamRelay(relayEventGrounding));
- } else {
- ((DataProcessorInvocation) source)
- .getOutputStreamRelays()
- .get(getIndex(source.getDOM(), t))
- .getEventGrounding()
- .getTransportProtocol()
- .setTopicDefinition(inputGrounding.getTransportProtocol().getTopicDefinition());
- }
-
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .getEventGrounding()
- .getTransportProtocol()
- .setTopicDefinition(inputGrounding.getTransportProtocol().getTopicDefinition());
-
- } else {
- // split in the end
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .getEventGrounding()
- .getTransportProtocol()
- .setTopicDefinition(inputGrounding.getTransportProtocol().getTopicDefinition());
- }
+ }
+ else if (defaultDeploymentTarget(t) && source instanceof DataProcessorInvocation) {
+ // target runs on cloud node: use central cloud broker, e.g. kafka
+
+ if (!eventRelayExists(source, t)) {
+
+ if(!graphExists(t.getDOM())) {
+ // add initial relay grounding to source processor
+ ((DataProcessorInvocation) source).addOutputStreamRelay(
+ new SpDataStreamRelay(generateRelayGrounding(inputGrounding,false)));
+ } else {
+ // modify relay topic of existing relay grounding
+ modifyTopicForEventRelay(source, t, extractTopic(inputGrounding));
}
-
+ modifyTopicForTargetInputStream(source, t, extractTopic(inputGrounding));
} else {
- // target runs on edge node: use target edge node broker
- // TODO: set event relay to true
- // TODO: add target edge node broker to List<EventRelays>
-
- String relayTopic = inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
-
- if (relayNotExists(relayTopic, source)) {
-
- EventGrounding relayEventGrounding = new EventGrounding();
-
- relayEventGrounding.setTransportProtocol(
- new MqttTransportProtocol(
- getTargetNodeBrokerHost(t),
- getTargetNodeBrokerPort(t),
- relayTopic
- ));
-
- relayEventGrounding.setTransportFormats(inputGrounding.getTransportFormats());
-
- // TODO: when modifying pipelines new relay are added to old ones. Should initialize new ArrayList
- if(!graphExists(t.getDOM())) {
- ((DataProcessorInvocation) source)
- .addOutputStreamRelay(new SpDataStreamRelay(relayEventGrounding));
- } else {
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventGrounding(relayEventGrounding);
- }
+ EventGrounding updatedRelayGrounding = generateRelayGrounding(inputGrounding, false);
+ removeExistingStreamRelay(source, t);
+ ((DataProcessorInvocation) source).addOutputStreamRelay(new SpDataStreamRelay(updatedRelayGrounding));
+ modifyTopicForTargetInputStream(source, t, extractTopic(inputGrounding));
+ }
+ }
+ else {
+ // target runs on other edge node: use target edge node broker
+ //if (!eventRelayExists(inputGrounding, source)) {
+ if (!eventRelayExists(source ,t)) {
+
+ EventGrounding relayGrounding = generateRelayGrounding(inputGrounding, t,true);
+ if(!graphExists(t.getDOM())) {
+ ((DataProcessorInvocation) source).addOutputStreamRelay(new SpDataStreamRelay(relayGrounding));
+ } else {
t.getInputStreams()
.get(getIndex(source.getDOM(), t))
- .setEventGrounding(((DataProcessorInvocation) source)
- .getOutputStreamRelays()
- .get(getIndex(source.getDOM(), t))
- .getEventGrounding());
-
+ .setEventGrounding(relayGrounding);
}
-// t.getInputStreams()
-// .get(getIndex(source.getDOM(), t))
-// .setEventGrounding(((DataProcessorInvocation) source)
-// .getOutputStreamRelays()
-// .get(getIndex(source.getDOM(), t))
-// .getEventGrounding());
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(((DataProcessorInvocation) source)
+ .getOutputStreamRelays()
+ .get(getIndex(source.getDOM(), t))
+ .getEventGrounding());
+ } else {
+ EventGrounding updatedRelayGrounding = generateRelayGrounding(inputGrounding, t,true);
+ removeExistingStreamRelay(source, t);
+ ((DataProcessorInvocation) source).addOutputStreamRelay(new SpDataStreamRelay(updatedRelayGrounding));
+
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(updatedRelayGrounding);
}
}
+// t.getInputStreams()
+// .get(getIndex(source.getDOM(), t))
+// .setEventGrounding(inputGrounding);
} else {
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventGrounding(inputGrounding);
- }
- } else {
// TODO: Handle following edge situation:
// data stream -> invocable (processor, sink) in edge deployments that do not reside on same node
// idea: trigger corresponding node controller to relay topic to adjecent broker (either node broker or
// global cloud broker)
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventGrounding(inputGrounding);
+
+ if (matchingDeploymentTargets(source, t)) {
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+
+ } else if (defaultDeploymentTarget(t)) {
+ // target runs on cloud node: use central cloud broker, e.g. kafka
+ EventGrounding eg = generateRelayGrounding(inputGrounding,false);
+ // TODO: make topic unique for target in case we have multiple source stream relays to the target
+ String oldTopic = eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+ eg.getTransportProtocol().getTopicDefinition().setActualTopicName(oldTopic + "."
+ + this.pipelineId);
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(),t))
+ .setEventGrounding(eg);
+ } else if (targetInvocableOnEdgeNode(t)) {
+ // case 2: target on other edge node -> relay + target node broker
+ // TODO: make topic unique for target in case we have multiple source stream relays to the target
+ EventGrounding eg = generateRelayGrounding(inputGrounding,t,true);
+ String oldTopic = eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+ eg.getTransportProtocol().getTopicDefinition().setActualTopicName(oldTopic + "."
+ + this.pipelineId);
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(eg);
+ } else {
+ // default case while modelling. no deployment target known
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+ }
}
- // old
-// t.getInputStreams()
-// .get(getIndex(source.getDOM(), t))
-// .setEventGrounding(inputGrounding);
t.getInputStreams()
.get(getIndex(source.getDOM(), t))
.setEventSchema(getInputSchema(source));
- String elementIdentifier = makeElementIdentifier(pipelineId, inputGrounding
- .getTransportProtocol().getTopicDefinition().getActualTopicName(), t.getName());
+ String elementIdentifier = makeElementIdentifier(pipelineId,
+ inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName(), t.getName());
t.setElementId(t.getBelongsTo() + "/" + elementIdentifier);
t.setDeploymentRunningInstanceId(elementIdentifier);
@@ -253,14 +210,71 @@ public class InvocationGraphBuilder {
});
}
- private boolean relayNotExists(String relayTopic, NamedStreamPipesEntity source) {
- return ((DataProcessorInvocation) source)
+ private void removeExistingStreamRelay(NamedStreamPipesEntity source, InvocableStreamPipesEntity t) {
+ ((DataProcessorInvocation) source).removeOutputStreamRelay(
+ ((DataProcessorInvocation) source)
+ .getOutputStreamRelays()
+ .get(getIndex(source.getDOM(), t)));
+ }
+
+ private boolean targetInvocableOnEdgeNode(InvocableStreamPipesEntity t) {
+ return t.getDeploymentTargetNodeId() != null && !t.getDeploymentTargetNodeId().equals(DEFAULT_TAG);
+ }
+
+ private boolean defaultDeploymentTarget(InvocableStreamPipesEntity t) {
+ return t.getDeploymentTargetNodeId() != null && t.getDeploymentTargetNodeId().equals(DEFAULT_TAG);
+ }
+
+ private void modifyTopicForTargetInputStream(NamedStreamPipesEntity s, InvocableStreamPipesEntity t,
+ String topic) {
+ t.getInputStreams()
+ .get(getIndex(s.getDOM(), t))
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .setActualTopicName(topic);
+ }
+
+ private void modifyTopicForEventRelay(NamedStreamPipesEntity s, InvocableStreamPipesEntity t,
+ String topic) {
+ ((DataProcessorInvocation) s)
+ .getOutputStreamRelays()
+ .get(getIndex(s.getDOM(), t))
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .setActualTopicName(topic);
+ }
+
+ private boolean deploymentTargetNotNull(NamedStreamPipesEntity s, InvocableStreamPipesEntity t) {
+ if (s instanceof SpDataStream) {
+ return ((SpDataStream) s).getDeploymentTargetNodeId() != null && t.getDeploymentTargetNodeId() != null;
+ } else {
+ return ((InvocableStreamPipesEntity) s).getDeploymentTargetNodeId() != null &&
+ t.getDeploymentTargetNodeId() != null;
+ }
+ }
+
+ private String extractTopic(EventGrounding eg) {
+ return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+ }
+
+ private boolean eventRelayExists(EventGrounding eg, NamedStreamPipesEntity s) {
+ return ((DataProcessorInvocation) s)
.getOutputStreamRelays()
.stream()
- .noneMatch(r -> r.getEventGrounding()
+ .anyMatch(r -> r.getEventGrounding()
.getTransportProtocol()
.getTopicDefinition()
- .getActualTopicName().equals(relayTopic));
+ .getActualTopicName().equals(extractTopic(eg)));
+ }
+
+ private boolean eventRelayExists(NamedStreamPipesEntity s, InvocableStreamPipesEntity t) {
+ int idx = getIndex(s.getDOM(), t);
+ return ((DataProcessorInvocation) s).getOutputStreamRelays()
+ .stream()
+ .anyMatch(i -> extractTopic(i.getEventGrounding()).equals(
+ extractTopic(t.getInputStreams().get(idx).getEventGrounding())));
}
private Tuple2<EventSchema,? extends OutputStrategy> getOutputSettings(DataProcessorInvocation dataProcessorInvocation) {
@@ -311,11 +325,17 @@ public class InvocationGraphBuilder {
}
- private boolean matchingDeploymentTarget(InvocableStreamPipesEntity source, InvocableStreamPipesEntity target) {
- if (source instanceof DataProcessorInvocation && target instanceof DataProcessorInvocation) {
- if (source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId())) {
- return true;
- }
+ private boolean matchingDeploymentTargets(NamedStreamPipesEntity s, InvocableStreamPipesEntity t) {
+ if (s instanceof DataProcessorInvocation &&
+ (t instanceof DataProcessorInvocation || t instanceof DataSinkInvocation) && deploymentTargetNotNull(s,t)) {
+ return ((DataProcessorInvocation) s)
+ .getDeploymentTargetNodeId()
+ .equals(t.getDeploymentTargetNodeId());
+ } else if (s instanceof SpDataStream &&
+ (t instanceof DataProcessorInvocation || t instanceof DataSinkInvocation) && deploymentTargetNotNull(s,t)) {
+ return ((SpDataStream) s)
+ .getDeploymentTargetNodeId()
+ .equals(t.getDeploymentTargetNodeId());
}
return false;
}
@@ -379,8 +399,83 @@ public class InvocationGraphBuilder {
.get();
}
- public static Boolean isPrioritized(SpProtocol prioritizedProtocol,
- Class<?> protocolClass) {
- return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
+ private EventGrounding generateRelayGrounding(EventGrounding sourceInvocableOutputGrounding,
+ boolean edgeToEdgeRelay) {
+ return generateRelayGrounding(sourceInvocableOutputGrounding, null, edgeToEdgeRelay);
+ }
+
+ private EventGrounding generateRelayGrounding(EventGrounding sourceInvocableOutputGrounding,
+ InvocableStreamPipesEntity target, boolean edgeToEdgeRelay) {
+ EventGrounding eg = new EventGrounding();
+ String topic = extractTopic(sourceInvocableOutputGrounding);
+ if (edgeToEdgeRelay) {
+ eg.setTransportProtocol(getTargetNodeProtocol(
+ BackendConfig.INSTANCE.getMessagingSettings().getEdgeNodeProtocol(), topic, target));
+ } else {
+ eg.setTransportProtocol(getPrioritizedGlobalProtocol(
+ BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0), topic));
+ }
+ eg.setTransportFormats(sourceInvocableOutputGrounding.getTransportFormats());
+ return eg;
+ }
+
+ private TransportProtocol getPrioritizedGlobalProtocol(SpProtocol p, String topic) {
+ if (matches(p, JmsTransportProtocol.class)) {
+ return jmsTransportProtocol(topic);
+ } else if (matches(p, KafkaTransportProtocol.class)) {
+ return kafkaTransportProtocol(topic);
+ } else if (matches(p, MqttTransportProtocol.class)){
+ return mqttTransportProtocol(topic);
+ }
+ throw new SpRuntimeException("Could not retrieve prioritized transport protocol");
+ }
+
+ private TransportProtocol getTargetNodeProtocol(SpEdgeNodeProtocol p, String topic,
+ InvocableStreamPipesEntity target) {
+ if (matches(p, MqttTransportProtocol.class)){
+ return mqttTransportProtocol(topic, target);
+ }
+ throw new SpRuntimeException("Could not retrieve prioritized transport protocol");
+ }
+
+ private JmsTransportProtocol jmsTransportProtocol(String topic) {
+ return new JmsTransportProtocol(
+ BackendConfig.INSTANCE.getJmsHost(),
+ BackendConfig.INSTANCE.getJmsPort(),
+ topic);
+ }
+
+ private KafkaTransportProtocol kafkaTransportProtocol(String topic) {
+ return new KafkaTransportProtocol(
+ BackendConfig.INSTANCE.getKafkaHost(),
+ BackendConfig.INSTANCE.getKafkaPort(),
+ topic,
+ BackendConfig.INSTANCE.getZookeeperHost(),
+ BackendConfig.INSTANCE.getZookeeperPort());
+ }
+
+ private MqttTransportProtocol mqttTransportProtocol(String topic) {
+ return mqttTransportProtocol(topic, null);
+ }
+
+ private MqttTransportProtocol mqttTransportProtocol(String topic, InvocableStreamPipesEntity target) {
+ if (target != null) {
+ return new MqttTransportProtocol(
+ getTargetNodeBrokerHost(target),
+ getTargetNodeBrokerPort(target),
+ topic);
+ }
+ return new MqttTransportProtocol(
+ BackendConfig.INSTANCE.getMqttHost(),
+ BackendConfig.INSTANCE.getMqttPort(),
+ topic);
+ }
+
+ private <T extends TransportProtocol> boolean matches(SpProtocol p, Class<T> clazz) {
+ return p.getProtocolClass().equals(clazz.getCanonicalName());
+ }
+
+ private <T extends TransportProtocol> boolean matches(SpEdgeNodeProtocol p, Class<T> clazz) {
+ return p.getProtocolClass().equals(clazz.getCanonicalName());
}
}
\ No newline at end of file
diff --git a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index 4b6f1e7..7d0c404 100644
--- a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -184,7 +184,8 @@ public class CustomAnnotationProvider implements EmpireAnnotationProvider {
DataExplorerWidgetModel.class,
StreamPipesJsonLdContainer.class,
DataLakeMeasure.class,
- SpDataStreamRelay.class
+ SpDataStreamRelay.class,
+ SpDataStreamRelayContainer.class
);
}
}
diff --git a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
index 5d2be10..6a6d059 100644
--- a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
+++ b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
@@ -69,7 +69,8 @@ public class JsonLdTransformer implements RdfTransformer {
StreamPipes.CONNECT_WORKER_CONTAINER,
StreamPipes.DASHBOARD_WIDGET_MODEL,
StreamPipes.DASHBOARD_MODEL,
- StreamPipes.DATA_EXPLORER_WIDGET_MODEL
+ StreamPipes.DATA_EXPLORER_WIDGET_MODEL,
+ StreamPipes.DATA_STREAM_RELAY_CONTAINER
);
private List<String> selectedRootElements;
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index c91b49c..d7efdfc 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -421,4 +421,6 @@ public class StreamPipes {
public static final String HAS_REQUIRED_FILETYPES = NS + "hasRequiredFiletypes" ;
public static final String HAS_EVENT_RELAY = NS + "hasEventRelay";
public static final String HAS_EVENT_RELAY_STRATEGY = NS + "hasEventRelayStrategy";
+ public static final String DATA_STREAM_RELAY_CONTAINER = NS + "DataStreamRelayContainer" ;
+ public static final String DATA_STREAM_RELAY_RUNNING_INSTANCE_ID = NS + "hasDataStreamRelayRunningInstanceId";
}
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index e87a2d9..d831e9b 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-12-12 16:11:38.
+// Generated using typescript-generator version 2.24.612 on 2020-12-19 20:40:34.
export class AbstractStreamPipesEntity {
"@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
@@ -2254,6 +2254,7 @@ export class Pipeline extends ElementComposition {
actions: DataSinkInvocation[];
createdAt: number;
createdByUser: string;
+ eventRelayStrategy: string;
pipelineCategories: string[];
publicElement: boolean;
running: boolean;
@@ -2271,6 +2272,7 @@ export class Pipeline extends ElementComposition {
instance.createdAt = data.createdAt;
instance.publicElement = data.publicElement;
instance.createdByUser = data.createdByUser;
+ instance.eventRelayStrategy = data.eventRelayStrategy;
instance.pipelineCategories = __getCopyArrayFn(__identity<string>())(data.pipelineCategories);
instance._id = data._id;
instance._rev = data._rev;
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
index 0db73ad..77588b6 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
@@ -154,52 +154,6 @@
</div>
</div>
-<!-- <mat-divider *ngIf="advancedSettings" style="margin: 1em 0 1em 0;"></mat-divider>-->
-<!-- <div *ngIf="advancedSettings">-->
-<!-- <b>Node Selection</b>-->
-<!-- <div *ngFor="let processors of pipeline.sepas">-->
-<!-- <div fxFlex="100" fxLayout="row">-->
-<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">-->
-<!-- <span>{{processors.name}}</span>-->
-<!-- </div>-->
-<!-- </div>-->
-<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">-->
-<!-- <mat-form-field appearance="outline">-->
-<!-- <mat-select [(ngModel)]="processors.deploymentTargetNodeId"-->
-<!-- [disabled]="disableNodeSelection.value"-->
-<!-- style="margin: 0;height: 20px;"-->
-<!-- placeholder="select execution node"-->
-<!-- required>-->
-<!-- <mat-option [value]="nodeInfo.nodeControllerId"-->
-<!-- *ngFor="let nodeInfo of deploymentOptions[processors.appId]">-->
-<!-- <em>{{nodeInfo.nodeMetadata.nodeAddress}}</em>-->
-<!-- </mat-option>-->
-<!-- </mat-select>-->
-<!-- </mat-form-field>-->
-<!-- </div>-->
-<!-- </div>-->
-<!-- <div *ngFor="let sinks of pipeline.actions">-->
-<!-- <div fxFlex="100" fxLayout="row">-->
-<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">-->
-<!-- <span>{{sinks.name}}</span>-->
-<!-- </div>-->
-<!-- </div>-->
-<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">-->
-<!-- <mat-form-field appearance="outline">-->
-<!-- <mat-select [(ngModel)]="sinks.deploymentTargetNodeId"-->
-<!-- [disabled]="disableNodeSelection.value"-->
-<!-- style="margin: 0; height: 20px;"-->
-<!-- placeholder="select execution node"-->
-<!-- required>-->
-<!-- <mat-option [value]="nodeInfo.nodeControllerId" *ngFor="let nodeInfo of-->
-<!-- deploymentOptions[sinks.appId]">-->
-<!-- <em>{{nodeInfo.nodeMetadata.nodeAddress}}</em>-->
-<!-- </mat-option>-->
-<!-- </mat-select>-->
-<!-- </mat-form-field>-->
-<!-- </div>-->
-<!-- </div>-->
-<!-- </div>-->
</div>
<div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="saving">
<mat-spinner [mode]="'indeterminate'" [diameter]="50"></mat-spinner>
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index f782519..6d5a748 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -240,12 +240,14 @@ export class SavePipelineComponent implements OnInit {
if (this.currentModifiedPipelineId && this.updateMode === 'update') {
this.modifyPipelineElementsDeployments(this.tmpPipeline.sepas)
this.modifyPipelineElementsDeployments(this.tmpPipeline.actions)
+ this.tmpPipeline.eventRelayStrategy = this.selectedRelayStrategyVal;
this.pipeline = this.tmpPipeline;
storageRequest = this.pipelineService.updatePipeline(this.pipeline);
} else {
this.pipeline._id = undefined;
this.modifyPipelineElementsDeployments(this.tmpPipeline.sepas)
this.modifyPipelineElementsDeployments(this.tmpPipeline.actions)
+ this.tmpPipeline.eventRelayStrategy = this.selectedRelayStrategyVal;
this.pipeline = this.tmpPipeline;
storageRequest = this.pipelineService.storePipeline(this.pipeline);
}