You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/04 00:43:06 UTC
[incubator-streampipes] 01/02: [WIP] refactoring pipeline element
migration and node controller sync on rejoin
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit afacf8916c4433f233a34b883dd6dc5209c4eddb
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Mar 4 01:40:02 2021 +0100
[WIP] refactoring pipeline element migration and node controller sync on rejoin
---
.../model/{ => eventrelay}/SpDataStreamRelay.java | 2 +-
.../SpDataStreamRelayContainer.java | 58 ++-
.../model/eventrelay}/metrics/RelayMetrics.java | 2 +-
.../model/graph/DataProcessorDescription.java | 2 +-
.../model/graph/DataProcessorInvocation.java | 2 +-
.../model/message/NotificationType.java | 5 +-
.../pipeline/PipelineElementMigrationEntity.java | 51 ++
.../org/apache/streampipes/model/util/Cloner.java | 2 +-
streampipes-node-controller-container/pom.xml | 4 +
...yResource.java => DataStreamRelayResource.java} | 8 +-
.../container/api/InvocableEntityResource.java | 55 +--
.../api/NodeControllerResourceConfig.java | 4 +-
...ource.java => NodeInfoDescriptionResource.java} | 4 +-
.../container/management/node/NodeManager.java | 5 +-
.../management/pe/InvocableElementManager.java | 111 +++--
.../management/pe/PipelineElementLifeCycle.java | 3 +-
.../management/relay/DataStreamRelayManager.java | 108 ++---
.../container/management/relay/EventRelay.java | 6 +-
.../relay/bridges/MultiBrokerBridge.java | 3 +-
.../manager/execution/http/ElementSubmitter.java | 162 +++++++
.../manager/execution/http/GraphSubmitter.java | 208 ++++-----
.../manager/execution/http/HttpRequestBuilder.java | 9 +-
.../manager/execution/http/MigrationHelpers.java | 75 ---
.../manager/execution/http/PipelineExecutor.java | 513 ---------------------
.../http/StreamRelayEndpointUrlGenerator.java | 2 +-
.../pipeline/AbstractPipelineExecutor.java | 458 ++++++++++++++++++
.../execution/pipeline/PipelineExecutor.java | 110 +++++
.../pipeline/PipelineMigrationExecutor.java | 349 ++++++++++++++
.../pipeline/PipelineMigrationHelpers.java | 15 +-
.../{http => pipeline}/PipelineStorageService.java | 2 +-
.../execution/pipeline/migration/Command.java | 17 +-
.../migration/PipelineElementStartCommand.java | 21 +-
.../migration/PipelineElementStopCommand.java | 20 +-
.../pipeline/migration/RelayStartCommand.java | 20 +-
.../pipeline/migration/RelayStopCommand.java | 20 +-
.../manager/matching/InvocationGraphBuilder.java | 2 +-
.../migration/PipelineElementMigrationHandler.java | 147 ++++++
.../manager/node/AbstractClusterManager.java | 112 +++--
.../manager/node/AvailableNodesFetcher.java | 1 +
.../manager/node/NodeClusterManager.java | 112 ++++-
.../streampipes/manager/node/NodeSyncOptions.java | 16 +-
.../streampipes/manager/operations/Operations.java | 51 +-
.../org/apache/streampipes/rest/impl/Node.java | 6 +-
.../streampipes/rest/impl/PipelineResource.java | 9 +-
.../serializers/json/GsonSerializer.java | 2 +
.../jsonld/CustomAnnotationProvider.java | 2 +
.../serializers/jsonld/JsonLdTransformer.java | 1 +
.../streampipes/storage/api/INoSqlStorage.java | 2 +
.../storage/api/INodeDataStreamRelay.java | 22 +-
.../storage/couchdb/CouchDbStorageManager.java | 5 +-
.../couchdb/impl/NodeDataStreamRelayImpl.java | 76 +++
.../streampipes/storage/couchdb/utils/Utils.java | 6 +
ui/src/app/core-model/gen/streampipes-model.ts | 14 +-
.../pipeline-status-dialog.component.ts | 12 +-
54 files changed, 1910 insertions(+), 1124 deletions(-)
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
similarity index 98%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
index addbd55..0078b18 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.model;
+package org.apache.streampipes.model.eventrelay;
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
similarity index 66%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
index cb1cd40..9db3117 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
@@ -15,12 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.model;
+package org.apache.streampipes.model.eventrelay;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.vocabulary.StreamPipes;
@@ -39,6 +43,13 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
private static final long serialVersionUID = -4675162465357705480L;
private static final String prefix = "urn:apache.org:relaystreamcontainer:";
+ private static final String RELAY_SUFFIX = "(Stream Relay)";
+
+ @JsonProperty("_id")
+ private @SerializedName("_id") String couchDbId;
+
+ @JsonProperty("_rev")
+ private @SerializedName("_rev") String couchDbRev;
@OneToOne(fetch = FetchType.EAGER,
cascade = {CascadeType.PERSIST, CascadeType.MERGE})
@@ -77,6 +88,31 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
this.eventRelayStrategy = eventRelayStrategy;
}
+ public SpDataStreamRelayContainer(DataProcessorInvocation desc) {
+ super(desc.getElementId());
+ this.setName(makeRelayName(desc.getName()));
+ this.setEventRelayStrategy(desc.getEventRelayStrategy());
+ this.setRunningStreamRelayInstanceId(desc.getDeploymentRunningInstanceId());
+ this.setInputGrounding(new EventGrounding(desc.getOutputStream().getEventGrounding()));
+ this.setOutputStreamRelays(desc.getOutputStreamRelays());
+ this.setDeploymentTargetNodeHostname(desc.getDeploymentTargetNodeHostname());
+ this.setDeploymentTargetNodePort(desc.getDeploymentTargetNodePort());
+ this.setDeploymentTargetNodeId(desc.getDeploymentTargetNodeId());
+ }
+
+ public SpDataStreamRelayContainer(String runningStreamRelayInstanceId, String eventRelayStrategy, SpDataStream desc,
+ List<SpDataStreamRelay> dataStreamRelays) {
+ super(desc.getElementId());
+ this.setRunningStreamRelayInstanceId(runningStreamRelayInstanceId);
+ this.setEventRelayStrategy(eventRelayStrategy);
+ this.setName(makeRelayName(desc.getName()));
+ this.setInputGrounding(new EventGrounding(desc.getEventGrounding()));
+ this.setDeploymentTargetNodeId(desc.getDeploymentTargetNodeId());
+ this.setDeploymentTargetNodeHostname(desc.getDeploymentTargetNodeHostname());
+ this.setDeploymentTargetNodePort(desc.getDeploymentTargetNodePort());
+ this.setOutputStreamRelays(dataStreamRelays);
+ }
+
public SpDataStreamRelayContainer(NamedStreamPipesEntity other) {
super(other);
}
@@ -136,4 +172,24 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
public void setDeploymentTargetNodePort(Integer deploymentTargetNodePort) {
this.deploymentTargetNodePort = deploymentTargetNodePort;
}
+
+ private String makeRelayName(String name) {
+ return name + " " + RELAY_SUFFIX;
+ }
+
+ public String getCouchDbId() {
+ return couchDbId;
+ }
+
+ public void setCouchDbId(String couchDbId) {
+ this.couchDbId = couchDbId;
+ }
+
+ public String getCouchDbRev() {
+ return couchDbRev;
+ }
+
+ public void setCouchDbRev(String couchDbRev) {
+ this.couchDbRev = couchDbRev;
+ }
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
index 6a019ab..e70667a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.management.relay.metrics;
+package org.apache.streampipes.model.eventrelay.metrics;
import org.apache.streampipes.model.grounding.TransportProtocol;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
index ac43c71..cee32d4 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.model.graph;
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.staticproperty.StaticProperty;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 5c3236b..1e7a4d6 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.model.graph;
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.staticproperty.StaticProperty;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index 4a1c81e..d7bf388 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -68,7 +68,10 @@ public enum NotificationType {
INSTALLATION_SUCCESSFUL("Installation successful", ""),
PROPERTY_FILE_WRITTEN("Writing properties file...", ""),
- ADMIN_USER_CREATED("Creating admin user...", "");
+ ADMIN_USER_CREATED("Creating admin user...", ""),
+
+ NODE_JOIN_SUCCESS("Success", "Node successfully joined"),
+ NODE_JOIN_ERROR("Error", "Node could not be joined");
private final String title;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
new file mode 100644
index 0000000..1973d6e
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.pipeline;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+public class PipelineElementMigrationEntity {
+
+ private InvocableStreamPipesEntity sourceElement;
+ private InvocableStreamPipesEntity targetElement;
+
+ public PipelineElementMigrationEntity() {
+ }
+
+ public PipelineElementMigrationEntity(InvocableStreamPipesEntity sourceElement,
+ InvocableStreamPipesEntity targetElement) {
+ this.sourceElement = sourceElement;
+ this.targetElement = targetElement;
+ }
+
+ public InvocableStreamPipesEntity getSourceElement() {
+ return sourceElement;
+ }
+
+ public void setSourceElement(InvocableStreamPipesEntity sourceElement) {
+ this.sourceElement = sourceElement;
+ }
+
+ public InvocableStreamPipesEntity getTargetElement() {
+ return targetElement;
+ }
+
+ public void setTargetElement(InvocableStreamPipesEntity targetElement) {
+ this.targetElement = targetElement;
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index 2ac64de..dc92659 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.model.util;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.output.*;
import org.apache.streampipes.model.resource.Hardware;
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index 4076064..d5e0f49 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -131,6 +131,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jaxb-annotations</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
similarity index 82%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
index 00bb34a..0883a68 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.node.controller.container.api;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -25,7 +25,7 @@ import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@Path("/api/v2/node/stream/relay")
-public class AdapterDataStreamRelayResource extends AbstractResource {
+public class DataStreamRelayResource extends AbstractResource {
@POST
@JacksonSerialized
@@ -33,7 +33,7 @@ public class AdapterDataStreamRelayResource extends AbstractResource {
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public javax.ws.rs.core.Response invoke(SpDataStreamRelayContainer graph) {
- return ok(DataStreamRelayManager.getInstance().startAdapterDataStreamRelay(graph));
+ return ok(DataStreamRelayManager.getInstance().start(graph));
}
@DELETE
@@ -41,6 +41,6 @@ public class AdapterDataStreamRelayResource extends AbstractResource {
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
- return ok(DataStreamRelayManager.getInstance().stopAdapterDataStreamRelay(runningInstanceId));
+ return ok(DataStreamRelayManager.getInstance().stop(runningInstanceId));
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 57f8c29..6028438 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -21,12 +21,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
-import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
@@ -34,12 +31,6 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
-import java.lang.annotation.Annotation;
-import java.net.URI;
-import java.util.Date;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
@Path("/api/v2/node/element")
public class InvocableEntityResource extends AbstractResource {
@@ -62,26 +53,22 @@ public class InvocableEntityResource extends AbstractResource {
@Produces(MediaType.APPLICATION_JSON)
public javax.ws.rs.core.Response invoke(@PathParam("identifier") String identifier,
@PathParam("elementId") String elementId, InvocableStreamPipesEntity graph) {
- String endpoint;
if (identifier.equals(DATA_PROCESSOR_PREFIX)) {
- endpoint = graph.getBelongsTo();
- DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
- Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
- if (resp.isSuccess()) {
+ Response elementResponse = InvocableElementManager.getInstance().invoke(graph);
+ if (elementResponse.isSuccess()) {
RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
}
- return ok(resp);
+ return ok(elementResponse);
}
// Currently no data sinks are registered at node controller. If we, at some point, want to also run data
// sinks on edge nodes we need to register there Declarer at the node controller one startup.
else if (identifier.equals(DATA_SINK_PREFIX)) {
- endpoint = graph.getBelongsTo();
- Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
- if (resp.isSuccess()) {
+ Response elementResponse = InvocableElementManager.getInstance().invoke(graph);
+ if (elementResponse.isSuccess()) {
RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
}
- return ok(resp);
+ return ok(elementResponse);
}
return ok();
@@ -96,37 +83,7 @@ public class InvocableEntityResource extends AbstractResource {
String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
- DataStreamRelayManager.getInstance().stopPipelineElementDataStreamRelay(runningInstanceId);
return ok(resp);
}
-
-
- @DELETE
- @Path("{identifier}/{elementId}/{runningInstanceId}/relay")
- @Produces(MediaType.APPLICATION_JSON)
- public Response detachRelay(@PathParam("identifier") String identifier,
- @PathParam("elementId") String elementId,
- @PathParam("runningInstanceId") String runningInstanceId) {
- return DataStreamRelayManager.getInstance().stopDataStreamRelay(runningInstanceId);
- }
-
- @POST
- @Path("{identifier}/{elementId}/{runningInstanceId}/relay")
- @Produces(MediaType.APPLICATION_JSON)
- public Response invokeRelay(@PathParam("identifier") String identifier,
- @PathParam("elementId") String elementId,
- @PathParam("runningInstanceId") String runningInstanceId,
- SpDataStreamRelayContainer relay) {
- return DataStreamRelayManager.getInstance().startDataStreamRelay(relay, runningInstanceId);
- }
-
- private String toJson(InvocableStreamPipesEntity graph) {
- try {
- return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- throw new SpRuntimeException("Could not serialize object: " + graph);
- }
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 5fcd3dc..ac277e8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -25,9 +25,9 @@ public class NodeControllerResourceConfig extends ResourceConfig {
public NodeControllerResourceConfig() {
register(HealthCheckResource.class);
- register(InfoStatusResource.class);
+ register(NodeInfoDescriptionResource.class);
register(InvocableEntityResource.class);
- register(AdapterDataStreamRelayResource.class);
+ register(DataStreamRelayResource.class);
register(ConnectResource.class);
register(ContainerResource.class);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
similarity index 97%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
index 7e7b36a..6bbe2f5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
@@ -28,7 +28,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/api/v2/node/info")
-public class InfoStatusResource extends AbstractResource {
+public class NodeInfoDescriptionResource extends AbstractResource {
private static final String ACTIVATE = "activate";
private static final String DEACTIVATE = "deactivate";
@@ -70,6 +70,6 @@ public class InfoStatusResource extends AbstractResource {
@Path("/relays")
@Produces(MediaType.APPLICATION_JSON)
public Response getAllRelays() {
- return ok(DataStreamRelayManager.getInstance().getAllRelays());
+ return ok(DataStreamRelayManager.getInstance().getAllRelaysMetrics());
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index d9ec40d..0f01751 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -127,13 +127,13 @@ public class NodeManager {
}
public Message activate() {
- LOG.info("Deactivate node controller");
+ LOG.info("Activate node controller");
this.nodeInfo.setActive(true);
return Notifications.success(NotificationType.OPERATION_SUCCESS);
}
public Message deactivate() {
- LOG.info("Activate node controller");
+ LOG.info("Deactivate node controller");
this.nodeInfo.setActive(false);
return Notifications.success(NotificationType.OPERATION_SUCCESS);
}
@@ -151,6 +151,7 @@ public class NodeManager {
SuccessMessage message = JacksonSerializer
.getObjectMapper()
.readValue(resp, SuccessMessage.class);
+ LOG.info(message.getNotifications().get(0).getDescription());
return message.isSuccess();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 4db36b3..9891ee6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -17,12 +17,16 @@
*/
package org.apache.streampipes.node.controller.container.management.pe;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.node.NodeManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -31,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
public class InvocableElementManager implements PipelineElementLifeCycle {
@@ -58,50 +63,19 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
@Override
public void register(InvocableRegistration registration) {
- try {
- Request.Put(makeConsulRegistrationEndpoint())
- .addHeader("accept", "application/json")
- .body(new StringEntity(JacksonSerializer
- .getObjectMapper()
- .writeValueAsString(registration.getConsulServiceRegistrationBody())))
- .execute();
-
- // TODO: persistent storage to survive failures
- NodeManager.getInstance()
- .retrieveNodeInfoDescription()
- .setSupportedElements(registration.getSupportedPipelineElementAppIds());
-
- String url = "http://"
- + NodeControllerConfig.INSTANCE.getBackendHost()
- + ":"
- + NodeControllerConfig.INSTANCE.getBackendPort()
- + "/"
- + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
- + "/"
- + NodeControllerConfig.INSTANCE.getNodeControllerId();
-
- String desc = JacksonSerializer.getObjectMapper()
- .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
-
- Request.Put(url)
- .bodyString(desc, ContentType.APPLICATION_JSON)
-// .connectTimeout(1000)
-// .socketTimeout(100000)
- .execute();
-
- LOG.info("Successfully registered pipeline element container");
- } catch (IOException e) {
- e.printStackTrace();
- }
+ registerAtConsul(registration);
+ updateAndSyncNodeInfoDescription(registration);
+ LOG.info("Successfully registered pipeline element container");
}
@Override
- public Response invoke(String endpoint, String payload) {
+ public Response invoke(InvocableStreamPipesEntity graph) {
+ String endpoint = graph.getBelongsTo();
LOG.info("Invoke pipeline element: {}", endpoint);
try {
org.apache.http.client.fluent.Response httpResp = Request
.Post(endpoint)
- .bodyString(payload, ContentType.APPLICATION_JSON)
+ .bodyString(toJson(graph), ContentType.APPLICATION_JSON)
.connectTimeout(CONNECT_TIMEOUT)
.execute();
return handleResponse(httpResp);
@@ -129,29 +103,58 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
@Override
public void unregister(){
// TODO: unregister element from Consul and
+ setSupportedPipelineElements(Collections.emptyList());
+ try {
+ String url = generateBackendEndpoint();
+ String desc = toJson(getNodeInfoDescription());
+ Request.Put(url)
+ .bodyString(desc, ContentType.APPLICATION_JSON)
+ .execute();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
+ setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
+ try {
+ String url = generateBackendEndpoint();
+ String desc = toJson(getNodeInfoDescription());
+ Request.Put(url)
+ .bodyString(desc, ContentType.APPLICATION_JSON)
+ .execute();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private NodeInfoDescription getNodeInfoDescription() {
+ return NodeManager.getInstance().retrieveNodeInfoDescription();
+ }
+
+ private void setSupportedPipelineElements(List<String> supportedPipelineElements) {
NodeManager.getInstance()
.retrieveNodeInfoDescription()
- .setSupportedElements(Collections.emptyList());
+ .setSupportedElements(supportedPipelineElements);
+ }
- String url = "http://"
+ private String generateBackendEndpoint() {
+ return HTTP_PROTOCOL
+ NodeControllerConfig.INSTANCE.getBackendHost()
- + ":"
+ + COLON
+ NodeControllerConfig.INSTANCE.getBackendPort()
- + "/"
+ + SLASH
+ "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
- + "/"
+ + SLASH
+ NodeControllerConfig.INSTANCE.getNodeControllerId();
+ }
+ private void registerAtConsul(InvocableRegistration registration) {
try {
- String desc = JacksonSerializer.getObjectMapper()
- .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
-
- Request.Put(url)
- .bodyString(desc, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
+ Request.Put(makeConsulRegistrationEndpoint())
+ .addHeader("accept", "application/json")
+ .body(new StringEntity(toJson(registration.getConsulServiceRegistrationBody())))
.execute();
-
} catch (IOException e) {
e.printStackTrace();
}
@@ -183,4 +186,12 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
}
}
+ private <T> String toJson(T element) {
+ try {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(element);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException("Could not serialize object: " + element, e);
+ }
+ }
+
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 0ad32ee..107c717 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -19,12 +19,13 @@ package org.apache.streampipes.node.controller.container.management.pe;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
public interface PipelineElementLifeCycle {
void register(InvocableRegistration registration);
- Response invoke(String endpoint, String payload);
+ Response invoke(InvocableStreamPipesEntity graph);
Response detach(String runningInstanceId);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
index 638e9bc..0fbe538 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
@@ -17,16 +17,17 @@
*/
package org.apache.streampipes.node.controller.container.management.relay;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStreamRelay;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+
+import java.util.*;
import java.util.stream.Collectors;
public class DataStreamRelayManager {
@@ -45,80 +46,65 @@ public class DataStreamRelayManager {
return instance;
}
- public Response startAdapterDataStreamRelay(SpDataStreamRelayContainer desc) {
- String strategy = desc.getEventRelayStrategy();
- String runningInstanceId = desc.getRunningStreamRelayInstanceId();
- TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
-
- Map<String, EventRelay> eventRelayMap = new HashMap<>();
-
- desc.getOutputStreamRelays().forEach(r -> {
- TransportProtocol target = r.getEventGrounding().getTransportProtocol();
- EventRelay eventRelay = new EventRelay(source, target, strategy);
- eventRelay.start();
- eventRelayMap.put(r.getElementId(), eventRelay);
- });
- RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
- return new Response(runningInstanceId,true,"");
- }
-
- public Response stopAdapterDataStreamRelay(String id) {
- return stopDataStreamRelay(id);
+ public Response start(InvocableStreamPipesEntity graph) {
+ return start(convert(graph));
}
-
- public Response startDataStreamRelay(SpDataStreamRelayContainer desc, String id) {
+ public Response start(SpDataStreamRelayContainer desc) {
String strategy = desc.getEventRelayStrategy();
+ String id = desc.getRunningStreamRelayInstanceId();
TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
Map<String, EventRelay> eventRelayMap = new HashMap<>();
- desc.getOutputStreamRelays().forEach(r -> {
- TransportProtocol target = r.getEventGrounding().getTransportProtocol();
- EventRelay eventRelay = new EventRelay(source, target, strategy);
- eventRelay.start();
- eventRelayMap.put(r.getElementId(), eventRelay);
+ // start data stream relay
+ // 1:1 mapping -> remote forward
+ // 1:n mamping -> remote fan-out
+ desc.getOutputStreamRelays().forEach(relay -> {
+ // add new relay if not running
+ if(!runningRelayExists(id, relay)) {
+ TransportProtocol target = relay.getEventGrounding().getTransportProtocol();
+ EventRelay eventRelay = new EventRelay(source, target, strategy);
+ eventRelay.start();
+ eventRelayMap.put(relay.getElementId(), eventRelay);
+ }
});
- RunningRelayInstances.INSTANCE.add(id, eventRelayMap);
+ RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
return new Response(id,true,"");
}
- public Response stopDataStreamRelay(String id) {
- Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
- if (relay != null) {
- relay.values().forEach(EventRelay::stop);
+ public Response stop(String id) {
+ Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
+ if (relays != null) {
+ relays.values().forEach(EventRelay::stop);
}
RunningRelayInstances.INSTANCE.remove(id);
return new Response(id, true, "");
}
- public void startPipelineElementDataStreamRelay(DataProcessorInvocation graph) {
- TransportProtocol source = graph
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol();
-
- String strategy = graph.getEventRelayStrategy();
- Map<String, EventRelay> eventRelayMap = new HashMap<>();
-
- List<SpDataStreamRelay> dataStreamRelays = graph.getOutputStreamRelays();
- dataStreamRelays.forEach(r -> {
- TransportProtocol target = r.getEventGrounding().getTransportProtocol();
- EventRelay eventRelay = new EventRelay(source, target, strategy);
- eventRelay.start();
- eventRelayMap.put(r.getElementId(), eventRelay);
- });
- RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
- }
-
- public void stopPipelineElementDataStreamRelay(String id) {
- stopDataStreamRelay(id);
- }
-
- public List<RelayMetrics> getAllRelays() {
+ public List<RelayMetrics> getAllRelaysMetrics() {
return RunningRelayInstances.INSTANCE.getRunningInstances()
.stream()
.map(EventRelay::getRelayMetrics)
.collect(Collectors.toList());
}
+
+ private boolean runningRelayExists(String id, SpDataStreamRelay relay) {
+ Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
+ if (relays != null) {
+ return relays.keySet().stream()
+ .anyMatch(eventRelay -> eventRelay.equals(relay.getElementId()));
+ } else {
+ return false;
+ }
+ }
+
+ // Helpers
+
+ private SpDataStreamRelayContainer convert(InvocableStreamPipesEntity graph) {
+ if (graph instanceof DataProcessorInvocation) {
+ return new SpDataStreamRelayContainer((DataProcessorInvocation) graph);
+ }
+ throw new SpRuntimeException("Could not convert pipeline element description to data stream relay");
+ }
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
index 7c52867..f22a815 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
@@ -19,7 +19,7 @@ package org.apache.streampipes.node.controller.container.management.relay;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.grounding.*;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
public class EventRelay extends BaseEventRelay {
@@ -29,11 +29,11 @@ public class EventRelay extends BaseEventRelay {
super(source, target, DEFAULT_EVENT_RELAY_STRATEGY);
}
- public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy) {
+ public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy) {
super(source, target, relayStrategy);
}
- public void start() throws SpRuntimeException{
+ public void start() throws SpRuntimeException {
this.multiBrokerBridge.start();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
index cc7c0c5..09b9057 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
@@ -21,10 +21,9 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.EventRelay;
-import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
import org.apache.streampipes.sdk.helpers.Tuple3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
new file mode 100644
index 0000000..57715b9
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public abstract class ElementSubmitter {
+ protected final static Logger LOG = LoggerFactory.getLogger(ElementSubmitter.class);
+
+ protected final String pipelineId;
+ protected final String pipelineName;
+ protected final List<InvocableStreamPipesEntity> graphs;
+ protected final List<SpDataSet> dataSets;
+ protected final List<SpDataStreamRelayContainer> relays;
+
+ public ElementSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
+ List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> relays) {
+ this.pipelineId = pipelineId;
+ this.pipelineName = pipelineName;
+ this.graphs = graphs;
+ this.dataSets = dataSets;
+ this.relays = relays;
+ }
+
+ protected abstract PipelineOperationStatus invokePipelineElementsAndRelays();
+
+ protected abstract PipelineOperationStatus detachPipelineElementsAndRelays();
+
+ protected abstract PipelineOperationStatus invokeRelaysOnMigration();
+
+ protected abstract PipelineOperationStatus detachRelaysOnMigration();
+
+
+ protected void invokePipelineElements(PipelineOperationStatus status) {
+ graphs.forEach(graph ->
+ status.addPipelineElementStatus(makeInvokeHttpRequest(new InvocableEntityUrlGenerator(graph), graph)));
+ }
+
+ protected void detachPipelineElements(PipelineOperationStatus status) {
+ graphs.forEach(graph ->
+ status.addPipelineElementStatus(makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph)));
+ }
+
+ protected void invokeDataSets(PipelineOperationStatus status) {
+ dataSets.forEach(dataset ->
+ status.addPipelineElementStatus(makeInvokeHttpRequest(new DataSetEntityUrlGenerator(dataset), dataset)));
+ }
+
+ protected void detachDataSets(PipelineOperationStatus status) {
+ dataSets.forEach(dataset ->
+ status.addPipelineElementStatus(makeDetachHttpRequest(new DataSetEntityUrlGenerator(dataset), dataset)));
+ }
+
+ protected void invokeRelays(PipelineOperationStatus status) {
+ relays.forEach(relay ->
+ status.addPipelineElementStatus(makeInvokeHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay)));
+ }
+
+ protected void detachRelays(PipelineOperationStatus status) {
+ relays.forEach(relay ->
+ status.addPipelineElementStatus(makeDetachHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay)));
+ }
+
+
+ protected PipelineElementStatus makeInvokeHttpRequest(EndpointUrlGenerator<?> urlGenerator,
+ NamedStreamPipesEntity entity) {
+ if (entity instanceof SpDataStreamRelay) {
+ // data stream relays
+ return new HttpRequestBuilder(entity, urlGenerator.generateRelayEndpoint()).invoke();
+ } else {
+ // data sets, data processors, data sinks
+ return new HttpRequestBuilder(entity, urlGenerator.generateInvokeEndpoint()).invoke();
+ }
+ }
+
+ protected PipelineElementStatus makeDetachHttpRequest(EndpointUrlGenerator<?> urlGenerator,
+ NamedStreamPipesEntity entity) {
+ if (entity instanceof SpDataStreamRelay) {
+ // data stream relays
+ return new HttpRequestBuilder(entity, urlGenerator.generateRelayEndpoint()).detach();
+ } else {
+ // data sets, data processors, data sinks
+ return new HttpRequestBuilder(entity, urlGenerator.generateDetachEndpoint()).detach();
+ }
+ }
+
+ protected PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
+ String errorMessage, boolean rollbackIfFailed) {
+ status.setSuccess(status.getElementStatus().stream()
+ .allMatch(PipelineElementStatus::isSuccess));
+
+ if (status.isSuccess()) {
+ status.setTitle(successMessage);
+ } else {
+ if (rollbackIfFailed) {
+ LOG.info("Could not start pipeline, initializing rollback...");
+ rollbackInvokedEntities(status);
+ }
+ status.setTitle(errorMessage);
+ }
+ return status;
+ }
+
+ private void rollbackInvokedEntities(PipelineOperationStatus status) {
+ for (PipelineElementStatus s : status.getElementStatus()) {
+ if (s.isSuccess()) {
+ Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
+ graphs.ifPresent(graph -> {
+ LOG.info("Rolling back element " + graph.getElementId());
+ makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph);
+ });
+ }
+ }
+ }
+
+ // Helper methods
+
+ protected PipelineOperationStatus initPipelineOperationStatus() {
+ PipelineOperationStatus status = new PipelineOperationStatus();
+ status.setPipelineId(pipelineId);
+ status.setPipelineName(pipelineName);
+ return status;
+ }
+
+ private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
+ return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
+ }
+
+ protected boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
+ return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
+ }
+
+ protected boolean relaysExist() {
+ return relays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0);
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index ab14532..726d1a6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -18,88 +18,57 @@
package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-public class GraphSubmitter {
+public class GraphSubmitter extends ElementSubmitter {
- private final static Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class);
-
- private final List<InvocableStreamPipesEntity> graphs;
- private final List<SpDataSet> dataSets;
- private final String pipelineId;
- private final String pipelineName;
- private final List<SpDataStreamRelayContainer> streamRelays;
-
- public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
- List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> streamRelays) {
- this.graphs = graphs;
- this.pipelineId = pipelineId;
- this.pipelineName = pipelineName;
- this.dataSets = dataSets;
- this.streamRelays = streamRelays;
+ public GraphSubmitter(String pipelineId, String pipelineName) {
+ super(pipelineId, pipelineName, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
}
-
- public PipelineOperationStatus invokeRelays(Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays) {
- PipelineOperationStatus status = initPipelineOperationStatus();
-
- relays.entrySet().forEach(e -> {
- if(e.getValue().getOutputStreamRelays().size() > 0){
- if(e.getKey() instanceof DataProcessorInvocation)
- status.addPipelineElementStatus(makeHttpRequest(new InvocableEntityUrlGenerator((DataProcessorInvocation) e.getKey()), e.getValue(), "invokeRelay"));
- else
- status.addPipelineElementStatus(makeHttpRequest(new StreamRelayEndpointUrlGenerator(e.getValue()), e.getValue(), "invoke"));
- }
- });
-
- return verifyPipelineOperationStatus(
- status,
- "Successfully started relays in pipeline " + pipelineName,
- "Could not start relays in pipeline" + pipelineName,
- true);
+ public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs) {
+ super(pipelineId, pipelineName, graphs, new ArrayList<>(), new ArrayList<>());
}
- public PipelineOperationStatus detachRelays(Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays) {
- PipelineOperationStatus status = initPipelineOperationStatus();
-
- relays.entrySet().forEach(e -> {
- if(e.getValue().getOutputStreamRelays().size() > 0){
- if(e.getKey() instanceof DataProcessorInvocation)
- status.addPipelineElementStatus(makeHttpRequest(new InvocableEntityUrlGenerator((DataProcessorInvocation) e.getKey()), e.getValue(), "detachRelay"));
- else
- status.addPipelineElementStatus(makeHttpRequest(new StreamRelayEndpointUrlGenerator(e.getValue()), e.getValue(), "detach"));
- }
- });
-
- return verifyPipelineOperationStatus(
- status,
- "Successfully stopped relays in pipeline " + pipelineName,
- "Could not stop all relays in pipeline " + pipelineName,
- false);
+ public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
+ List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> relays) {
+ super(pipelineId, pipelineName, graphs, dataSets, relays);
}
- public PipelineOperationStatus invokeGraphs() {
+ /**
+ * Called when pipeline is started. This invokes all pipeline elements including relays between adjacent pipeline
+ * element nodes in the pipeline graph. Thereby, we do this along the following procedure:
+ *
+ * - start relays (if any)
+ * - start pipeline elements
+ * - start data sets (only if pipeline elements and relays started)
+ *
+ * We verify the status of this procedure and trigger a graceful rollback in case of any failure. In any case, we
+ * return an appropriate success/error message.
+ *
+ * @return PipelineOperationStatus
+ */
+ @Override
+ public PipelineOperationStatus invokePipelineElementsAndRelays() {
PipelineOperationStatus status = initPipelineOperationStatus();
- if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
- streamRelays.forEach(streamRelay -> invoke(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+ if (relaysExist()) {
+ invokeRelays(status);
}
- graphs.forEach(graph -> invoke(new InvocableEntityUrlGenerator(graph), graph, status));
- // only invoke datasets when following pipeline elements are started
+
+ invokePipelineElements(status);
+
if (allInvocableEntitiesRunning(status)) {
- dataSets.forEach(dataset -> invoke(new DataSetEntityUrlGenerator(dataset), dataset, status));
+ invokeDataSets(status);
}
return verifyPipelineOperationStatus(
@@ -109,13 +78,27 @@ public class GraphSubmitter {
true);
}
- public PipelineOperationStatus detachGraphs() {
+ /**
+ * Called when pipeline is stopped. This detaches all pipeline elements including relays between adjacent pipeline
+ * element nodes in the pipeline graph. Thereby, we do this along the following procedure:
+ *
+ * - stop pipeline elements
+ * - stop data sets
+ * - stop relays (if any)
+ *
+ * We verify the status of this procedure and return the success/error message.
+ *
+ * @return PipelineOperationStatus
+ */
+ @Override
+ public PipelineOperationStatus detachPipelineElementsAndRelays() {
PipelineOperationStatus status = initPipelineOperationStatus();
- graphs.forEach(graph -> detach(new InvocableEntityUrlGenerator(graph), graph, status));
- dataSets.forEach(dataset -> detach(new DataSetEntityUrlGenerator(dataset), dataset, status));
- if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
- streamRelays.forEach(streamRelay -> detach(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+ detachPipelineElements(status);
+ detachDataSets(status);
+
+ if (relaysExist()) {
+ detachRelays(status);
}
return verifyPipelineOperationStatus(
@@ -125,74 +108,41 @@ public class GraphSubmitter {
false);
}
- private PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
- String errorMessage, boolean rollbackIfFailed) {
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
- if (status.isSuccess()) {
- status.setTitle(successMessage);
- } else {
- if (rollbackIfFailed) {
- LOG.info("Could not start pipeline, initializing rollback...");
- rollbackInvokedEntities(status);
- }
- status.setTitle(errorMessage);
- }
- return status;
- }
-
- private void rollbackInvokedEntities(PipelineOperationStatus status) {
- for (PipelineElementStatus s : status.getElementStatus()) {
- if (s.isSuccess()) {
- Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
- graphs.ifPresent(graph -> {
- LOG.info("Rolling back element " + graph.getElementId());
- makeHttpRequest(new InvocableEntityUrlGenerator(graph), graph, "detach");
- });
- }
- }
- }
+ /**
+ * Called when pipeline elements are migrated and new relays need to be invoked between adjacent pipeline elements
+ *
+ * @return PipelineOperationStatus
+ */
+ @Override
+ public PipelineOperationStatus invokeRelaysOnMigration() {
+ PipelineOperationStatus status = initPipelineOperationStatus();
- private void invoke(EndpointUrlGenerator<?> urlGenerator,
- NamedStreamPipesEntity namedEntity, PipelineOperationStatus status) {
- status.addPipelineElementStatus(makeHttpRequest(urlGenerator, namedEntity, "invoke"));
- }
+ invokeRelays(status);
- private void detach(EndpointUrlGenerator<?> urlGenerator,
- NamedStreamPipesEntity namedEntity, PipelineOperationStatus status) {
- status.addPipelineElementStatus(makeHttpRequest(urlGenerator, namedEntity, "detach"));
+ return verifyPipelineOperationStatus(
+ status,
+ "Successfully started relays in pipeline " + pipelineName,
+ "Could not start relays in pipeline" + pipelineName,
+ true);
}
- // Helper methods
-
- private PipelineOperationStatus initPipelineOperationStatus() {
- PipelineOperationStatus status = new PipelineOperationStatus();
- status.setPipelineId(pipelineId);
- status.setPipelineName(pipelineName);
- return status;
- }
+ /**
+ * Called when pipeline elements are migrated and new relays need to be detached, e.g. from predecessors to old
+ * pipeline element
+ *
+ * @return PipelineOperationStatus
+ */
+ @Override
+ public PipelineOperationStatus detachRelaysOnMigration() {
+ PipelineOperationStatus status = initPipelineOperationStatus();
- private PipelineElementStatus makeHttpRequest(EndpointUrlGenerator<?> urlGenerator,
- NamedStreamPipesEntity namedEntity, String type) {
- switch (type) {
- case "invoke":
- return new HttpRequestBuilder(namedEntity, urlGenerator.generateInvokeEndpoint()).invoke();
- case "detach":
- return new HttpRequestBuilder(namedEntity, urlGenerator.generateDetachEndpoint()).detach();
- case "invokeRelay":
- return new HttpRequestBuilder(namedEntity, urlGenerator.generateRelayEndpoint()).invoke();
- case "detachRelay":
- return new HttpRequestBuilder(namedEntity, urlGenerator.generateRelayEndpoint()).detach();
- default:
- throw new IllegalArgumentException("Type not known: " + type);
- }
- }
+ detachRelays(status);
- private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
- return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
+ return verifyPipelineOperationStatus(
+ status,
+ "Successfully stopped relays in pipeline " + pipelineName,
+ "Could not stop all relays in pipeline " + pipelineName,
+ false);
}
- private boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
- return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
- }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 7097577..dc0fdd9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -18,21 +18,20 @@
package org.apache.streampipes.manager.execution.http;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
+
import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
-import org.apache.streampipes.commons.Utils;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
+
import java.io.IOException;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
deleted file mode 100644
index 43aa454..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class MigrationHelpers {
-
- static public List<Tuple2<DataProcessorInvocation, DataProcessorInvocation>> getDelta(Pipeline pipelineX, Pipeline pipelineY){
- List<Tuple2<DataProcessorInvocation, DataProcessorInvocation>> delta = new ArrayList<>();
- pipelineX.getSepas().forEach(iX -> {
- if (pipelineY.getSepas().stream().filter(iY -> iY.getElementId().equals(iX.getElementId()))
- .noneMatch(iY -> iY.getDeploymentTargetNodeId().equals(iX.getDeploymentTargetNodeId()))){
- Optional<DataProcessorInvocation> invocationY = pipelineY.getSepas().stream().
- filter(iY -> iY.getDeploymentRunningInstanceId().equals(iX.getDeploymentRunningInstanceId())).findFirst();
- invocationY.ifPresent(dataProcessorInvocation -> delta.add(new Tuple2<>(iX, dataProcessorInvocation)));
- }
- });
- return delta;
- }
-
- static public PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
- String errorMessage) {
- //Duplicate from method in GraphSubmitter
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
- if (status.isSuccess()) {
- status.setTitle(successMessage);
- } else {
- status.setTitle(errorMessage);
- }
- return status;
- }
-
- public static void exchangePipelineElement(Pipeline exchangePipeline, Tuple2<? extends NamedStreamPipesEntity, ? extends NamedStreamPipesEntity> entity){
-
- if (entity.a instanceof DataProcessorInvocation){
- int index = exchangePipeline.getSepas().indexOf(entity.b);
- exchangePipeline.getSepas().remove(index);
- exchangePipeline.getSepas().add(index, (DataProcessorInvocation) entity.a);
- }
- }
-
- public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
deleted file mode 100644
index 11ecbf4..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
-import org.lightcouch.DocumentConflictException;
-
-import java.security.GeneralSecurityException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor {
-
- private Pipeline pipeline;
- private boolean visualize;
- private boolean storeStatus;
- private boolean monitor;
-
- public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
- boolean monitor) {
- this.pipeline = pipeline;
- this.visualize = visualize;
- this.storeStatus = storeStatus;
- this.monitor = monitor;
- }
-
- public PipelineOperationStatus startPipeline() {
-
- pipeline.getSepas().forEach(this::updateGroupIds);
- pipeline.getActions().forEach(this::updateGroupIds);
-
- List<DataProcessorInvocation> sepas = pipeline.getSepas();
- List<DataSinkInvocation> secs = pipeline.getActions();
-
- List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
- SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
- for (SpDataSet ds : dataSets) {
- ds.setCorrespondingPipeline(pipeline.getPipelineId());
- }
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(sepas);
- graphs.addAll(secs);
-
- List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
-
- graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
- List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(decryptedGraphs);
-
- PipelineOperationStatus status = new GraphSubmitter(
- pipeline.getPipelineId(),
- pipeline.getName(),
- decryptedGraphs,
- dataSets,
- dataStreamRelayContainers).invokeGraphs();
-
- if (status.isSuccess()) {
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
- PipelineStatusManager.addPipelineStatus(
- pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(),
- System.currentTimeMillis(),
- PipelineStatusMessageType.PIPELINE_STARTED.title(),
- PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
- if (storeStatus) {
- setPipelineStarted(pipeline);
- }
- }
- return status;
- }
-
- public PipelineOperationStatus stopPipeline() {
- List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
- List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
-
- List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(graphs);
-
- PipelineOperationStatus status = new GraphSubmitter(
- pipeline.getPipelineId(),
- pipeline.getName(),
- graphs,
- dataSets,
- dataStreamRelayContainers).detachGraphs();
-
- if (status.isSuccess()) {
- if (visualize) {
- StorageDispatcher
- .INSTANCE
- .getNoSqlStore()
- .getVisualizationStorageApi()
- .deleteVisualization(pipeline.getPipelineId());
- }
- if (storeStatus) {
- setPipelineStopped(pipeline);
- }
-
- PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(),
- System.currentTimeMillis(),
- PipelineStatusMessageType.PIPELINE_STOPPED.title(),
- PipelineStatusMessageType.PIPELINE_STOPPED.description()));
-
- }
- return status;
- }
-
- public PipelineOperationStatus migratePipelineElement(Pipeline currentPipeline,
- Tuple2<DataProcessorInvocation, DataProcessorInvocation> targetElement){
- PipelineOperationStatus status = initPipelineOperationStatus();
- //Purge existing relays
- pipeline.getSepas().forEach(s -> s.setOutputStreamRelays(new ArrayList<>()));
- //Generate new relays
- PipelineGraph newPipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
- new InvocationGraphBuilder(newPipelineGraph, pipeline.getPipelineId()).buildGraphs();
- //Get predecessors
- List<NamedStreamPipesEntity> predecessors = new ArrayList<>();
- PipelineGraph currentPipelineGraph = new PipelineGraphBuilder(currentPipeline).buildGraph();
- PipelineGraphHelpers
- .findStreams(newPipelineGraph)
- .forEach(source -> predecessors.addAll(getPredecessors(source, targetElement.a, newPipelineGraph, new ArrayList<>())));
- //Find counterpart for predecessors in currentPipeline
- List<Tuple2<NamedStreamPipesEntity, NamedStreamPipesEntity>> matchingPredecessors = new ArrayList<>();
- for(NamedStreamPipesEntity e : predecessors){
- matchingPredecessors.add(new Tuple2<>(e, findMatching(e, currentPipelineGraph)));
- }
- List<NamedStreamPipesEntity> currentPredecessors = matchingPredecessors.stream().map(t -> t.b).collect(Collectors.toList());
-
-
- PipelineOperationStatus statusStartTarget = startPipelineElement(Collections.singletonList(targetElement.a));
- statusStartTarget.getElementStatus().forEach(status::addPipelineElementStatus);
- if(!statusStartTarget.isSuccess()){
- //Target PE could not be started; nothing to roll back
- return status;
- }
- //Stop relay to origin
- PipelineOperationStatus statusStopRelays = stopRelays(currentPredecessors, targetElement.b);
- statusStopRelays.getElementStatus().forEach(status::addPipelineElementStatus);
- if(!statusStopRelays.isSuccess()){
- //Rollback of target PE and all successfully stopped Relays
- stopPipelineElements(Collections.singletonList(targetElement.a))
- .getElementStatus().forEach(status::addPipelineElementStatus);
-
- Set<String> relaysToRollBack = statusStopRelays.getElementStatus().stream().filter(PipelineElementStatus::isSuccess)
- .map(PipelineElementStatus::getElementId).collect(Collectors.toSet());
- Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> rollbackRelays = findRelays(currentPredecessors, targetElement.b)
- .entrySet()
- .stream()
- .filter(entry -> relaysToRollBack.contains(entry.getValue().getRunningStreamRelayInstanceId()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).invokeRelays(rollbackRelays)
- .getElementStatus().forEach(status::addPipelineElementStatus);
- return status;
- }
- //start Relay to target
- PipelineOperationStatus statusStartRelays = startRelays(predecessors, targetElement.a);
- statusStartRelays.getElementStatus().forEach(status::addPipelineElementStatus);
- if(!statusStartRelays.isSuccess()){
- //Rollback target PE, stopped relays and all successfully started Relays
- stopPipelineElements(Collections.singletonList(targetElement.a))
- .getElementStatus().forEach(status::addPipelineElementStatus);
-
- startRelays(currentPredecessors, targetElement.b)
- .getElementStatus().forEach(status::addPipelineElementStatus);
-
- Set<String> relaysToRollBack = statusStartRelays.getElementStatus().stream().filter(PipelineElementStatus::isSuccess)
- .map(PipelineElementStatus::getElementId).collect(Collectors.toSet());
- Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> rollbackRelays = findRelays(predecessors, targetElement.a)
- .entrySet()
- .stream()
- .filter(entry -> relaysToRollBack.contains(entry.getValue().getRunningStreamRelayInstanceId()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).detachRelays(rollbackRelays)
- .getElementStatus().forEach(status::addPipelineElementStatus);
- return status;
- }
- //Stop origin and associated relay
- PipelineOperationStatus statusStopOrigin = stopPipelineElements(Collections.singletonList(targetElement.b));
- statusStopOrigin.getElementStatus().forEach(status::addPipelineElementStatus);
-
- if(!statusStopOrigin.isSuccess()){
- //Rollback everything
- stopRelays(predecessors, targetElement.a)
- .getElementStatus().forEach(status::addPipelineElementStatus);
- startRelays(currentPredecessors, targetElement.b)
- .getElementStatus().forEach(status::addPipelineElementStatus);
- stopPipelineElements(Collections.singletonList(targetElement.a))
- .getElementStatus().forEach(status::addPipelineElementStatus);
- return status;
- }
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(pipeline.getActions());
- graphs.addAll(pipeline.getSepas());
- List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
- SpDataSet((SpDataSet) s)).collect(Collectors.toList());
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
- return status;
- }
-
- private PipelineOperationStatus stopPipelineElements(List<NamedStreamPipesEntity> graphs){
- List<InvocableStreamPipesEntity> invocations = new ArrayList<>();
- graphs.stream().filter(i -> i instanceof InvocableStreamPipesEntity).forEach(i -> invocations.add((InvocableStreamPipesEntity) i));
-
- if (invocations.isEmpty()) {
- PipelineOperationStatus ret = initPipelineOperationStatus();
- ret.setSuccess(true);
- return ret;
- }
- return new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), invocations, new ArrayList<>(), new ArrayList<>())
- .detachGraphs();
- }
-
- private PipelineOperationStatus startPipelineElement(List<NamedStreamPipesEntity> graphs){
- List<InvocableStreamPipesEntity> invocations = new ArrayList<>();
- graphs.stream().filter(i -> i instanceof InvocableStreamPipesEntity).forEach(i -> invocations.add((InvocableStreamPipesEntity) i));
-
- if (invocations.isEmpty()) {
- PipelineOperationStatus ret = initPipelineOperationStatus();
- ret.setSuccess(true);
- return ret;
- }
- List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(invocations);
- return new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), decryptedGraphs, new ArrayList<>(), new ArrayList<>())
- .invokeGraphs();
- }
-
- private PipelineOperationStatus stopRelays(List<NamedStreamPipesEntity> predecessors, InvocableStreamPipesEntity target){
- Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = findRelays(predecessors, target);
- return new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).detachRelays(relays);
- }
-
- private PipelineOperationStatus startRelays(List<NamedStreamPipesEntity> predecessors, InvocableStreamPipesEntity target){
- Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = findRelays(predecessors, target);
- return new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).invokeRelays(relays);
- }
-
- private Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
- InvocableStreamPipesEntity target){
-
- Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = new HashMap<>();
-
- predecessors.forEach(pred -> {
- List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
- SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
-
- if (pred instanceof DataProcessorInvocation){
- //Data Processor
- DataProcessorInvocation processorInvocation = (DataProcessorInvocation) pred;
- dataStreamRelays.addAll(processorInvocation.getOutputStreamRelays().stream().
- filter(r ->
- target.getInputStreams().stream().map(s ->
- s.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName())
- .collect(Collectors.toSet()).contains(r.getEventGrounding().getTransportProtocol()
- .getTopicDefinition().getActualTopicName()))
- .collect(Collectors.toList()));
- dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
- dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
- dsRelayContainer.setName(processorInvocation.getName() + " (Stream Relay)");
- dsRelayContainer.setInputGrounding(new EventGrounding(processorInvocation.getOutputStream().getEventGrounding()));
- dsRelayContainer.setDeploymentTargetNodeId(processorInvocation.getDeploymentTargetNodeId());
- dsRelayContainer.setDeploymentTargetNodeHostname(processorInvocation.getDeploymentTargetNodeHostname());
- dsRelayContainer.setDeploymentTargetNodePort(processorInvocation.getDeploymentTargetNodePort());
- dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
- relays.put(pred, dsRelayContainer);
- } else if (pred instanceof SpDataStream){
- //DataStream
- SpDataStream dataStream = (SpDataStream) pred;
- if (!matchingDeploymentTargets(dataStream, target)){
- //There is a relay that needs to be removed
- dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
- .get(getIndex(pred.getDOM(), target))
- .getEventGrounding())));
-
- dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
- dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
- dsRelayContainer.setName(dataStream.getName() + " (Stream Relay)");
- dsRelayContainer.setInputGrounding(new EventGrounding(dataStream.getEventGrounding()));
- dsRelayContainer.setDeploymentTargetNodeId(dataStream.getDeploymentTargetNodeId());
- dsRelayContainer.setDeploymentTargetNodeHostname(dataStream.getDeploymentTargetNodeHostname());
- dsRelayContainer.setDeploymentTargetNodePort(dataStream.getDeploymentTargetNodePort());
- dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
- relays.put(pred, dsRelayContainer);
- }
- }
- });
- return relays;
- }
-
-
- private List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
- InvocableStreamPipesEntity target, PipelineGraph pipelineGraph,
- List<NamedStreamPipesEntity> foundPredecessors){
- //TODO: Check if this works for all graph topologies
- if (pipelineGraph.outgoingEdgesOf(source)
- .stream()
- .map(pipelineGraph::getEdgeTarget)
- .map(g -> (InvocableStreamPipesEntity) g)
- .collect(Collectors.toSet()).contains(target)){
- foundPredecessors.add(source);
- } else{
- List<NamedStreamPipesEntity> successors = pipelineGraph.outgoingEdgesOf(source)
- .stream()
- .map(pipelineGraph::getEdgeTarget)
- .map(g -> (InvocableStreamPipesEntity) g)
- .collect(Collectors.toList());
- if (successors.isEmpty()) return foundPredecessors;
- successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
- }
- return foundPredecessors;
- }
-
- private NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
- AtomicReference<NamedStreamPipesEntity> match = new AtomicReference<>();
- PipelineGraphHelpers.findStreams(pipelineGraph).forEach(ds -> {
- NamedStreamPipesEntity foundEntity = compareGraphs(ds, entity, pipelineGraph, new ArrayList<>());
- if (foundEntity != null) match.set(foundEntity);
- });
- return match.get();
- }
-
- private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source, NamedStreamPipesEntity searchedEntity, PipelineGraph pipelineGraph, List<NamedStreamPipesEntity> successors){
- if(source.getDOM().equals(searchedEntity.getDOM())) return source;
- else if (successors.isEmpty())
- successors = pipelineGraph.outgoingEdgesOf(source)
- .stream()
- .map(pipelineGraph::getEdgeTarget)
- .map(g -> (InvocableStreamPipesEntity) g)
- .collect(Collectors.toList());
- Optional<NamedStreamPipesEntity> successor= successors.stream().findFirst();
- if (successor.isPresent()) {
- successors.remove(successor.get());
- return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
- }
- return null;
- }
-
- private PipelineOperationStatus initPipelineOperationStatus() {
- //Duplicate from method in GraphSubmitter
- PipelineOperationStatus status = new PipelineOperationStatus();
- status.setPipelineId(pipeline.getPipelineId());
- status.setPipelineName(pipeline.getName());
- return status;
- }
-
-
- private String extractTopic(EventGrounding eg) {
- return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
- }
-
- private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
- Set<String> topicSet = new HashSet<>();
- List<SpDataStreamRelayContainer> dsRelayContainers = new ArrayList<>();
- SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
- List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-
- graphs.stream()
- .filter(g -> pipeline.getStreams().stream()
- .filter(ds -> topicSet.add(extractTopic(ds.getEventGrounding())))
- .anyMatch(ds -> (!matchingDeploymentTargets(ds, g) && connected(ds, g))))
- .forEach(g -> pipeline.getStreams()
- .forEach(ds -> {
- dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
- dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
- dsRelayContainer.setName(ds.getName() + " (Stream Relay)");
- dsRelayContainer.setInputGrounding(new EventGrounding(ds.getEventGrounding()));
- dsRelayContainer.setDeploymentTargetNodeId(ds.getDeploymentTargetNodeId());
- dsRelayContainer.setDeploymentTargetNodeHostname(ds.getDeploymentTargetNodeHostname());
- dsRelayContainer.setDeploymentTargetNodePort(ds.getDeploymentTargetNodePort());
-
- dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(g.getInputStreams()
- .get(getIndex(ds.getDOM(), g))
- .getEventGrounding())));
- })
- );
-
- dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
- dsRelayContainers.add(dsRelayContainer);
-
- return dsRelayContainers;
- }
-
- private boolean matchingDeploymentTargets(SpDataStream source, InvocableStreamPipesEntity target) {
- return source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
- }
-
- private boolean connected(SpDataStream source, InvocableStreamPipesEntity target) {
- int index = getIndex(source.getDOM(), target);
- if (index != -1) {
- return target.getConnectedTo().get(index).equals(source.getDOM());
- }
- return false;
- }
-
- private void updateGroupIds(InvocableStreamPipesEntity entity) {
- entity.getInputStreams()
- .stream()
- .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
- .map(is -> is.getEventGrounding().getTransportProtocol())
- .map(KafkaTransportProtocol.class::cast)
- .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
- }
-
- private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
- List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
- graphs.stream().map(g -> {
- if (g instanceof DataProcessorInvocation) {
- return new DataProcessorInvocation((DataProcessorInvocation) g);
- } else {
- return new DataSinkInvocation((DataSinkInvocation) g);
- }
- }).forEach(g -> {
- g.getStaticProperties()
- .stream()
- .filter(SecretStaticProperty.class::isInstance)
- .forEach(sp -> {
- try {
- String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
- ((SecretStaticProperty) sp).getValue());
- ((SecretStaticProperty) sp).setValue(decrypted);
- ((SecretStaticProperty) sp).setEncrypted(false);
- } catch (GeneralSecurityException e) {
- e.printStackTrace();
- }
- });
- decryptedGraphs.add(g);
- });
- return decryptedGraphs;
- }
-
- private void setPipelineStarted(Pipeline pipeline) {
- pipeline.setRunning(true);
- pipeline.setStartedAt(new Date().getTime());
- try {
- getPipelineStorageApi().updatePipeline(pipeline);
- } catch (DocumentConflictException dce) {
- //dce.printStackTrace();
- }
- }
-
- private void setPipelineStopped(Pipeline pipeline) {
- pipeline.setRunning(false);
- getPipelineStorageApi().updatePipeline(pipeline);
- }
-
- private void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
- List<SpDataSet> dataSets) {
- TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
- TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
- }
-
- private IPipelineStorage getPipelineStorageApi() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
- }
-
- private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
- return targetElement.getConnectedTo().indexOf(sourceDomId);
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
index e62ff41..5ee8b51 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
@@ -16,7 +16,7 @@
*
*/
package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
public class StreamRelayEndpointUrlGenerator extends EndpointUrlGenerator<SpDataStreamRelayContainer> {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
new file mode 100644
index 0000000..af0aced
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.manager.data.PipelineGraph;
+import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.node.NodeClusterManager;
+import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.user.management.encryption.CredentialsManager;
+
+import java.security.GeneralSecurityException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public abstract class AbstractPipelineExecutor {
+
+ protected Pipeline pipeline;
+ protected boolean visualize;
+ protected boolean storeStatus;
+ protected boolean monitor;
+
+ public AbstractPipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
+ this.pipeline = pipeline;
+ this.visualize = visualize;
+ this.storeStatus = storeStatus;
+ this.monitor = monitor;
+ }
+
+ // standard methods
+ protected void setPipelineStarted(Pipeline pipeline) {
+ pipeline.setRunning(true);
+ pipeline.setStartedAt(new Date().getTime());
+ getPipelineStorageApi().updatePipeline(pipeline);
+ }
+
+ protected void setPipelineStopped(Pipeline pipeline) {
+ pipeline.setRunning(false);
+ getPipelineStorageApi().updatePipeline(pipeline);
+ }
+
+ protected void deleteVisualization(String pipelineId) {
+ StorageDispatcher.INSTANCE
+ .getNoSqlStore()
+ .getVisualizationStorageApi()
+ .deleteVisualization(pipelineId);
+ }
+
+ protected void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
+ List<SpDataSet> dataSets) {
+ TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
+ TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
+ }
+
+ protected void storeDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+ relays.forEach(NodeClusterManager::persistDataStreamRelay);
+ }
+
+ protected void deleteDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+ relays.forEach(NodeClusterManager::deleteDataStreamRelay);
+ }
+
+ protected void updateDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+ relays.forEach(NodeClusterManager::updateDataStreamRelay);
+ }
+
+
+ protected PipelineOperationStatus startPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
+ List<SpDataStreamRelayContainer> relays){
+ if (graphs.isEmpty()) {
+ return initPipelineOperationStatus();
+ }
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+ graphs, new ArrayList<>(), relays).invokePipelineElementsAndRelays();
+ }
+
+ protected PipelineOperationStatus stopPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
+ List<SpDataStreamRelayContainer> relays){
+ if (graphs.isEmpty()) {
+ return initPipelineOperationStatus();
+ }
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+ graphs, new ArrayList<>(),relays).detachPipelineElementsAndRelays();
+ }
+
+ protected PipelineOperationStatus startRelays(List<SpDataStreamRelayContainer> relays){
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), new ArrayList<>(), new ArrayList<>(),
+ relays).invokeRelaysOnMigration();
+ }
+
+ protected PipelineOperationStatus stopRelays(List<SpDataStreamRelayContainer> relays){
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),new ArrayList<>(), new ArrayList<>(),
+ relays).detachRelaysOnMigration();
+ }
+
+ protected List<SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
+ InvocableStreamPipesEntity target){
+
+ List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+ predecessors.forEach(pred -> {
+ List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+ SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
+
+ if (pred instanceof DataProcessorInvocation){
+ //Data Processor
+ DataProcessorInvocation graph = (DataProcessorInvocation) pred;
+ if (differentDeploymentTargets(pred, target)) {
+ dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
+
+ //dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
+ relayContainer = new SpDataStreamRelayContainer(graph);
+ relayContainer.setOutputStreamRelays(dataStreamRelays);
+
+ relays.add(relayContainer);
+ }
+ } else if (pred instanceof SpDataStream){
+ //DataStream
+ SpDataStream stream = (SpDataStream) pred;
+ if (differentDeploymentTargets(stream, target)){
+ //There is a relay that needs to be removed
+ dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
+ .get(getIndex(pred.getDOM(), target))
+ .getEventGrounding())));
+
+ String id = extractUniqueAdpaterId(stream.getElementId());
+ String relayStrategy = pipeline.getEventRelayStrategy();
+
+ relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+ }
+ }
+ });
+ return relays;
+ }
+
+ protected List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
+ InvocableStreamPipesEntity target,
+ PipelineGraph pipelineGraph,
+ List<NamedStreamPipesEntity> foundPredecessors){
+
+ Set<InvocableStreamPipesEntity> targets = getTargetsAsSet(source, pipelineGraph,
+ InvocableStreamPipesEntity.class);
+
+ //TODO: Check if this works for all graph topologies
+ if (targets.contains(target)){
+ foundPredecessors.add(source);
+ } else {
+ List<NamedStreamPipesEntity> successors = getTargetsAsList(source, pipelineGraph,
+ NamedStreamPipesEntity.class);
+
+ if (successors.isEmpty()) return foundPredecessors;
+ successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
+ }
+ return foundPredecessors;
+ }
+
+ protected NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
+ AtomicReference<NamedStreamPipesEntity> match = new AtomicReference<>();
+ List<SpDataStream> dataStreams = PipelineGraphHelpers.findStreams(pipelineGraph);
+
+ for (SpDataStream stream : dataStreams) {
+ NamedStreamPipesEntity foundEntity = compareGraphs(stream, entity, pipelineGraph, new ArrayList<>());
+ if (foundEntity != null) {
+ match.set(foundEntity);
+ }
+ }
+ return match.get();
+ }
+
+ private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source,
+ NamedStreamPipesEntity searchedEntity,
+ PipelineGraph pipelineGraph,
+ List<NamedStreamPipesEntity> successors){
+ if(matchingDOM(source, searchedEntity)) {
+ return source;
+ } else if (successors.isEmpty()) {
+ successors = getTargetsAsList(source, pipelineGraph, NamedStreamPipesEntity.class);
+ Optional<NamedStreamPipesEntity> successor = successors.stream().findFirst();
+ if (successor.isPresent()) {
+ successors.remove(successor.get());
+ return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
+ }
+ }
+ return null;
+ }
+
+ protected List<SpDataStreamRelayContainer> generateRelays(List<InvocableStreamPipesEntity> graphs) {
+ return generateDataStreamRelays(graphs).stream()
+ .filter(r -> r.getOutputStreamRelays().size() > 0)
+ .collect(Collectors.toList());
+ }
+
+ private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
+ List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+ for (InvocableStreamPipesEntity graph : graphs) {
+ for (SpDataStream stream: pipeline.getStreams()) {
+ if (differentDeploymentTargets(stream, graph) && connected(stream, graph)) {
+
+ List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+ dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(graph.getInputStreams()
+ .get(getIndex(stream.getDOM(), graph))
+ .getEventGrounding())));
+
+ String id = extractUniqueAdpaterId(stream.getElementId());
+ String relayStrategy = pipeline.getEventRelayStrategy();
+
+ relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+ }
+ }
+ for (DataProcessorInvocation processor : pipeline.getSepas()) {
+ if (differentDeploymentTargets(processor, graph) && connected(processor, graph)) {
+ SpDataStreamRelayContainer processorRelay = new SpDataStreamRelayContainer(processor);
+ relays.add(processorRelay);
+ }
+ }
+ }
+ return relays;
+ }
+
+
+ // Helpers
+
+ /**
+ * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
+ *
+ * @param entity data processor/sink
+ */
+ protected void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
+ entity.getInputStreams()
+ .stream()
+ .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+ .map(is -> is.getEventGrounding().getTransportProtocol())
+ .map(KafkaTransportProtocol.class::cast)
+ .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+ }
+
+ /**
+ * Decrypt potential secrets contained in static properties, e.g., passwords
+ *
+ * @param graphs List of graphs
+ * @return List of decrypted graphs
+ */
+ protected List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+ List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
+ graphs.stream().map(g -> {
+ if (g instanceof DataProcessorInvocation) {
+ return new DataProcessorInvocation((DataProcessorInvocation) g);
+ } else {
+ return new DataSinkInvocation((DataSinkInvocation) g);
+ }
+ }).forEach(g -> {
+ g.getStaticProperties()
+ .stream()
+ .filter(SecretStaticProperty.class::isInstance)
+ .forEach(sp -> {
+ try {
+ String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
+ ((SecretStaticProperty) sp).getValue());
+ ((SecretStaticProperty) sp).setValue(decrypted);
+ ((SecretStaticProperty) sp).setEncrypted(false);
+ } catch (GeneralSecurityException e) {
+ e.printStackTrace();
+ }
+ });
+ decryptedGraphs.add(g);
+ });
+ return decryptedGraphs;
+ }
+
+ /**
+ * Get pipeline storage dispatcher API
+ *
+ * @return IPipelineStorage NoSQL storage interface for pipelines
+ */
+ private IPipelineStorage getPipelineStorageApi() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+ }
+
+ /**
+ * Extract topic name
+ *
+ * @param entity
+ * @return
+ */
+ private String extractActualTopic(NamedStreamPipesEntity entity) {
+ if (entity instanceof SpDataStream) {
+ return ((SpDataStream) entity)
+ .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
+ } else if (entity instanceof SpDataStreamRelay) {
+ return ((SpDataStreamRelay) entity)
+ .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
+ }
+ throw new SpRuntimeException("Could not extract actual topic name from entity");
+ }
+
+ // Edge / Migration Helpers
+
+ /**
+ * Compare deployment targets of two pipeline elements, namely data stream/processor (source) and data
+ * processor/sink (target)
+ *
+ * @param source data stream/processor
+ * @param target data processor/sink
+ * @return boolean value that returns true if source and target share the same deployment target, else false
+ */
+ private boolean differentDeploymentTargets(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
+ if (source instanceof SpDataStream) {
+ return !((SpDataStream) source).getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+ } else if (source instanceof DataProcessorInvocation) {
+ return !((DataProcessorInvocation) source).getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+ }
+ throw new SpRuntimeException("Matching deployment targets check failed");
+ }
+
+ /**
+ * Find relays with matching topics
+ *
+ * @param graph data processor
+ * @param target data processor/sink
+ * @return collection of data stream relays
+ */
+ private Collection<? extends SpDataStreamRelay> findRelaysWithMatchingTopic(DataProcessorInvocation graph,
+ InvocableStreamPipesEntity target) {
+ return graph.getOutputStreamRelays().stream().
+ filter(relay ->
+ target.getInputStreams().stream()
+ .map(this::extractActualTopic)
+ .collect(Collectors.toSet())
+ .contains(extractActualTopic(relay)))
+ .collect(Collectors.toList());
+ }
+
+
+ private <T> Set<T> getTargetsAsSet(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
+ Class<T> clazz){
+ return pipelineGraph.outgoingEdgesOf(source)
+ .stream()
+ .map(pipelineGraph::getEdgeTarget)
+ .map(clazz::cast)
+ .collect(Collectors.toSet());
+ }
+
+ private <T> List<T> getTargetsAsList(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
+ Class<T> clazz){
+ return new ArrayList<>(getTargetsAsSet(source, pipelineGraph, clazz));
+ }
+
+ /**
+ * Compare connection of two pipeline elements, namely data stream/processor (source) and data processor/sink
+ * (target) by DOM identifier.
+ *
+ * @param source data stream or data processor
+ * @param target data processor/sink
+ * @return boolean value that returns true if source and target are connected, else false
+ */
+ private boolean connected(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
+ int index = getIndex(source.getDOM(), target);
+ if (index != -1) {
+ return target.getConnectedTo().get(index).equals(source.getDOM());
+ }
+ return false;
+ }
+
+ /**
+ * Get index of data processor/sink connection based on source DOM identifier
+ *
+ * @param sourceDomId source DOM identifier
+ * @param target data processor/sink
+ * @return Integer with index of connection, if invalid returns -1.
+ */
+ private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity target) {
+ return target.getConnectedTo().indexOf(sourceDomId);
+ }
+
+ /**
+ * Checks if DOM are equal
+ *
+ * @param source pipeline element
+ * @param target pipeline element
+ * @return true if DOM is the same, else false
+ */
+ private boolean matchingDOM(NamedStreamPipesEntity source, NamedStreamPipesEntity target) {
+ return source.getDOM().equals(target.getDOM());
+ }
+
+ /**
+ * Get List of InvocableStreamPipes entities, i.e., data processors/sinks from list of NameStreamPipesEntity
+ *
+ * @param graphs List<NamedStreamPipesEntity> graphs
+ * @return
+ */
+ private List<InvocableStreamPipesEntity> getListOfInvocableStreamPipesEntity(List<NamedStreamPipesEntity> graphs) {
+ List<InvocableStreamPipesEntity> invocableEntities = new ArrayList<>();
+ graphs.stream()
+ .filter(i -> i instanceof InvocableStreamPipesEntity)
+ .forEach(i -> invocableEntities.add((InvocableStreamPipesEntity) i));
+ return invocableEntities;
+ }
+
+ /**
+ * Create pipeline operation status with pipeline id and name and set success to true
+ *
+ * @return PipelineOperationStatus
+ */
+ protected PipelineOperationStatus initPipelineOperationStatus() {
+ PipelineOperationStatus status = new PipelineOperationStatus();
+ status.setPipelineId(pipeline.getPipelineId());
+ status.setPipelineName(pipeline.getName());
+ status.setSuccess(true);
+ return status;
+ }
+
+ private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
+ return graphs
+ .stream()
+ .filter(clazz::isInstance)
+ .map(clazz::cast)
+ .collect(Collectors.toList());
+ }
+
+ private String extractUniqueAdpaterId(String s) {
+ return s.substring(s.lastIndexOf("/") + 1);
+ }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
new file mode 100644
index 0000000..17aa1ce
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
+import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PipelineExecutor extends AbstractPipelineExecutor {
+
+ public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
+ super(pipeline, visualize, storeStatus, monitor);
+ }
+
+ public PipelineOperationStatus startPipeline() {
+
+ pipeline.getSepas().forEach(this::updateKafkaGroupIds);
+ pipeline.getActions().forEach(this::updateKafkaGroupIds);
+
+ List<DataProcessorInvocation> sepas = pipeline.getSepas();
+ List<DataSinkInvocation> secs = pipeline.getActions();
+
+ List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
+ SpDataSet((SpDataSet) s)).collect(Collectors.toList());
+
+ for (SpDataSet ds : dataSets) {
+ ds.setCorrespondingPipeline(pipeline.getPipelineId());
+ }
+
+ List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+ graphs.addAll(sepas);
+ graphs.addAll(secs);
+
+ List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
+
+ graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
+
+ List<SpDataStreamRelayContainer> relays = generateRelays(decryptedGraphs);
+
+ PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+ decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+
+ if (status.isSuccess()) {
+ storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+ storeDataStreamRelayContainer(relays);
+
+ PipelineStatusManager.addPipelineStatus(
+ pipeline.getPipelineId(),
+ new PipelineStatusMessage(pipeline.getPipelineId(),
+ System.currentTimeMillis(),
+ PipelineStatusMessageType.PIPELINE_STARTED.title(),
+ PipelineStatusMessageType.PIPELINE_STARTED.description()));
+
+ if (storeStatus) setPipelineStarted(pipeline);
+ }
+ return status;
+ }
+
+ public PipelineOperationStatus stopPipeline() {
+ List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+ List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
+ List<SpDataStreamRelayContainer> relays = generateRelays(graphs);
+
+ PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), graphs,
+ dataSets, relays).detachPipelineElementsAndRelays();
+
+ if (status.isSuccess()) {
+ if (visualize) deleteVisualization(pipeline.getPipelineId());
+ if (storeStatus) setPipelineStopped(pipeline);
+
+ deleteDataStreamRelayContainer(relays);
+
+ PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
+ new PipelineStatusMessage(pipeline.getPipelineId(),
+ System.currentTimeMillis(),
+ PipelineStatusMessageType.PIPELINE_STOPPED.title(),
+ PipelineStatusMessageType.PIPELINE_STOPPED.description()));
+ }
+ return status;
+ }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
new file mode 100644
index 0000000..e610d24
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.manager.data.PipelineGraph;
+import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
+
+ /**
+ * Old pipeline before migration
+ */
+ private final Pipeline pipelineBeforeMigration;
+
+ /**
+ * Pipeline element to be migrated. Contains pair of source and target element description
+ */
+ private final PipelineElementMigrationEntity migrationEntity;
+
+ /**
+ * Predecessors of the pipeline element to be migrated in the migration pipeline
+ */
+ private final List<NamedStreamPipesEntity> predecessorsAfterMigration;
+
+ /**
+ * Predecessors of the pipeline element to be migrated in the old pipeline before migration
+ */
+ private final List<NamedStreamPipesEntity> predecessorsBeforeMigration;
+
+ /**
+ * Collection of relays that were created in the migration process needing to be stored
+ */
+ private final List<SpDataStreamRelayContainer> relaysToBePersisted;
+
+ /**
+ * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
+ */
+ private final List<SpDataStreamRelayContainer> relaysToBeDeleted;
+
+
+ public PipelineMigrationExecutor(Pipeline pipeline, Pipeline pipelineBeforeMigration,
+ PipelineElementMigrationEntity migrationEntity,
+ boolean visualize, boolean storeStatus, boolean monitor) {
+ super(pipeline, visualize, storeStatus, monitor);
+ this.pipelineBeforeMigration = pipelineBeforeMigration;
+ this.migrationEntity = migrationEntity;
+ this.predecessorsAfterMigration = new ArrayList<>();
+ this.predecessorsBeforeMigration = new ArrayList<>();
+ this.relaysToBePersisted = new ArrayList<>();
+ this.relaysToBeDeleted = new ArrayList<>();
+ }
+
+ // TODO: refactor!
+ public PipelineOperationStatus migratePipelineElement() {
+
+ PipelineOperationStatus status = initPipelineOperationStatus();
+
+ // 1. start new element
+ // 2. stop relay to origin element
+ // 3. start relay to new element
+ // 4. stop origin element
+ // 5. stop origin relay
+
+ prepareMigration();
+
+ // Start target pipeline elements and relays on new target node
+ PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
+ if(!statusStartTarget.isSuccess()){
+ //Target PE could not be started; nothing to roll back
+ return status;
+ }
+
+ // Stop relays from origin predecessor
+ PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
+ if(!statusStopRelays.isSuccess()){
+ rollbackToPreMigrationStepOne(statusStopRelays, status);
+ return status;
+ }
+
+ // Start relays to target after migration
+ PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
+ if(!statusStartRelays.isSuccess()){
+ rollbackToPreMigrationStepTwo(statusStartRelays, status);
+ return status;
+ }
+
+ //Stop origin and associated relay
+ PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
+ if(!statusStopOrigin.isSuccess()){
+ rollbackToPreMigrationStepThree(status);
+ return status;
+ }
+
+ List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+ graphs.addAll(pipeline.getActions());
+ graphs.addAll(pipeline.getSepas());
+
+ List<SpDataSet> dataSets = findDataSets();
+
+ // store new pipeline and relays
+ storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+ storeDataStreamRelayContainer(relaysToBePersisted);
+ deleteDataStreamRelayContainer(relaysToBeDeleted);
+
+ // set global status
+ status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+
+ return status;
+ }
+
+ private void prepareMigration() {
+ //Purge existing relays
+ purgeExistingRelays();
+
+ //Generate new relays
+ PipelineGraph pipelineGraphAfterMigration = new PipelineGraphBuilder(pipeline).buildGraph();
+ buildGraphWithRelays(pipelineGraphAfterMigration);
+
+ //Get predecessors
+ PipelineGraph pipelineGraphBeforeMigration = new PipelineGraphBuilder(pipelineBeforeMigration).buildGraph();
+ findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
+
+ //Find counterpart for predecessors in currentPipeline
+ findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
+ }
+
+ private void rollbackToPreMigrationStepOne(PipelineOperationStatus statusStopRelays,
+ PipelineOperationStatus status) {
+ // Stop target pipeline element and relays on new target node
+ PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
+
+ // Restart relays before migration attempt
+ // extract unique running instance ids of original relays
+ Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStopRelays);
+ PipelineOperationStatus statusRelaysRollback = rollbackToOriginRelaysByInvoke(relayIdsToRollback);
+
+ // Add status to global migration status
+ updateMigrationStatus(statusTargetRollback, status);
+ updateMigrationStatus(statusRelaysRollback, status);
+ }
+
+ private void rollbackToPreMigrationStepTwo(PipelineOperationStatus statusStartRelays,
+ PipelineOperationStatus status) {
+ //Rollback target PE, stopped relays and all successfully started relays
+ PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
+
+ // Restart relays to origin from predecessors before migration
+ PipelineOperationStatus statusRelaysInvokeRollback = startRelays(findRelays(predecessorsBeforeMigration,
+ migrationEntity.getSourceElement()));
+
+ // Stop relays that were started due to migration attempt
+ // extract unique running instance ids of original relays
+ Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStartRelays);
+ PipelineOperationStatus statusRelaysDetachRollback = rollbackTargetRelaysByDetach(relayIdsToRollback);
+
+ // Add status to global migration status
+ updateMigrationStatus(statusTargetRollback, status);
+ updateMigrationStatus(statusRelaysInvokeRollback, status);
+ updateMigrationStatus(statusRelaysDetachRollback, status);
+ }
+
+ private void rollbackToPreMigrationStepThree(PipelineOperationStatus status) {
+ //Rollback everything
+ PipelineOperationStatus statusStopRelays = stopRelays(findRelays(predecessorsAfterMigration,
+ migrationEntity.getTargetElement()));
+
+ PipelineOperationStatus statusStartRelays = startRelays(findRelays(predecessorsBeforeMigration,
+ migrationEntity.getSourceElement()));
+
+ List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
+ List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
+
+ PipelineOperationStatus statusStopTargetAndRelays = stopPipelineElementsAndRelays(graphs, relays);
+
+ // Add status to global migration status
+ updateMigrationStatus(statusStopRelays, status);
+ updateMigrationStatus(statusStartRelays, status);
+ updateMigrationStatus(statusStopTargetAndRelays, status);
+ }
+
+
+ private PipelineOperationStatus rollbackToOriginRelaysByInvoke(Set<String> relayIdsToRollback) {
+ List<SpDataStreamRelayContainer> rollbackRelays = findRelaysAndFilterById(relayIdsToRollback,
+ predecessorsBeforeMigration, migrationEntity.getSourceElement());
+
+ return startRelays(rollbackRelays);
+ }
+
+ private PipelineOperationStatus rollbackTargetRelaysByDetach(Set<String> relayIdsToRollback) {
+ List<SpDataStreamRelayContainer> rollbackRelays = findRelaysAndFilterById(relayIdsToRollback,
+ predecessorsAfterMigration, migrationEntity.getTargetElement());
+
+ return stopRelays(rollbackRelays);
+ }
+
+ private PipelineOperationStatus rollbackTargetPipelineElementsAndRelays() {
+ List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
+ List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
+
+ return new GraphSubmitter(pipeline.getPipelineId(),
+ pipeline.getName(), graphs, new ArrayList<>(), relays).detachPipelineElementsAndRelays();
+ }
+
+ private PipelineOperationStatus startRelaysFromPredecessorsAfterMigration(PipelineOperationStatus status) {
+ List<SpDataStreamRelayContainer> relays = findRelays(predecessorsAfterMigration,
+ migrationEntity.getTargetElement());
+
+ updateRelaysToBePersisted(relays);
+
+ PipelineOperationStatus statusStartRelays = startRelays(relays);
+ updateMigrationStatus(statusStartRelays, status);
+
+ return statusStartRelays;
+ }
+
+ private PipelineOperationStatus stopRelaysFromPredecessorsBeforeMigration(PipelineOperationStatus status) {
+ List<SpDataStreamRelayContainer> relays = findRelays(predecessorsBeforeMigration,
+ migrationEntity.getSourceElement());
+
+ updateRelaysToBeDeleted(relays);
+
+ PipelineOperationStatus statusStopRelays = stopRelays(relays);
+ updateMigrationStatus(statusStopRelays, status);
+
+ return statusStopRelays;
+ }
+
+ private PipelineOperationStatus startTargetPipelineElementsAndRelays(PipelineOperationStatus status) {
+ List<InvocableStreamPipesEntity> decryptedGraphs =
+ decryptSecrets(Collections.singletonList(migrationEntity.getTargetElement()));
+ List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(decryptedGraphs);
+
+ updateRelaysToBePersisted(relays);
+
+ PipelineOperationStatus statusStartTarget = startPipelineElementsAndRelays(decryptedGraphs, relays);
+ updateMigrationStatus(statusStartTarget, status);
+
+ return statusStartTarget;
+ }
+
+ private PipelineOperationStatus stopOriginPipelineElementsAndRelays(PipelineOperationStatus status) {
+ List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getSourceElement());
+ List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
+
+ updateRelaysToBeDeleted(relays);
+
+ PipelineOperationStatus statusStopOrigin = stopPipelineElementsAndRelays(graphs, relays);
+ updateMigrationStatus(statusStopOrigin, status);
+
+ return statusStopOrigin;
+ }
+
+ // Helpers
+
+ private void updateRelaysToBePersisted(List<SpDataStreamRelayContainer> relays) {
+ relays.stream()
+ .filter(r -> r.getOutputStreamRelays().size() > 0)
+ .forEach(relaysToBePersisted::add);
+ }
+
+ private void updateRelaysToBeDeleted(List<SpDataStreamRelayContainer> relays) {
+ relays.stream()
+ .filter(r -> r.getOutputStreamRelays().size() > 0)
+ .forEach(relaysToBeDeleted::add);
+ }
+
+ private List<SpDataStreamRelayContainer> findRelaysAndFilterById(Set<String> relayIdsToRollback,
+ List<NamedStreamPipesEntity> predecessor,
+ InvocableStreamPipesEntity target) {
+ return findRelays(predecessor, target).stream()
+ .filter(relay -> relayIdsToRollback.contains(relay.getRunningStreamRelayInstanceId()))
+ .collect(Collectors.toList());
+ }
+
+ private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
+ predecessorsAfterMigration.forEach(migrationPredecessor ->
+ predecessorsBeforeMigration.add(findMatching(migrationPredecessor, pipelineGraphBeforeMigration)));
+ }
+
+ private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
+ PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).forEach(stream ->
+ predecessorsAfterMigration.addAll(getPredecessors(stream, migrationEntity.getTargetElement(),
+ pipelineGraphAfterMigration, new ArrayList<>())));
+ }
+
+ private List<SpDataSet> findDataSets() {
+ return pipeline.getStreams().stream()
+ .filter(s -> s instanceof SpDataSet)
+ .map(s -> new SpDataSet((SpDataSet) s))
+ .collect(Collectors.toList());
+ }
+
+ private void buildGraphWithRelays(PipelineGraph pipelineGraphAfterMigration) {
+ new InvocationGraphBuilder(pipelineGraphAfterMigration, pipeline.getPipelineId()).buildGraphs();
+ }
+
+ private void purgeExistingRelays() {
+ pipeline.getSepas().forEach(s -> s.setOutputStreamRelays(new ArrayList<>()));
+ }
+
+ private void updateMigrationStatus(PipelineOperationStatus partialStatus, PipelineOperationStatus status) {
+ // Add status to global migration status
+ partialStatus.getElementStatus().forEach(status::addPipelineElementStatus);
+ }
+
+ private List<SpDataStreamRelayContainer> extractRelaysFromDataProcessor(List<InvocableStreamPipesEntity> graphs) {
+ return graphs.stream()
+ .map(DataProcessorInvocation.class::cast)
+ .map(SpDataStreamRelayContainer::new)
+ .collect(Collectors.toList());
+ }
+
+ private Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
+ return status.getElementStatus().stream()
+ .filter(PipelineElementStatus::isSuccess)
+ .map(PipelineElementStatus::getElementId)
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
index c404896..2258cb9 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
@@ -15,16 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-// public DataSinkPipelineElementResource() {
-// super(DataSinkInvocation.class);
-// }
-//}
+public class PipelineMigrationHelpers {
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
similarity index 98%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
index 9df92ac..23fc3e2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.manager.execution.pipeline;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphBuilder;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
index c404896..fd4bdd9 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
@@ -15,16 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-// public DataSinkPipelineElementResource() {
-// super(DataSinkInvocation.class);
-// }
-//}
+public interface Command {
+ void execute();
+ void rollback();
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
index c404896..4afbf46 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
@@ -15,16 +15,17 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+public class PipelineElementStartCommand implements Command {
-import javax.ws.rs.Path;
+ @Override
+ public void execute() {
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-// public DataSinkPipelineElementResource() {
-// super(DataSinkInvocation.class);
-// }
-//}
+ }
+
+ @Override
+ public void rollback() {
+
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
similarity index 65%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
index bb186e8..5a2bbd6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
@@ -15,16 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+public class PipelineElementStopCommand implements Command {
+ @Override
+ public void execute() {
-import javax.ws.rs.Path;
+ }
-//@Path("/api/v2/node/element/sepa")
-//public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
-//
-// public DataProcessorPipelineElementResource() {
-// super(DataProcessorInvocation.class);
-// }
-//}
+ @Override
+ public void rollback() {
+
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
index c404896..b2e242d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
@@ -15,16 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+public class RelayStartCommand implements Command {
+ @Override
+ public void execute() {
-import javax.ws.rs.Path;
+ }
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-// public DataSinkPipelineElementResource() {
-// super(DataSinkInvocation.class);
-// }
-//}
+ @Override
+ public void rollback() {
+
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
index c404896..1938268 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
@@ -15,16 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+public class RelayStopCommand implements Command {
+ @Override
+ public void execute() {
-import javax.ws.rs.Path;
+ }
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-// public DataSinkPipelineElementResource() {
-// super(DataSinkInvocation.class);
-// }
-//}
+ @Override
+ public void rollback() {
+
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index f7cae4f..533b8a2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -28,7 +28,7 @@ import org.apache.streampipes.manager.data.PipelineGraphHelpers;
import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
import org.apache.streampipes.manager.matching.output.OutputSchemaGenerator;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
new file mode 100644
index 0000000..4d3f6d5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.migration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.manager.execution.pipeline.PipelineMigrationExecutor;
+import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class PipelineElementMigrationHandler {
+
+ private final PipelineOperationStatus pipelineMigrationStatus;
+ private final Pipeline desiredPipeline;
+ private final Pipeline migrationPipeline;
+ private Pipeline currentPipeline;
+ private final boolean visualize;
+ private final boolean storeStatus;
+ private final boolean monitor;
+
+ public PipelineElementMigrationHandler(Pipeline desiredPipeline, boolean visualize, boolean storeStatus,
+ boolean monitor) {
+ this.pipelineMigrationStatus = new PipelineOperationStatus();
+ this.desiredPipeline = desiredPipeline;
+ this.currentPipeline = getPipelineById(desiredPipeline.getPipelineId());
+ this.migrationPipeline = getPipelineById(desiredPipeline.getPipelineId());
+ this.visualize = visualize;
+ this.storeStatus = storeStatus;
+ this.monitor = monitor;
+ }
+
+ public PipelineOperationStatus handlePipelineMigration() {
+ migratePipelineElementOrRollback();
+ return verifyPipelineMigrationStatus(pipelineMigrationStatus,
+ "Successfully migrated Pipeline Elements in Pipeline " + desiredPipeline.getName(),
+ "Could not migrate all Pipeline Elements in Pipeline " + desiredPipeline.getName());
+ }
+
+ private void migratePipelineElementOrRollback() {
+ List<PipelineElementMigrationEntity> migrationEntityList = getPipelineDelta(desiredPipeline, migrationPipeline);
+
+ migrationEntityList.forEach(entity -> {
+ swapPipelineElement(migrationPipeline, entity);
+
+ PipelineOperationStatus entityStatus = migratePipelineElement(entity);
+
+ entityStatus.getElementStatus()
+ .forEach(this.pipelineMigrationStatus::addPipelineElementStatus);
+
+ if (entityStatus.isSuccess()) {
+ try {
+ currentPipeline = deepCopyPipeline(migrationPipeline);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException("Could not deep copy pipeline for migration: " + e.getMessage(), e);
+ }
+ } else {
+ PipelineElementMigrationEntity failedEntity =
+ new PipelineElementMigrationEntity(entity.getTargetElement(), entity.getSourceElement());
+
+ swapPipelineElement(migrationPipeline, failedEntity);
+ }
+ });
+
+ Operations.overwritePipeline(migrationPipeline);
+ }
+
+ private PipelineOperationStatus migratePipelineElement(PipelineElementMigrationEntity migrationEntity) {
+ return new PipelineMigrationExecutor(migrationPipeline, currentPipeline, migrationEntity, visualize,
+ storeStatus, monitor).migratePipelineElement();
+ }
+
+
+ // Helpers
+
+ private Pipeline getPipelineById(String pipelineId) {
+ return getPipelineStorageApi().getPipeline(pipelineId);
+ }
+
+ private IPipelineStorage getPipelineStorageApi() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+ }
+
+ private List<PipelineElementMigrationEntity> getPipelineDelta(Pipeline pipelineX, Pipeline pipelineY){
+ List<PipelineElementMigrationEntity> delta = new ArrayList<>();
+ pipelineX.getSepas().forEach(iX -> {
+ if (pipelineY.getSepas().stream().filter(iY -> iY.getElementId().equals(iX.getElementId()))
+ .noneMatch(iY -> iY.getDeploymentTargetNodeId().equals(iX.getDeploymentTargetNodeId()))){
+ Optional<DataProcessorInvocation> invocationY = pipelineY.getSepas().stream().
+ filter(iY -> iY.getDeploymentRunningInstanceId().equals(iX.getDeploymentRunningInstanceId())).findFirst();
+ invocationY.ifPresent(dataProcessorInvocation -> delta.add(new PipelineElementMigrationEntity(dataProcessorInvocation, iX)));
+ }
+ });
+ return delta;
+ }
+
+ private PipelineOperationStatus verifyPipelineMigrationStatus(PipelineOperationStatus status, String successMessage,
+ String errorMessage) {
+ //Duplicate from method in GraphSubmitter
+ status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+ if (status.isSuccess()) {
+ status.setTitle(successMessage);
+ } else {
+ status.setTitle(errorMessage);
+ }
+ return status;
+ }
+
+ private void swapPipelineElement(Pipeline exchangePipeline,
+ PipelineElementMigrationEntity migrationEntity){
+ if (migrationEntity.getTargetElement() instanceof DataProcessorInvocation){
+ int index = exchangePipeline.getSepas().indexOf(migrationEntity.getSourceElement());
+ exchangePipeline.getSepas().remove(index);
+ exchangePipeline.getSepas().add(index, (DataProcessorInvocation) migrationEntity.getTargetElement());
+ }
+ }
+
+ public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
index b5803d5..dce1e06 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -17,8 +17,11 @@
*/
package org.apache.streampipes.manager.node;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
@@ -30,25 +33,50 @@ public abstract class AbstractClusterManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
private static final String PROTOCOL = "http://";
- private static final String SLASH = "/";
private static final String COLON = ":";
private static final long RETRY_INTERVAL_MS = 5000;
- private static final Object BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
private static final int CONNECT_TIMEOUT = 1000;
+ private static final String BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
+ private static final String BASE_NODE_CONTROLLER_RELAY_ROUTE = "/api/v2/node/stream/relay";
- protected static boolean syncStateUpdateWithRemoteNodeController(NodeInfoDescription desc, boolean activate) {
+ public enum RequestOptions {
+ GET,POST,PUT,DELETE
+ }
+
+ protected static <T> boolean syncWithNodeController(T element, NodeSyncOptions sync) {
+ switch (sync) {
+ case ACTIVATE_NODE:
+ return sync(element, "/activate", RequestOptions.POST, false, NodeInfoDescription.class);
+ case DEACTIVATE_NODE:
+ return sync(element, "/deactivate", RequestOptions.POST, false, NodeInfoDescription.class);
+ case UPDATE_NODE:
+ return sync(element, "", RequestOptions.PUT, true, NodeInfoDescription.class);
+ case RESTART_RELAYS:
+ return sync(element, "/invoke", RequestOptions.POST, true, SpDataStreamRelayContainer.class);
+ default:
+ return false;
+ }
+ }
+
+ private static <T> boolean sync(T element, String subroute, RequestOptions request, boolean withBody, Class<?> type) {
boolean synced = false;
- String url;
- if (activate) {
- url = generateEndpoint(desc, "/activate");
- } else {
- url = generateEndpoint(desc, "/deactivate");
+
+ String body = "{}";
+ if (withBody) {
+ body = jackson(element);
}
- LOG.info("Trying to sync state update with node controller=" + url);
+
+ String url = generateEndpoint(element, subroute, type);
+ LOG.info("Trying to sync with node controller=" + url);
boolean connected = false;
while (!connected) {
- connected = post(url);
+ // call node controller REST endpoints
+ switch (request) {
+ case POST: connected = post(url, body);
+ case PUT : connected = put(url, body);
+ }
+
if (!connected) {
LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
try {
@@ -57,46 +85,43 @@ public abstract class AbstractClusterManager {
e.printStackTrace();
}
}
- }
- synced = true;
- return synced;
- }
-
- protected static boolean syncWithRemoteNodeController(NodeInfoDescription desc) {
- boolean synced = false;
- try {
- String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
- String url = generateEndpoint(desc);
- LOG.info("Trying to sync description updates with node controller=" + url);
-
- boolean connected = false;
- while (!connected) {
- connected = put(url, body);
- if (!connected) {
- LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
synced = true;
- } catch (IOException e) {
- e.printStackTrace();
}
+ LOG.info("Successfully synced with node controller=" + url);
return synced;
}
- protected static String generateEndpoint(NodeInfoDescription desc) {
- return generateEndpoint(desc, "");
+ // Helpers
+
+ private static <T> String generateEndpoint(T desc, String subroute, Class<?> type) {
+ if (type.equals(NodeInfoDescription.class)) {
+ NodeInfoDescription d = (NodeInfoDescription) desc;
+ return PROTOCOL
+ + d.getHostname()
+ + COLON
+ + d.getPort()
+ + BASE_NODE_CONTROLLER_INFO_ROUTE
+ + subroute;
+ } else {
+ SpDataStreamRelayContainer d = (SpDataStreamRelayContainer) desc;
+ return PROTOCOL
+ + d.getDeploymentTargetNodeHostname()
+ + COLON
+ + d.getDeploymentTargetNodePort()
+ + BASE_NODE_CONTROLLER_RELAY_ROUTE
+ + subroute;
+ }
}
- protected static String generateEndpoint(NodeInfoDescription desc, String subroute) {
- return PROTOCOL + desc.getHostname() + COLON + desc.getPort() + BASE_NODE_CONTROLLER_INFO_ROUTE + subroute;
+ private static <T> String jackson(T desc) {
+ try {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(desc);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException("Could not serialize node controller description");
+ }
}
- protected static boolean put(String url, String body) {
+ private static boolean put(String url, String body) {
try {
Request.Put(url)
.bodyString(body, ContentType.APPLICATION_JSON)
@@ -109,10 +134,10 @@ public abstract class AbstractClusterManager {
return false;
}
- protected static boolean post(String url) {
+ private static boolean post(String url, String body) {
try {
Request.Post(url)
- .bodyString("{}", ContentType.APPLICATION_JSON)
+ .bodyString(body, ContentType.APPLICATION_JSON)
.connectTimeout(CONNECT_TIMEOUT)
.execute();
return true;
@@ -121,4 +146,5 @@ public abstract class AbstractClusterManager {
}
return false;
}
+
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
index c2d9278..2071e12 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
@@ -28,6 +28,7 @@ import java.util.List;
import javax.ws.rs.core.MediaType;
+@Deprecated
public class AvailableNodesFetcher {
public AvailableNodesFetcher() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index 3f58f2f..007dc15 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -17,9 +17,13 @@
*/
package org.apache.streampipes.manager.node;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.api.INodeInfoStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,70 +32,132 @@ import java.util.List;
import java.util.Optional;
public class NodeClusterManager extends AbstractClusterManager {
-
private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
+
public static List<NodeInfoDescription> getAvailableNodes() {
//return new AvailableNodesFetcher().fetchNodes();
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
+ return getNodeStorageApi().getAllActiveNodes();
}
+
public static List<NodeInfoDescription> getAllNodes() {
//return new AvailableNodesFetcher().fetchNodes();
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+ return getNodeStorageApi().getAllNodes();
}
public static Message updateNode(NodeInfoDescription desc) {
- boolean successfullyUpdated = syncWithRemoteNodeController(desc);
+ boolean successfullyUpdated = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
if (successfullyUpdated) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
+ getNodeStorageApi().updateNode(desc);
return Notifications.success("Node updated");
}
return Notifications.error("Could not update node");
}
public static boolean deactivateNode(String nodeControllerId) {
- Optional<NodeInfoDescription> storedNode =
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+ Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
boolean status = false;
if (storedNode.isPresent()) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deactivateNode(nodeControllerId);
- status = syncStateUpdateWithRemoteNodeController(storedNode.get(), false);
+ getNodeStorageApi().deactivateNode(nodeControllerId);
+ status = syncWithNodeController(storedNode.get(), NodeSyncOptions.DEACTIVATE_NODE);
}
return status;
}
public static boolean activateNode(String nodeControllerId) {
- Optional<NodeInfoDescription> storedNode =
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+ Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
boolean status = false;
if (storedNode.isPresent()) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().activateNode(nodeControllerId);
- status = syncStateUpdateWithRemoteNodeController(storedNode.get(), true);
+ getNodeStorageApi().activateNode(nodeControllerId);
+ status = syncWithNodeController(storedNode.get(), NodeSyncOptions.ACTIVATE_NODE);
}
return status;
}
- public static void addNode(NodeInfoDescription desc) {
- List<NodeInfoDescription> allNodes =
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+ public static Message addOrRejoin(NodeInfoDescription desc) {
+ Optional<NodeInfoDescription> latestDesc = getLatestNodeOrElseEmpty(desc.getNodeControllerId());
boolean alreadyRegistered = false;
- if (allNodes.size() > 0) {
- alreadyRegistered = allNodes.stream()
- .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
+ if (latestDesc.isPresent()) {
+ alreadyRegistered = true;
}
if (!alreadyRegistered) {
- LOG.info("New cluster node join registration request on from http://{}:{}", desc.getHostname(), desc.getPort());
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
- LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+ LOG.info("New cluster node join request from http://{}:{}", desc.getHostname(), desc.getPort());
+ return addNewNode(desc);
} else {
LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
+ return rejoinAndSyncNode(latestDesc.get());
+ }
+ }
+
+ private static Optional<NodeInfoDescription> getLatestNodeOrElseEmpty(String nodeControllerId) {
+ return getNodeStorageApi().getAllNodes().stream()
+ .filter(n -> n.getNodeControllerId().equals(nodeControllerId))
+ .findAny();
+ }
+
+ private static Message addNewNode(NodeInfoDescription desc) throws RuntimeException {
+ try {
+ getNodeStorageApi().storeNode(desc);
+ LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+ return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
+ } catch (Exception e) {
+ return Notifications.success(NotificationType.NODE_JOIN_ERROR);
+ }
+ }
+
+ private static Message rejoinAndSyncNode(NodeInfoDescription desc) {
+ LOG.info("Sync latest node description to http://{}:{}", desc.getHostname(), desc.getPort());
+ boolean success = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
+ if (success) {
+ return restartRelays(desc);
+ }
+ return Notifications.success(NotificationType.NODE_JOIN_ERROR);
+ }
+
+ private static Message restartRelays(NodeInfoDescription desc) {
+ List<SpDataStreamRelayContainer> runningRelays = getDataStreamRelay(desc.getNodeControllerId());
+ if (runningRelays.size() > 0) {
+ runningRelays.forEach(relay -> {
+ LOG.info("Sync active relays name={} to http://{}:{}", relay.getName(), desc.getHostname(),
+ desc.getPort());
+ syncWithNodeController(relay, NodeSyncOptions.RESTART_RELAYS);
+ });
}
+ return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
}
public static void deleteNode(String nodeControllerId) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
+ getNodeStorageApi().deleteNode(nodeControllerId);
}
+
+
+ public static void persistDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+ getNodeDataStreamRelayStorageApi().addRelayContainer(relayContainer);
+ }
+
+ public static List<SpDataStreamRelayContainer> getDataStreamRelay(String nodeControllerId) {
+ return getNodeDataStreamRelayStorageApi().getAllByNodeControllerId(nodeControllerId);
+ }
+
+ public static void updateDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+ getNodeDataStreamRelayStorageApi().updateRelayContainer(relayContainer);
+ }
+
+ public static void deleteDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+ getNodeDataStreamRelayStorageApi().deleteRelayContainer(relayContainer);
+ }
+
+ // Helpers
+
+ private static INodeInfoStorage getNodeStorageApi() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
+ }
+
+ private static INodeDataStreamRelay getNodeDataStreamRelayStorageApi(){
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage();
+ }
+
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
similarity index 67%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
index c404896..644e688 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
@@ -15,16 +15,8 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.node;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-// public DataSinkPipelineElementResource() {
-// super(DataSinkInvocation.class);
-// }
-//}
+public enum NodeSyncOptions {
+ ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS;
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 7861a6f..6047b1d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -18,16 +18,15 @@
package org.apache.streampipes.manager.operations;
-import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.manager.endpoint.EndpointItemFetcher;
-import org.apache.streampipes.manager.execution.http.MigrationHelpers;
-import org.apache.streampipes.manager.execution.http.PipelineExecutor;
-import org.apache.streampipes.manager.execution.http.PipelineStorageService;
+import org.apache.streampipes.manager.execution.pipeline.PipelineExecutor;
+import org.apache.streampipes.manager.execution.pipeline.PipelineStorageService;
import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
+import org.apache.streampipes.manager.migration.PipelineElementMigrationHandler;
import org.apache.streampipes.manager.recommender.ElementRecommender;
import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
import org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
@@ -38,28 +37,20 @@ import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.message.DataSetModificationMessage;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.pipeline.*;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import org.apache.streampipes.sdk.helpers.Tuple2;
import org.apache.streampipes.storage.management.StorageDispatcher;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
/**
@@ -196,35 +187,9 @@ public class Operations {
return PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
}
- public static PipelineOperationStatus migratePipelineProcessors(Pipeline newPipeline, boolean visualize, boolean storeStatus,
- boolean monitor) throws SpRuntimeException{
-
-
- Pipeline currentPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(newPipeline.getPipelineId());
- Pipeline migrationPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(newPipeline.getPipelineId());
- PipelineOperationStatus status = new PipelineOperationStatus();
-
- for(Tuple2<DataProcessorInvocation, DataProcessorInvocation> t : MigrationHelpers.getDelta(newPipeline, migrationPipeline)){
- MigrationHelpers.exchangePipelineElement(migrationPipeline, t);
-
- PipelineOperationStatus migrationStatus = new PipelineExecutor(migrationPipeline, visualize, storeStatus, monitor)
- .migratePipelineElement(currentPipeline, t);
- migrationStatus.getElementStatus().forEach(status::addPipelineElementStatus);
- if (migrationStatus.isSuccess()) {
- try {
- currentPipeline = MigrationHelpers.deepCopyPipeline(migrationPipeline);
- } catch (JsonProcessingException e) {
- throw new SpRuntimeException(e);
- }
- } else {
- Tuple2<DataProcessorInvocation, DataProcessorInvocation> failedMigration = new Tuple2<>(t.b, t.a);
- MigrationHelpers.exchangePipelineElement(migrationPipeline, failedMigration);
- }
- }
-
- overwritePipeline(migrationPipeline);
- return MigrationHelpers.verifyPipelineOperationStatus(status,
- "Successfully migrated Pipeline Elements in Pipeline " + newPipeline.getName(),
- "Could not migrate all Pipeline Elements in Pipeline " + newPipeline.getName());
+ public static PipelineOperationStatus handlePipelineElementMigration(Pipeline desiredPipeline, boolean visualize,
+ boolean storeStatus, boolean monitor) throws SpRuntimeException{
+ return new PipelineElementMigrationHandler(desiredPipeline, visualize, storeStatus, monitor).handlePipelineMigration();
}
+
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index d0f8c9f..e7db8e0 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -40,8 +40,7 @@ public class Node extends AbstractRestResource implements INode {
@Produces(MediaType.APPLICATION_JSON)
@Override
public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
- NodeClusterManager.addNode(desc);
- return statusMessage(Notifications.success(NotificationType.STORAGE_SUCCESS));
+ return statusMessage(NodeClusterManager.addOrRejoin(desc));
}
@PUT
@@ -51,7 +50,8 @@ public class Node extends AbstractRestResource implements INode {
@Produces(MediaType.APPLICATION_JSON)
@Override
public Response updateNode(@PathParam("username") String username,
- @PathParam("nodeControllerId") String nodeControllerId, NodeInfoDescription desc) {
+ @PathParam("nodeControllerId") String nodeControllerId,
+ NodeInfoDescription desc) {
return statusMessage(NodeClusterManager.updateNode(desc));
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 28c0dee..e8f005e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -270,11 +270,12 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
@JacksonSerialized
@Operation(summary = "Migrate pipeline elements to new node",
tags = {"Pipeline"})
- public Response migratePipelineProcessors(@PathParam("username") String username,
- @PathParam("pipelineId") String pipelineId,
- Pipeline pipelineNew) {
+ public Response migratePipelineElements(@PathParam("username") String username,
+ @PathParam("pipelineId") String pipelineId,
+ Pipeline desiredPipeline) {
try {
- PipelineOperationStatus status = Operations.migratePipelineProcessors(pipelineNew, true, true, true);
+ PipelineOperationStatus status = Operations.handlePipelineElementMigration(desiredPipeline,
+ true, true, true);
return ok(status);
} catch (Exception e) {
e.printStackTrace();
diff --git a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
index cf056c2..8a6f628 100644
--- a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
+++ b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
@@ -24,6 +24,8 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.streampipes.model.*;
import org.apache.streampipes.model.connect.rules.value.*;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.connect.adapter.*;
import org.apache.streampipes.model.connect.rules.schema.CreateNestedRuleDescription;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index b9ee0fa..d3da942 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -41,6 +41,8 @@ import org.apache.streampipes.model.dashboard.DashboardWidgetModel;
import org.apache.streampipes.model.dashboard.VisualizablePipeline;
import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.graph.*;
import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
index 55b4781..84ae05e 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
@@ -70,6 +70,7 @@ public class JsonLdTransformer implements RdfTransformer {
StreamPipes.DASHBOARD_WIDGET_MODEL,
StreamPipes.DASHBOARD_MODEL,
StreamPipes.DATA_EXPLORER_WIDGET_MODEL,
+ StreamPipes.DATA_STREAM_RELAY,
StreamPipes.DATA_STREAM_RELAY_CONTAINER,
StreamPipes.NODE_INFO_DESCRIPTION,
StreamPipes.DEPLOYMENT_CONTAINER,
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index 06fdfea..4d84cca 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -58,4 +58,6 @@ public interface INoSqlStorage {
IPipelineElementTemplateStorage getPipelineElementTemplateStorage();
INodeInfoStorage getNodeStorage();
+
+ INodeDataStreamRelay getNodeDataStreamRelayStorage();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
similarity index 56%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
copy to streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
index 0ad32ee..67e052b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
@@ -15,19 +15,25 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.container.management.pe;
+package org.apache.streampipes.storage.api;
-import org.apache.streampipes.container.model.node.InvocableRegistration;
-import org.apache.streampipes.model.Response;
-public interface PipelineElementLifeCycle {
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
- void register(InvocableRegistration registration);
+import java.util.List;
+import java.util.Optional;
- Response invoke(String endpoint, String payload);
+public interface INodeDataStreamRelay {
- Response detach(String runningInstanceId);
+ List<SpDataStreamRelayContainer> getAll();
- void unregister();
+ List<SpDataStreamRelayContainer> getAllByNodeControllerId(String id);
+ void addRelayContainer(SpDataStreamRelayContainer relayContainer);
+
+ Optional<SpDataStreamRelayContainer> getRelayContainerById(String id);
+
+ void updateRelayContainer(SpDataStreamRelayContainer relayContainer);
+
+ void deleteRelayContainer(SpDataStreamRelayContainer relayContainer);
}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 269fb42..739c22b 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -120,5 +120,8 @@ public enum CouchDbStorageManager implements INoSqlStorage {
return new NodeInfoStorageImpl();
}
-
+ @Override
+ public INodeDataStreamRelay getNodeDataStreamRelayStorage() {
+ return new NodeDataStreamRelayImpl();
+ }
}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
new file mode 100644
index 0000000..2564418
--- /dev/null
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.storage.couchdb.impl;
+
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class NodeDataStreamRelayImpl extends AbstractDao<SpDataStreamRelayContainer> implements INodeDataStreamRelay {
+
+ public NodeDataStreamRelayImpl() {
+ super(Utils::getCouchDbNodeDataStreamRelayClient, SpDataStreamRelayContainer.class);
+ }
+
+ @Override
+ public List<SpDataStreamRelayContainer> getAll() {
+ return findAll();
+ }
+
+ @Override
+ public List<SpDataStreamRelayContainer> getAllByNodeControllerId(String id) {
+ return getAll().stream()
+ .filter(e -> e.getDeploymentTargetNodeId().equals(id))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void addRelayContainer(SpDataStreamRelayContainer relayContainer) {
+ persist(relayContainer);
+ }
+
+ @Override
+ public Optional<SpDataStreamRelayContainer> getRelayContainerById(String s) {
+ return getAll().stream()
+ .filter(e -> e.getRunningStreamRelayInstanceId().equals(s))
+ .findAny();
+ }
+
+ @Override
+ public void updateRelayContainer(SpDataStreamRelayContainer relayContainer) {
+ Optional<SpDataStreamRelayContainer> rc =
+ getRelayContainerById(relayContainer.getRunningStreamRelayInstanceId());
+ if(rc.isPresent()) {
+ relayContainer.setCouchDbId(rc.get().getCouchDbId());
+ relayContainer.setCouchDbRev(rc.get().getCouchDbRev());
+
+ update(relayContainer);
+ }
+ }
+
+ @Override
+ public void deleteRelayContainer(SpDataStreamRelayContainer relayContainer) {
+ Optional<SpDataStreamRelayContainer> rc = getRelayContainerById(relayContainer.getRunningStreamRelayInstanceId());
+ rc.ifPresent(r -> delete(r.getCouchDbId()));
+ }
+}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index c1464ba..be64b45 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -144,6 +144,12 @@ public class Utils {
return dbClient;
}
+ public static CouchDbClient getCouchDbNodeDataStreamRelayClient() {
+ CouchDbClient dbClient = new CouchDbClient(props("relays"));
+ dbClient.setGsonBuilder(GsonSerializer.getGsonBuilder());
+ return dbClient;
+ }
+
public static CouchDbClient getCouchDbInternalUsersClient() {
CouchDbClient dbClient = new CouchDbClient(props("_users"));
return dbClient;
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index f34d68a..2587943 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,10 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-02-23 09:17:52.
+// Generated using typescript-generator version 2.27.744 on 2021-02-26 19:57:32.
export class AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
+ "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
elementId: string;
static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
@@ -119,7 +119,7 @@ export class Accuracy extends EventPropertyQualityDefinition {
}
export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+ "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
appId: string;
applicationLinks: ApplicationLink[];
connectedTo: string[];
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
instance.internallyManaged = data.internallyManaged;
instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
- instance.dom = data.dom;
instance.uri = data.uri;
+ instance.dom = data.dom;
return instance;
}
}
@@ -1824,8 +1824,8 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
}
const instance = target || new GenericAdapterSetDescription();
super.fromData(data, instance);
- instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+ instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.eventSchema = EventSchema.fromData(data.eventSchema);
return instance;
}
@@ -1843,8 +1843,8 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
}
const instance = target || new GenericAdapterStreamDescription();
super.fromData(data, instance);
- instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+ instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.eventSchema = EventSchema.fromData(data.eventSchema);
return instance;
}
@@ -3074,7 +3074,7 @@ export class SpDataSet extends SpDataStream {
}
export class SpDataStreamRelay extends NamedStreamPipesEntity {
- "@class": "org.apache.streampipes.model.SpDataStreamRelay";
+ "@class": "org.apache.streampipes.model.eventrelay.SpDataStreamRelay";
eventGrounding: EventGrounding;
static fromData(data: SpDataStreamRelay, target?: SpDataStreamRelay): SpDataStreamRelay {
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
index 4014443..8c27f41 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
@@ -29,7 +29,7 @@ import {Component, Input} from "@angular/core";
export class PipelineStatusDialogComponent {
statusDetailsVisible: any;
- elementStati : [[PipelineElementStatus]];
+ elementStati : Array<Array<PipelineElementStatus>>;
@Input()
pipelineOperationStatus: PipelineOperationStatus;
@@ -40,18 +40,15 @@ export class PipelineStatusDialogComponent {
}
ngOnInit(){
- console.log(this.pipelineOperationStatus.elementStatus)
- let nodes: [String];
- nodes = [];
+ let nodes = [];
this.pipelineOperationStatus.elementStatus.forEach(stat => {
if (!nodes.includes(stat.elementNode)){
nodes.push(stat.elementNode)
}
})
- console.log(nodes)
+
nodes.forEach(node =>{
- let nodeStati : [PipelineElementStatus];
- nodeStati = [];
+ let nodeStati = [];
this.pipelineOperationStatus.elementStatus.forEach(stat =>{
if(stat.elementNode == node){
nodeStati.push(stat);
@@ -59,7 +56,6 @@ export class PipelineStatusDialogComponent {
})
this.elementStati.push(nodeStati);
})
- console.log(this.elementStati)
}
close() {