You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/15 14:15:55 UTC
[incubator-streampipes] 02/02: [WIP] pipeline element live reconfiguration
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 56656c335c9e871ef3ea1d3a7d4b1656b9f3ec0c
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Mar 15 15:12:47 2021 +0100
[WIP] pipeline element live reconfiguration
---
.../container/api/InvocableEntityResource.java | 11 ++-
.../management/pe/InvocableElementManager.java | 89 ++++++++++++++--------
.../management/pe/PipelineElementLifeCycle.java | 3 +-
.../http/ReconfigurationEndpointUrlGenerator.java | 4 +-
.../execution/http/ReconfigurationSubmitter.java | 3 -
.../streampipes/manager/operations/Operations.java | 5 +-
.../PipelineElementReconfigurationHandler.java | 59 ++++----------
.../streampipes/rest/impl/PipelineResource.java | 7 +-
.../messaging-configuration.component.html | 22 +++++-
.../shared/messaging-settings.model.ts | 1 +
.../connect/services/data-marketplace.service.ts | 6 ++
.../components/edit/quickedit.component.ts | 44 +++++++++--
.../app/platform-services/apis/pipeline.service.ts | 4 +-
13 files changed, 151 insertions(+), 107 deletions(-)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 8397373..e54e6dd 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -32,6 +32,7 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -99,14 +100,12 @@ public class InvocableEntityResource extends AbstractResource {
// Adaptation
@POST
@JacksonSerialized
- @Path("{identifier}/{elementId}/{runningInstanceId}/adapt")
+ @Path("/adapt/{runningInstanceId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response adapt(@PathParam("identifier") String identifier,
- @PathParam("elementId") String elementId,
- @PathParam("runningInstanceId") String runningInstanceId,
- String payload) {
+ public javax.ws.rs.core.Response adapt( @PathParam("runningInstanceId") String runningInstanceId,
+ PipelineElementReconfigurationEntity reconfigurationEntity) {
InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
- return ok(InvocableElementManager.getInstance().adapt(graph, payload));
+ return ok(InvocableElementManager.getInstance().adapt(graph, reconfigurationEntity));
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 9d627c9..e15fd5c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.node.controller.container.management.pe;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
@@ -31,11 +30,10 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
import org.apache.streampipes.messaging.mqtt.MqttPublisher;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.node.NodeManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -47,6 +45,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,9 +57,11 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
private static final String HTTP_PROTOCOL = "http://";
private static final String COLON = ":";
private static final String SLASH = "/";
+ private static final String DOT = ".";
private static final String SP_URL = "SP_URL";
private static final String CONSUL_LOCATION = "CONSUL_LOCATION";
private static final String CONSUL_REGISTRATION_ROUTE = "v1/agent/service/register";
+ private static final String RECONFIGURATION_TOPIC = "org.apache.streampipes.control.event.reconfigure";
private static final int CONSUL_DEFAULT_PORT = 8500;
private static final Integer CONNECT_TIMEOUT = 10000;
private static InvocableElementManager instance = null;
@@ -132,35 +133,47 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
}
@Override
- public Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent) {
- ObjectMapper mapper = new ObjectMapper();
- Response r = new Response();
- r.setElementId(graph.getElementId());
- r.setSuccess(false);
- try{
- TransportProtocol tp = mapper.readValue(mapper.writeValueAsString(graph.getInputStreams().get(0)
- .getEventGrounding().getTransportProtocol()), graph.getInputStreams().get(0)
- .getEventGrounding().getTransportProtocol().getClass());
- tp.getTopicDefinition().setActualTopicName("org.apache.streampipes.control.event.reconfigure."
- + graph.getDeploymentRunningInstanceId());
- EventProducer pub;
- if(tp instanceof KafkaTransportProtocol){
- pub = new SpKafkaProducer();
- pub.connect(tp);
- }else if (tp instanceof JmsTransportProtocol){
- pub = new ActiveMQPublisher();
- pub.connect(tp);
- } else{
- pub = new MqttPublisher();
- pub.connect(tp);
- }
- pub.publish(reconfigurationEvent.getBytes(StandardCharsets.UTF_8));
- pub.disconnect();
- r.setSuccess(true);
- } catch (JsonProcessingException e) {
- r.setOptionalMessage(e.getMessage());
+ public Response adapt(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEntity) {
+
+ Response response = new Response();
+ response.setElementId(graph.getElementId());
+ response.setSuccess(false);
+
+ EventProducer pub = getReconfigurationEventProducerFromInvocable(graph);
+
+ byte [] reconfigurationEvent = reconfigurationToByteArray(reconfigurationEntity);
+ pub.publish(reconfigurationEvent);
+ pub.disconnect();
+
+ response.setSuccess(true);
+ return response;
+ }
+
+ private EventProducer getReconfigurationEventProducerFromInvocable(InvocableStreamPipesEntity graph) {
+ TransportProtocol tp = getReconfigurationTransportProtocol(graph);
+ EventProducer pub;
+ if(tp instanceof KafkaTransportProtocol){
+ pub = new SpKafkaProducer();
+ pub.connect(tp);
+ } else if (tp instanceof JmsTransportProtocol){
+ pub = new ActiveMQPublisher();
+ pub.connect(tp);
+ } else{
+ pub = new MqttPublisher();
+ pub.connect(tp);
}
- return r;
+ return pub;
+ }
+
+ private byte[] reconfigurationToByteArray(PipelineElementReconfigurationEntity entity) {
+ Map<String, String> reconfigurationEventMap = new HashMap<>();
+ entity.getReconfiguredStaticProperties().forEach(staticProperty -> {
+ if (staticProperty instanceof FreeTextStaticProperty) {
+ reconfigurationEventMap.put(staticProperty.getInternalName(),
+ ((FreeTextStaticProperty) staticProperty).getValue());
+ }
+ });
+ return toJson(reconfigurationEventMap).getBytes(StandardCharsets.UTF_8);
}
private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
@@ -186,6 +199,16 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
.setSupportedElements(supportedPipelineElements);
}
+ private TransportProtocol getReconfigurationTransportProtocol(InvocableStreamPipesEntity graph) {
+ TransportProtocol tp = graph.getInputStreams().get(0).getEventGrounding().getTransportProtocol();
+ tp.setTopicDefinition(makeReconfigurationTopic(graph.getDeploymentRunningInstanceId()));
+ return tp;
+ }
+
+ private TopicDefinition makeReconfigurationTopic(String runningInstanceId) {
+ return new SimpleTopicDefinition( RECONFIGURATION_TOPIC + DOT + runningInstanceId);
+ }
+
private String generateBackendEndpoint() {
return HTTP_PROTOCOL
+ NodeControllerConfig.INSTANCE.backendLocation()
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index cdaf13e..5bdc6e6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.node.controller.container.management.pe;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
public interface PipelineElementLifeCycle {
@@ -29,7 +30,7 @@ public interface PipelineElementLifeCycle {
Response detach(String runningInstanceId);
- Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent);
+ Response adapt(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
void unregister();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java
index 0daa820..c7b0dae 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java
@@ -40,7 +40,9 @@ public class ReconfigurationEndpointUrlGenerator {
+ COLON
+ reconfigurationEntity.getDeploymentTargetNodePort()
+ BASE_ROUTE
- + RECONFIGURE_ROUTE;
+ + RECONFIGURE_ROUTE
+ + SLASH
+ + reconfigurationEntity.getDeploymentRunningInstanceId();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java
index d4da1c2..2c108b1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java
@@ -21,9 +21,6 @@ import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index bf3383d..2b56c93 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -193,8 +193,7 @@ public class Operations {
return new PipelineElementMigrationHandler(desiredPipeline, visualize, storeStatus, monitor).handlePipelineMigration();
}
- public static PipelineOperationStatus handlePipelineElementReconfiguration(Pipeline reconfiguredPipeline,
- boolean storeStatus) {
- return new PipelineElementReconfigurationHandler(reconfiguredPipeline, storeStatus).handleReconfiguration();
+ public static PipelineOperationStatus handlePipelineElementReconfiguration(Pipeline reconfiguredPipeline) {
+ return new PipelineElementReconfigurationHandler(reconfiguredPipeline).handleReconfiguration();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
index 223a85f..31ad843 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
@@ -38,13 +38,11 @@ public class PipelineElementReconfigurationHandler {
private final PipelineOperationStatus pipelineReconfigurationStatus;
private final Pipeline reconfiguredPipeline;
private Pipeline storedPipeline;
- private final boolean storeStatus;
- public PipelineElementReconfigurationHandler(Pipeline reconfiguredPipeline, boolean storeStatus) {
- this.pipelineReconfigurationStatus = new PipelineOperationStatus();
+ public PipelineElementReconfigurationHandler(Pipeline reconfiguredPipeline) {
this.reconfiguredPipeline = reconfiguredPipeline;
this.storedPipeline = getPipelineById(reconfiguredPipeline.getPipelineId());
- this.storeStatus = storeStatus;
+ this.pipelineReconfigurationStatus = initPipelineOperationStatus();
}
public PipelineOperationStatus handleReconfiguration() {
@@ -62,22 +60,12 @@ public class PipelineElementReconfigurationHandler {
entityStatus.getElementStatus().forEach(pipelineReconfigurationStatus::addPipelineElementStatus);
- // TODO needed ?
if (entityStatus.isSuccess()) {
- try {
- storedPipeline = deepCopyPipeline(reconfiguredPipeline);
- } catch (JsonProcessingException e) {
- throw new SpRuntimeException("Could not deep copy pipeline for reconfiguration: " + e.getMessage(),
- e);
- }
+ Operations.overwritePipeline(reconfiguredPipeline);
} else {
- //TODO: what to do when not successful?
+ //TODO: Send old/existing configuration
}
});
-
- if (storeStatus) {
- Operations.overwritePipeline(reconfiguredPipeline);
- }
}
private PipelineOperationStatus reconfigurePipelineElement(PipelineElementReconfigurationEntity entity) {
@@ -87,18 +75,15 @@ public class PipelineElementReconfigurationHandler {
private List<PipelineElementReconfigurationEntity> comparePipelinesAndGetReconfiguration() {
List<PipelineElementReconfigurationEntity> delta = new ArrayList<>();
- List<DataProcessorInvocation> reconfiguredGraphs = filterReconfigurableFsp(reconfiguredPipeline);
- List<DataProcessorInvocation> currentGraphs = filterReconfigurableFsp(storedPipeline);
-
- reconfiguredGraphs.forEach(reconfiguredProcessor -> currentGraphs.forEach(storedProcessor -> {
+ reconfiguredPipeline.getSepas().forEach(reconfiguredProcessor -> storedPipeline.getSepas().forEach(storedProcessor -> {
if (matchingElementIds(reconfiguredProcessor, storedProcessor)) {
List<StaticProperty> list = new ArrayList<>();
getReconfigurableFsp(reconfiguredProcessor).forEach(reconfiguredFsp ->
getReconfigurableFsp(storedProcessor).forEach(storedFsp -> {
- if (compareForChanges(reconfiguredFsp, storedFsp)) {
- list.add(reconfiguredFsp);
- }
- }));
+ if (compareForChanges(reconfiguredFsp, storedFsp)) {
+ list.add(reconfiguredFsp);
+ }
+ }));
PipelineElementReconfigurationEntity entity = reconfigurationEntity(reconfiguredProcessor, list);
if (list.size() > 0 && !exists(delta, entity)) {
delta.add(entity);
@@ -143,24 +128,6 @@ public class PipelineElementReconfigurationHandler {
return one.getElementId().equals(two.getElementId());
}
- private List<DataProcessorInvocation> filterReconfigurableFsp(Pipeline pipeline) {
- List<DataProcessorInvocation> filtered = new ArrayList<>();
- pipeline.getSepas().forEach(processor -> {
- List<StaticProperty> fsp = new ArrayList<>();
- processor.getStaticProperties().forEach(sp -> {
- if (sp instanceof FreeTextStaticProperty && ((FreeTextStaticProperty) sp).isReconfigurable()) {
- fsp.add(sp);
- }
- });
- processor.setStaticProperties(fsp);
- filtered.add(processor);
- });
-
- return filtered;
- }
-
-
-
// Helpers
private PipelineOperationStatus verifyPipelineReconfigurationStatus(PipelineOperationStatus status,
@@ -183,9 +150,11 @@ public class PipelineElementReconfigurationHandler {
return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
}
- public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
+ private PipelineOperationStatus initPipelineOperationStatus() {
+ PipelineOperationStatus status = new PipelineOperationStatus();
+ status.setPipelineId(reconfiguredPipeline.getPipelineId());
+ status.setPipelineName(reconfiguredPipeline.getName());
+ return status;
}
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index ee0559c..6286272 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -273,13 +273,8 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
public Response reconfigurePipelineElements(@PathParam("username") String username,
@PathParam("pipelineId") String pipelineId,
Pipeline reconfiguredPipeline) {
- // check which elements got adapted (stored vs new pipeline) for which deployment target
- // transform modified graphs (graphs = DataProcessorInvokation) to JSON payload (light):
- // { "id" : "<runningInstanceId"
- // POST /<runninginstanceid> with payload(JSON per modified graph)
- // lookup(runninginstanceid) -> pub.connect(broker) -> pub.publish(event)
try {
- return ok(Operations.handlePipelineElementReconfiguration(reconfiguredPipeline, true));
+ return ok(Operations.handlePipelineElementReconfiguration(reconfiguredPipeline));
} catch (Exception e) {
e.printStackTrace();
return statusMessage(Notifications.error(NotificationType.UNKNOWN_ERROR));
diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
index d9720ad..ea9a876 100644
--- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
+++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
@@ -77,7 +77,7 @@
<div fxFlex="100" fxLayout="column" fxLayoutAlign="start stretch" style="margin-top:40px;">
<div fxFlex="100" class="assemblyOptions sp-blue-bg" style="padding:5px;">
<div fxLayout="row" fxLayoutAlign="start center" fxFlex="100">
- <h4>Protocols</h4>
+ <h4>Cloud Protocols</h4>
<span flex></span>
</div>
</div>
@@ -93,6 +93,26 @@
</button>
</div>
</div>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="start stretch" style="margin-top:40px;">
+ <div fxFlex="100" class="assemblyOptions sp-blue-bg" style="padding:5px;">
+ <div fxLayout="row" fxLayoutAlign="start center" fxFlex="100">
+ <h4>Edge Protocol</h4>
+ <span flex></span>
+ </div>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="start start" class="sp-blue-border page-container-padding-inner">
+ <div cdkDropList class="data-format-list" (cdkDropListDropped)="dropProtocol($event)"
+ *ngIf="loadingCompleted">
+ <div class="data-format-box"cdkDrag>
+ {{messagingSettings.edgeNodeProtocol}}
+ </div>
+ </div>
+ <div fxLayoutAlign="end center">
+<!-- <button mat-raised-button color="primary" type="submit" class="md-raised md-primary submit-button" (click)="updateMessagingSettings()">Update-->
+<!-- </button>-->
+ </div>
+ </div>
</div>
</div>
\ No newline at end of file
diff --git a/ui/src/app/configuration/shared/messaging-settings.model.ts b/ui/src/app/configuration/shared/messaging-settings.model.ts
index 501e63e..a33ce26 100644
--- a/ui/src/app/configuration/shared/messaging-settings.model.ts
+++ b/ui/src/app/configuration/shared/messaging-settings.model.ts
@@ -25,4 +25,5 @@ export interface MessagingSettings {
prioritizedFormats: [string];
prioritizedProtocols: [string];
+ edgeNodeProtocol: string;
}
\ No newline at end of file
diff --git a/ui/src/app/connect/services/data-marketplace.service.ts b/ui/src/app/connect/services/data-marketplace.service.ts
index d0a43ab..5664b83 100644
--- a/ui/src/app/connect/services/data-marketplace.service.ts
+++ b/ui/src/app/connect/services/data-marketplace.service.ts
@@ -148,6 +148,12 @@ export class DataMarketplaceService {
newAdapterDescription.includedLocales = protocol.includedLocales;
newAdapterDescription.includesLocales = protocol.includesLocales;
+ newAdapterDescription.deploymentTargetNodeId = protocol.deploymentTargetNodeId;
+ newAdapterDescription.deploymentTargetNodeHostname = protocol.deploymentTargetNodeHostname;
+ newAdapterDescription.deploymentTargetNodePort = protocol.deploymentTargetNodePort;
+ newAdapterDescription.elementEndpointHostname = protocol.elementEndpointHostname;
+ newAdapterDescription.elementEndpointPort = protocol.elementEndpointPort;
+
if (
newAdapterDescription instanceof GenericAdapterSetDescription ||
newAdapterDescription instanceof GenericAdapterStreamDescription
diff --git a/ui/src/app/pipeline-details/components/edit/quickedit.component.ts b/ui/src/app/pipeline-details/components/edit/quickedit.component.ts
index dee5d31..3e9e256 100644
--- a/ui/src/app/pipeline-details/components/edit/quickedit.component.ts
+++ b/ui/src/app/pipeline-details/components/edit/quickedit.component.ts
@@ -30,10 +30,14 @@ import {
DataProcessorInvocation,
DataSinkInvocation,
EventSchema,
- Pipeline
+ Pipeline, PipelineOperationStatus
} from "../../../core-model/gen/streampipes-model";
import {PipelineElementUnion} from "../../../editor/model/editor.model";
import {FormBuilder, FormGroup} from "@angular/forms";
+import {PipelineStatusDialogComponent} from "../../../pipelines/dialog/pipeline-status/pipeline-status-dialog.component";
+import {PanelType} from "../../../core-ui/dialog/base-dialog/base-dialog.model";
+import {DialogService} from "../../../core-ui/dialog/base-dialog/base-dialog.service";
+import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
@Component({
selector: 'quick-edit',
@@ -59,10 +63,12 @@ export class QuickEditComponent implements OnInit, AfterViewInit{
isDataProcessor: boolean = false;
pipelineUpdating: boolean = false;
+ pipelineReconfiguration: boolean = false;
constructor(private pipelineService: PipelineService,
private fb: FormBuilder,
- private changeDetectorRef: ChangeDetectorRef) {
+ private changeDetectorRef: ChangeDetectorRef,
+ private dialogService: DialogService) {
}
@@ -148,12 +154,38 @@ export class QuickEditComponent implements OnInit, AfterViewInit{
}
reconfigurePipeline() {
- this.pipelineUpdating = true;
+ this.pipelineReconfiguration = true;
this.updatePipelineElement();
- this.pipelineService.reconfigurePipeline(this.pipeline).subscribe(data => {
- this.reloadPipelineEmitter.emit();
- this.pipelineUpdating = false;
+ this.pipelineService.reconfigurePipeline(this.pipeline).subscribe(statusMessage => {
+ if (statusMessage.success) {
+ this.afterReconfiguration(statusMessage, this.pipeline._id)
+ this.reloadPipelineEmitter.emit();
+ this.pipelineReconfiguration = false;
+ } else {
+ this.displayErrors(statusMessage);
+ }
+ }, data => {
+ this.displayErrors();
+ });
+ }
+
+ afterReconfiguration(statusMessage: PipelineOperationStatus, pipelineId?: string) {
+ this.showDialog(statusMessage);
+ }
+
+ showDialog(data: PipelineOperationStatus) {
+ this.dialogService.open(PipelineStatusDialogComponent, {
+ panelType: PanelType.STANDARD_PANEL,
+ title: "Pipeline Status",
+ width: "70vw",
+ data: {
+ "pipelineOperationStatus": data
+ }
});
+ };
+
+ displayErrors(statusMessage?: PipelineOperationStatus) {
+ this.showDialog(statusMessage);
}
}
diff --git a/ui/src/app/platform-services/apis/pipeline.service.ts b/ui/src/app/platform-services/apis/pipeline.service.ts
index fac5f2e..77c67fd 100644
--- a/ui/src/app/platform-services/apis/pipeline.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline.service.ts
@@ -110,11 +110,11 @@ export class PipelineService {
}));
}
- reconfigurePipeline(pipeline: Pipeline): Observable<PipelineElementStatus> {
+ reconfigurePipeline(pipeline: Pipeline): Observable<PipelineOperationStatus> {
var pipelineId = pipeline._id;
return this.http.put(this.platformServicesCommons.authUserBasePath() + "/pipelines/reconfigure/" + pipelineId, pipeline)
.pipe(map(response => {
- return PipelineElementStatus.fromData(response as PipelineElementStatus);
+ return PipelineOperationStatus.fromData(response as PipelineOperationStatus);
}));
}
}
\ No newline at end of file