You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/15 14:15:55 UTC

[incubator-streampipes] 02/02: [WIP] pipeline element live reconfiguration

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 56656c335c9e871ef3ea1d3a7d4b1656b9f3ec0c
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Mar 15 15:12:47 2021 +0100

    [WIP] pipeline element live reconfiguration
---
 .../container/api/InvocableEntityResource.java     | 11 ++-
 .../management/pe/InvocableElementManager.java     | 89 ++++++++++++++--------
 .../management/pe/PipelineElementLifeCycle.java    |  3 +-
 .../http/ReconfigurationEndpointUrlGenerator.java  |  4 +-
 .../execution/http/ReconfigurationSubmitter.java   |  3 -
 .../streampipes/manager/operations/Operations.java |  5 +-
 .../PipelineElementReconfigurationHandler.java     | 59 ++++----------
 .../streampipes/rest/impl/PipelineResource.java    |  7 +-
 .../messaging-configuration.component.html         | 22 +++++-
 .../shared/messaging-settings.model.ts             |  1 +
 .../connect/services/data-marketplace.service.ts   |  6 ++
 .../components/edit/quickedit.component.ts         | 44 +++++++++--
 .../app/platform-services/apis/pipeline.service.ts |  4 +-
 13 files changed, 151 insertions(+), 107 deletions(-)

diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 8397373..e54e6dd 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -32,6 +32,7 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -99,14 +100,12 @@ public class InvocableEntityResource extends AbstractResource {
     // Adaptation
     @POST
     @JacksonSerialized
-    @Path("{identifier}/{elementId}/{runningInstanceId}/adapt")
+    @Path("/adapt/{runningInstanceId}")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public javax.ws.rs.core.Response adapt(@PathParam("identifier") String identifier,
-                                           @PathParam("elementId") String elementId,
-                                           @PathParam("runningInstanceId") String runningInstanceId,
-                                           String payload) {
+    public javax.ws.rs.core.Response adapt( @PathParam("runningInstanceId") String runningInstanceId,
+                                            PipelineElementReconfigurationEntity reconfigurationEntity) {
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
-        return ok(InvocableElementManager.getInstance().adapt(graph, payload));
+        return ok(InvocableElementManager.getInstance().adapt(graph, reconfigurationEntity));
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 9d627c9..e15fd5c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -18,7 +18,6 @@
 package org.apache.streampipes.node.controller.container.management.pe;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
@@ -31,11 +30,10 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.messaging.mqtt.MqttPublisher;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -47,6 +45,7 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -58,9 +57,11 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
     private static final String HTTP_PROTOCOL = "http://";
     private static final String COLON = ":";
     private static final String SLASH = "/";
+    private static final String DOT = ".";
     private static final String SP_URL = "SP_URL";
     private static final String CONSUL_LOCATION = "CONSUL_LOCATION";
     private static final String CONSUL_REGISTRATION_ROUTE = "v1/agent/service/register";
+    private static final String RECONFIGURATION_TOPIC = "org.apache.streampipes.control.event.reconfigure";
     private static final int CONSUL_DEFAULT_PORT = 8500;
     private static final Integer CONNECT_TIMEOUT = 10000;
     private static InvocableElementManager instance = null;
@@ -132,35 +133,47 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
     }
 
     @Override
-    public Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent) {
-        ObjectMapper mapper = new ObjectMapper();
-        Response r = new Response();
-        r.setElementId(graph.getElementId());
-        r.setSuccess(false);
-        try{
-            TransportProtocol tp = mapper.readValue(mapper.writeValueAsString(graph.getInputStreams().get(0)
-                    .getEventGrounding().getTransportProtocol()), graph.getInputStreams().get(0)
-                    .getEventGrounding().getTransportProtocol().getClass());
-            tp.getTopicDefinition().setActualTopicName("org.apache.streampipes.control.event.reconfigure."
-                    + graph.getDeploymentRunningInstanceId());
-            EventProducer pub;
-            if(tp instanceof KafkaTransportProtocol){
-                pub = new SpKafkaProducer();
-                pub.connect(tp);
-            }else if (tp instanceof JmsTransportProtocol){
-                pub = new ActiveMQPublisher();
-                pub.connect(tp);
-            } else{
-                pub = new MqttPublisher();
-                pub.connect(tp);
-            }
-            pub.publish(reconfigurationEvent.getBytes(StandardCharsets.UTF_8));
-            pub.disconnect();
-            r.setSuccess(true);
-        } catch (JsonProcessingException e) {
-            r.setOptionalMessage(e.getMessage());
+    public Response adapt(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEntity) {
+
+        Response response = new Response();
+        response.setElementId(graph.getElementId());
+        response.setSuccess(false);
+
+        EventProducer pub = getReconfigurationEventProducerFromInvocable(graph);
+
+        byte [] reconfigurationEvent = reconfigurationToByteArray(reconfigurationEntity);
+        pub.publish(reconfigurationEvent);
+        pub.disconnect();
+
+        response.setSuccess(true);
+        return response;
+    }
+
+    private EventProducer getReconfigurationEventProducerFromInvocable(InvocableStreamPipesEntity graph) {
+        TransportProtocol tp = getReconfigurationTransportProtocol(graph);
+        EventProducer pub;
+        if(tp instanceof KafkaTransportProtocol){
+            pub = new SpKafkaProducer();
+            pub.connect(tp);
+        } else if (tp instanceof JmsTransportProtocol){
+            pub = new ActiveMQPublisher();
+            pub.connect(tp);
+        } else{
+            pub = new MqttPublisher();
+            pub.connect(tp);
         }
-        return r;
+        return pub;
+    }
+
+    private byte[] reconfigurationToByteArray(PipelineElementReconfigurationEntity entity) {
+        Map<String, String> reconfigurationEventMap = new HashMap<>();
+        entity.getReconfiguredStaticProperties().forEach(staticProperty -> {
+            if (staticProperty instanceof FreeTextStaticProperty) {
+                reconfigurationEventMap.put(staticProperty.getInternalName(),
+                        ((FreeTextStaticProperty) staticProperty).getValue());
+            }
+        });
+        return toJson(reconfigurationEventMap).getBytes(StandardCharsets.UTF_8);
     }
 
     private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
@@ -186,6 +199,16 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
                 .setSupportedElements(supportedPipelineElements);
     }
 
+    private TransportProtocol getReconfigurationTransportProtocol(InvocableStreamPipesEntity graph) {
+        TransportProtocol tp = graph.getInputStreams().get(0).getEventGrounding().getTransportProtocol();
+        tp.setTopicDefinition(makeReconfigurationTopic(graph.getDeploymentRunningInstanceId()));
+        return tp;
+    }
+
+    private TopicDefinition makeReconfigurationTopic(String runningInstanceId) {
+        return new SimpleTopicDefinition( RECONFIGURATION_TOPIC + DOT + runningInstanceId);
+    }
+
     private String generateBackendEndpoint() {
         return HTTP_PROTOCOL
                 + NodeControllerConfig.INSTANCE.backendLocation()
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index cdaf13e..5bdc6e6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.node.controller.container.management.pe;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
 
 public interface PipelineElementLifeCycle {
 
@@ -29,7 +30,7 @@ public interface PipelineElementLifeCycle {
 
     Response detach(String runningInstanceId);
 
-    Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent);
+    Response adapt(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
 
     void unregister();
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java
index 0daa820..c7b0dae 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationEndpointUrlGenerator.java
@@ -40,7 +40,9 @@ public class ReconfigurationEndpointUrlGenerator {
                 + COLON
                 + reconfigurationEntity.getDeploymentTargetNodePort()
                 + BASE_ROUTE
-                + RECONFIGURE_ROUTE;
+                + RECONFIGURE_ROUTE
+                + SLASH
+                + reconfigurationEntity.getDeploymentRunningInstanceId();
 
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java
index d4da1c2..2c108b1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ReconfigurationSubmitter.java
@@ -21,9 +21,6 @@ import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index bf3383d..2b56c93 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -193,8 +193,7 @@ public class Operations {
     return new PipelineElementMigrationHandler(desiredPipeline, visualize, storeStatus, monitor).handlePipelineMigration();
   }
 
-  public static PipelineOperationStatus handlePipelineElementReconfiguration(Pipeline reconfiguredPipeline,
-                                                                             boolean storeStatus) {
-    return new PipelineElementReconfigurationHandler(reconfiguredPipeline, storeStatus).handleReconfiguration();
+  public static PipelineOperationStatus handlePipelineElementReconfiguration(Pipeline reconfiguredPipeline) {
+    return new PipelineElementReconfigurationHandler(reconfiguredPipeline).handleReconfiguration();
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
index 223a85f..31ad843 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
@@ -38,13 +38,11 @@ public class PipelineElementReconfigurationHandler {
     private final PipelineOperationStatus pipelineReconfigurationStatus;
     private final Pipeline reconfiguredPipeline;
     private Pipeline storedPipeline;
-    private final boolean storeStatus;
 
-    public PipelineElementReconfigurationHandler(Pipeline reconfiguredPipeline, boolean storeStatus) {
-        this.pipelineReconfigurationStatus = new PipelineOperationStatus();
+    public PipelineElementReconfigurationHandler(Pipeline reconfiguredPipeline) {
         this.reconfiguredPipeline = reconfiguredPipeline;
         this.storedPipeline = getPipelineById(reconfiguredPipeline.getPipelineId());
-        this.storeStatus = storeStatus;
+        this.pipelineReconfigurationStatus = initPipelineOperationStatus();
     }
 
     public PipelineOperationStatus handleReconfiguration() {
@@ -62,22 +60,12 @@ public class PipelineElementReconfigurationHandler {
 
             entityStatus.getElementStatus().forEach(pipelineReconfigurationStatus::addPipelineElementStatus);
 
-            // TODO needed ?
             if (entityStatus.isSuccess()) {
-                try {
-                    storedPipeline = deepCopyPipeline(reconfiguredPipeline);
-                } catch (JsonProcessingException e) {
-                    throw new SpRuntimeException("Could not deep copy pipeline for reconfiguration: " + e.getMessage(),
-                            e);
-                }
+                Operations.overwritePipeline(reconfiguredPipeline);
             } else {
-                //TODO: what to do when not successful?
+                //TODO: Send old/existing configuration
             }
         });
-
-        if (storeStatus) {
-            Operations.overwritePipeline(reconfiguredPipeline);
-        }
     }
 
     private PipelineOperationStatus reconfigurePipelineElement(PipelineElementReconfigurationEntity entity) {
@@ -87,18 +75,15 @@ public class PipelineElementReconfigurationHandler {
     private List<PipelineElementReconfigurationEntity> comparePipelinesAndGetReconfiguration() {
         List<PipelineElementReconfigurationEntity> delta = new ArrayList<>();
 
-        List<DataProcessorInvocation> reconfiguredGraphs = filterReconfigurableFsp(reconfiguredPipeline);
-        List<DataProcessorInvocation> currentGraphs = filterReconfigurableFsp(storedPipeline);
-
-        reconfiguredGraphs.forEach(reconfiguredProcessor -> currentGraphs.forEach(storedProcessor -> {
+        reconfiguredPipeline.getSepas().forEach(reconfiguredProcessor -> storedPipeline.getSepas().forEach(storedProcessor -> {
             if (matchingElementIds(reconfiguredProcessor, storedProcessor)) {
                 List<StaticProperty> list = new ArrayList<>();
                 getReconfigurableFsp(reconfiguredProcessor).forEach(reconfiguredFsp ->
                         getReconfigurableFsp(storedProcessor).forEach(storedFsp -> {
-                    if (compareForChanges(reconfiguredFsp, storedFsp)) {
-                        list.add(reconfiguredFsp);
-                    }
-                }));
+                            if (compareForChanges(reconfiguredFsp, storedFsp)) {
+                                list.add(reconfiguredFsp);
+                            }
+                        }));
                 PipelineElementReconfigurationEntity entity = reconfigurationEntity(reconfiguredProcessor, list);
                 if (list.size() > 0 && !exists(delta, entity)) {
                     delta.add(entity);
@@ -143,24 +128,6 @@ public class PipelineElementReconfigurationHandler {
         return one.getElementId().equals(two.getElementId());
     }
 
-    private List<DataProcessorInvocation> filterReconfigurableFsp(Pipeline pipeline) {
-        List<DataProcessorInvocation> filtered = new ArrayList<>();
-        pipeline.getSepas().forEach(processor -> {
-            List<StaticProperty> fsp = new ArrayList<>();
-            processor.getStaticProperties().forEach(sp -> {
-                if (sp instanceof FreeTextStaticProperty && ((FreeTextStaticProperty) sp).isReconfigurable()) {
-                    fsp.add(sp);
-                }
-            });
-            processor.setStaticProperties(fsp);
-            filtered.add(processor);
-        });
-
-        return filtered;
-    }
-
-
-
     // Helpers
 
     private PipelineOperationStatus verifyPipelineReconfigurationStatus(PipelineOperationStatus status,
@@ -183,9 +150,11 @@ public class PipelineElementReconfigurationHandler {
         return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
     }
 
-    public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
+    private PipelineOperationStatus initPipelineOperationStatus() {
+        PipelineOperationStatus status = new PipelineOperationStatus();
+        status.setPipelineId(reconfiguredPipeline.getPipelineId());
+        status.setPipelineName(reconfiguredPipeline.getName());
+        return status;
     }
 
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index ee0559c..6286272 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -273,13 +273,8 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   public Response reconfigurePipelineElements(@PathParam("username") String username,
                                     @PathParam("pipelineId") String pipelineId,
                                     Pipeline reconfiguredPipeline) {
-    // check which elements got adapted (stored vs new pipeline) for which deployment target
-    // transform modified graphs (graphs = DataProcessorInvokation) to JSON payload (light):
-    // { "id" : "<runningInstanceId"
-    // POST /<runninginstanceid> with payload(JSON per modified graph)
-    // lookup(runninginstanceid) -> pub.connect(broker) -> pub.publish(event)
     try {
-      return ok(Operations.handlePipelineElementReconfiguration(reconfiguredPipeline, true));
+      return ok(Operations.handlePipelineElementReconfiguration(reconfiguredPipeline));
     } catch (Exception e) {
       e.printStackTrace();
       return statusMessage(Notifications.error(NotificationType.UNKNOWN_ERROR));
diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
index d9720ad..ea9a876 100644
--- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
+++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
@@ -77,7 +77,7 @@
     <div fxFlex="100" fxLayout="column" fxLayoutAlign="start stretch" style="margin-top:40px;">
         <div fxFlex="100" class="assemblyOptions sp-blue-bg" style="padding:5px;">
             <div fxLayout="row" fxLayoutAlign="start center" fxFlex="100">
-                <h4>Protocols</h4>
+                <h4>Cloud Protocols</h4>
                 <span flex></span>
             </div>
         </div>
@@ -93,6 +93,26 @@
                 </button>
             </div>
         </div>
+    </div>
+    <div fxFlex="100" fxLayout="column" fxLayoutAlign="start stretch" style="margin-top:40px;">
+        <div fxFlex="100" class="assemblyOptions sp-blue-bg" style="padding:5px;">
+            <div fxLayout="row" fxLayoutAlign="start center" fxFlex="100">
+                <h4>Edge Protocol</h4>
+                <span flex></span>
+            </div>
+        </div>
+        <div fxFlex="100" fxLayout="column" fxLayoutAlign="start start" class="sp-blue-border page-container-padding-inner">
+            <div cdkDropList class="data-format-list" (cdkDropListDropped)="dropProtocol($event)"
+                 *ngIf="loadingCompleted">
+                <div class="data-format-box"cdkDrag>
+                    {{messagingSettings.edgeNodeProtocol}}
+                </div>
+            </div>
+            <div fxLayoutAlign="end center">
+<!--                <button mat-raised-button color="primary" type="submit" class="md-raised md-primary submit-button" (click)="updateMessagingSettings()">Update-->
+<!--                </button>-->
+            </div>
+        </div>
 
     </div>
 </div>
\ No newline at end of file
diff --git a/ui/src/app/configuration/shared/messaging-settings.model.ts b/ui/src/app/configuration/shared/messaging-settings.model.ts
index 501e63e..a33ce26 100644
--- a/ui/src/app/configuration/shared/messaging-settings.model.ts
+++ b/ui/src/app/configuration/shared/messaging-settings.model.ts
@@ -25,4 +25,5 @@ export interface MessagingSettings {
 
    prioritizedFormats: [string];
    prioritizedProtocols: [string];
+   edgeNodeProtocol: string;
 }
\ No newline at end of file
diff --git a/ui/src/app/connect/services/data-marketplace.service.ts b/ui/src/app/connect/services/data-marketplace.service.ts
index d0a43ab..5664b83 100644
--- a/ui/src/app/connect/services/data-marketplace.service.ts
+++ b/ui/src/app/connect/services/data-marketplace.service.ts
@@ -148,6 +148,12 @@ export class DataMarketplaceService {
             newAdapterDescription.includedLocales = protocol.includedLocales;
             newAdapterDescription.includesLocales = protocol.includesLocales;
 
+            newAdapterDescription.deploymentTargetNodeId = protocol.deploymentTargetNodeId;
+            newAdapterDescription.deploymentTargetNodeHostname = protocol.deploymentTargetNodeHostname;
+            newAdapterDescription.deploymentTargetNodePort = protocol.deploymentTargetNodePort;
+            newAdapterDescription.elementEndpointHostname = protocol.elementEndpointHostname;
+            newAdapterDescription.elementEndpointPort = protocol.elementEndpointPort;
+
             if (
                 newAdapterDescription instanceof GenericAdapterSetDescription ||
                 newAdapterDescription instanceof GenericAdapterStreamDescription
diff --git a/ui/src/app/pipeline-details/components/edit/quickedit.component.ts b/ui/src/app/pipeline-details/components/edit/quickedit.component.ts
index dee5d31..3e9e256 100644
--- a/ui/src/app/pipeline-details/components/edit/quickedit.component.ts
+++ b/ui/src/app/pipeline-details/components/edit/quickedit.component.ts
@@ -30,10 +30,14 @@ import {
     DataProcessorInvocation,
     DataSinkInvocation,
     EventSchema,
-    Pipeline
+    Pipeline, PipelineOperationStatus
 } from "../../../core-model/gen/streampipes-model";
 import {PipelineElementUnion} from "../../../editor/model/editor.model";
 import {FormBuilder, FormGroup} from "@angular/forms";
+import {PipelineStatusDialogComponent} from "../../../pipelines/dialog/pipeline-status/pipeline-status-dialog.component";
+import {PanelType} from "../../../core-ui/dialog/base-dialog/base-dialog.model";
+import {DialogService} from "../../../core-ui/dialog/base-dialog/base-dialog.service";
+import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
 
 @Component({
     selector: 'quick-edit',
@@ -59,10 +63,12 @@ export class QuickEditComponent implements OnInit, AfterViewInit{
     isDataProcessor: boolean = false;
 
     pipelineUpdating: boolean = false;
+    pipelineReconfiguration: boolean = false;
 
     constructor(private pipelineService: PipelineService,
                 private fb: FormBuilder,
-                private changeDetectorRef: ChangeDetectorRef) {
+                private changeDetectorRef: ChangeDetectorRef,
+                private dialogService: DialogService) {
 
     }
 
@@ -148,12 +154,38 @@ export class QuickEditComponent implements OnInit, AfterViewInit{
     }
 
     reconfigurePipeline() {
-        this.pipelineUpdating = true;
+        this.pipelineReconfiguration = true;
         this.updatePipelineElement();
-        this.pipelineService.reconfigurePipeline(this.pipeline).subscribe(data => {
-            this.reloadPipelineEmitter.emit();
-            this.pipelineUpdating = false;
+        this.pipelineService.reconfigurePipeline(this.pipeline).subscribe(statusMessage => {
+            if (statusMessage.success) {
+                this.afterReconfiguration(statusMessage, this.pipeline._id)
+                this.reloadPipelineEmitter.emit();
+                this.pipelineReconfiguration = false;
+            } else {
+                this.displayErrors(statusMessage);
+            }
+        }, data => {
+            this.displayErrors();
+        });
+    }
+
+    afterReconfiguration(statusMessage: PipelineOperationStatus, pipelineId?: string) {
+        this.showDialog(statusMessage);
+    }
+
+    showDialog(data: PipelineOperationStatus) {
+        this.dialogService.open(PipelineStatusDialogComponent, {
+            panelType: PanelType.STANDARD_PANEL,
+            title: "Pipeline Status",
+            width: "70vw",
+            data: {
+                "pipelineOperationStatus": data
+            }
         });
+    };
+
+    displayErrors(statusMessage?: PipelineOperationStatus) {
+        this.showDialog(statusMessage);
     }
 }
 
diff --git a/ui/src/app/platform-services/apis/pipeline.service.ts b/ui/src/app/platform-services/apis/pipeline.service.ts
index fac5f2e..77c67fd 100644
--- a/ui/src/app/platform-services/apis/pipeline.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline.service.ts
@@ -110,11 +110,11 @@ export class PipelineService {
         }));
   }
 
-  reconfigurePipeline(pipeline: Pipeline): Observable<PipelineElementStatus> {
+  reconfigurePipeline(pipeline: Pipeline): Observable<PipelineOperationStatus> {
     var pipelineId = pipeline._id;
     return this.http.put(this.platformServicesCommons.authUserBasePath() + "/pipelines/reconfigure/" + pipelineId, pipeline)
         .pipe(map(response => {
-          return PipelineElementStatus.fromData(response as PipelineElementStatus);
+          return PipelineOperationStatus.fromData(response as PipelineOperationStatus);
         }));
   }
 }
\ No newline at end of file