You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/24 15:23:46 UTC

[incubator-streampipes] branch edge-extensions updated: [WIP] add ui part for processor live-migration, add backend migration endpoint

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new b94f09b  [WIP] add ui part for processor live-migration, add backend migration endpoint
b94f09b is described below

commit b94f09b7ddf9272e886dbb04fbc0943903fe6f1a
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Dec 24 16:23:29 2020 +0100

    [WIP] add ui part for processor live-migration, add backend migration endpoint
---
 .../org/apache/streampipes/rest/api/IPipeline.java |   2 +
 .../rest/impl/PipelineWithUserResource.java        |  19 ++
 .../migrate-pipeline-processors.component.html     | 172 +++++++++++++++++
 .../migrate-pipeline-processors.component.scss     |  98 ++++++++++
 .../migrate-pipeline-processors.component.ts       | 203 +++++++++++++++++++++
 ui/src/app/editor/editor.module.ts                 |   4 +-
 .../pipeline-overview.component.html               |   2 +-
 .../pipeline-overview.component.ts                 |   1 +
 .../services/pipeline-operations.service.ts        |  13 +-
 .../app/platform-services/apis/pipeline.service.ts |   8 +
 10 files changed, 517 insertions(+), 5 deletions(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java
index 14a1292..d40afe6 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java
@@ -40,4 +40,6 @@ public interface IPipeline extends IPipelineElement {
   Response overwritePipeline(String username, String pipelineId, Pipeline pipeline);
 
   Response getPipelineStatus(String username, String pipelineId);
+
+  Response migratePipelineProcessors(String username, String pipelineId, Pipeline pipeline);
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
index 6be4d59..b7316a9 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
@@ -248,4 +248,23 @@ public class PipelineWithUserResource extends AbstractRestInterface implements I
         return statusMessage(Notifications.success("Pipeline modified"));
     }
 
+    @POST
+    @Path("/migrate/{pipelineId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @GsonWithIds
+    @Override
+    public Response migratePipelineProcessors(@PathParam("username") String username,
+                                              @PathParam("pipelineId") String pipelineId,
+                                              Pipeline pipeline) {
+        Pipeline storedPipeline = getPipelineStorage().getPipeline(pipelineId);
+
+//        storedPipeline.setActions(pipeline.getActions());
+//        storedPipeline.setSepas(pipeline.getSepas());
+//        storedPipeline.setActions(pipeline.getActions());
+//        storedPipeline.setCreatedAt(System.currentTimeMillis());
+//        storedPipeline.setPipelineCategories(pipeline.getPipelineCategories());
+//        Operations.updatePipeline(storedPipeline);
+        return statusMessage(Notifications.success("Pipeline processors migrated"));
+    }
+
 }
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.html b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.html
new file mode 100644
index 0000000..88cf5bd
--- /dev/null
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.html
@@ -0,0 +1,172 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<div class="sp-dialog-container">
+    <div class="sp-dialog-content padding-20">
+        <div fxFlex="100" fxLayout="column">
+            <div fxFlex="100" fxLayout="column" *ngIf="!saved && !saving && !storageError">
+                <form [formGroup]="submitPipelineForm">
+                    <div fxFlex="100" fxLayout="column" *ngIf="!currentModifiedPipelineId || updateMode=='clone'">
+                        <mat-form-field fxFlex><mat-label>Pipeline Name</mat-label>
+                            <input [formControlName]="'pipelineName'" matInput name="pipelineName" readonly/>
+                        </mat-form-field>
+                        <mat-form-field fxFlex><mat-label>Description</mat-label>
+                            <input [formControlName]="'pipelineDescription'" matInput readonly/>
+                        </mat-form-field>
+                    </div>
+                </form>
+                <mat-slide-toggle color="primary" [(ngModel)]="advancedSettings">
+                    Configure deployment options
+                </mat-slide-toggle>
+                <mat-divider *ngIf="advancedSettings" style="margin: 1em 0 1em 0;"></mat-divider>
+                <div *ngIf="advancedSettings">
+                    <div>
+                        <b>Pipeline Operation Policies</b>
+                    </div>
+
+                    <div style="margin-top: 2em">
+                        <div fxFlex="100" fxLayout="row">
+                            <div fxFlex="50" fxLayout="row" fxLayoutAlign="left top">
+                                <span>Relay Policy</span>
+                            </div>
+                        </div>
+                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+                            <mat-button-toggle-group #relayGroup="matButtonToggleGroup" aria-label="Relay strategy"
+                                                     [disabled]="true"
+                                                     [value]="selectedRelayStrategyVal"
+                                                     (change)="onSelectedRelayStrategyChange(relayGroup.value)">
+                                <mat-button-toggle value="purge">purge</mat-button-toggle>
+                                <mat-button-toggle value="buffer">buffer</mat-button-toggle>
+                            </mat-button-toggle-group>
+                        </div>
+                    </div>
+
+                    <div style="margin-top: 1em">
+                        <div fxFlex="100" fxLayout="row">
+                            <div fxFlex="50" fxLayout="row" fxLayoutAlign="left top">
+                                <span>Execution Policy</span>
+                            </div>
+                        </div>
+                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+                            <mat-radio-group
+                                    aria-labelledby="execution-policy-radio-group-label"
+                                    class="execution-policy-radio-group"
+                                    [(ngModel)]="selectedPipelineExecutionPolicy"
+                                    (change)="onExecutionPolicyChange($event.value)">
+                                <mat-radio-button class="execution-policy-radio-button"
+                                                  *ngFor="let policy of pipelineExecutionPolicies"
+                                                  [disabled]="isExecutinoPolicyDisabled()"
+                                                  [value]="policy">{{policy}}
+                                </mat-radio-button>
+                            </mat-radio-group>
+                        </div>
+                    </div>
+
+                    <div style="margin: 1em 0 1em 0;">
+                        <div>
+                            <b>Node Execution Targets</b>
+                        </div>
+                        <div style="margin-top: 2em">
+                            <mat-accordion class="example-headers-align">
+
+                                <mat-expansion-panel (opened)="panelOpenState = true"
+                                                     (closed)="panelOpenState = false"
+                                                     [expanded]="panelOpenState">
+                                    <mat-expansion-panel-header>
+                                        <mat-panel-description>
+                                            Modify deployment targets: <b>{{selectedPipelineExecutionPolicy}}</b>
+                                            <mat-icon>storage</mat-icon>
+                                        </mat-panel-description>
+                                    </mat-expansion-panel-header>
+
+                                    <div *ngFor="let processors of tmpPipeline.sepas">
+                                        <div fxFlex="100" fxLayout="row">
+                                            <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">
+                                                <span>{{processors.name}}</span>
+                                            </div>
+                                        </div>
+
+                                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+                                            <mat-form-field dense>
+                                                <mat-select [(ngModel)]="processors.deploymentTargetNodeId"
+                                                            [disabled]="disableNodeSelectionForProcessors.value"
+                                                            required>
+                                                    <mat-option [value]="nodeInfo.nodeControllerId"
+                                                                *ngFor="let nodeInfo of deploymentOptions[processors.appId]">
+                                                        <em>{{nodeInfo.hostname}}</em>
+                                                    </mat-option>
+                                                </mat-select>
+                                            </mat-form-field>
+                                        </div>
+
+                                    </div>
+                                    <div *ngFor="let sinks of tmpPipeline.actions">
+                                        <div fxFlex="100" fxLayout="row">
+                                            <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">
+                                                <span>{{sinks.name}}</span>
+                                            </div>
+                                        </div>
+
+                                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+                                            <mat-form-field dense>
+                                                <mat-select [(ngModel)]="sinks.deploymentTargetNodeId"
+                                                            [disabled]="disableNodeSelectionForSinks.value"
+                                                            required>
+                                                    <mat-option [value]="nodeInfo.nodeControllerId"
+                                                                *ngFor="let nodeInfo of deploymentOptions[sinks.appId]">
+                                                        <em>{{nodeInfo.hostname}}</em>
+                                                    </mat-option>
+                                                </mat-select>
+                                            </mat-form-field>
+                                        </div>
+                                    </div>
+
+                                </mat-expansion-panel>
+                            </mat-accordion>
+                        </div>
+
+                    </div>
+
+                </div>
+            </div>
+            <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="saving">
+                <mat-spinner [mode]="'indeterminate'" [diameter]="50"></mat-spinner>
+                <span class="status-text">Saving pipeline...</span>
+            </div>
+            <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="saved">
+                <mat-icon color="primary" style="font-size:50pt;height:60px;width:60px;">check_circle</mat-icon>
+                <span class="status-text">Pipeline successfully stored.</span>
+            </div>
+            <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="storageError">
+                <mat-icon color="primary" style="font-size:50pt;height:60px;width:60px;">error</mat-icon>
+                <span class="status-text">Your pipeline could not be stored.</span>
+                <span class="status-subtext">{{errorMessage}}</span>
+            </div>
+        </div>
+    </div>
+    <mat-divider></mat-divider>
+    <div class="sp-dialog-actions" fxLayoutAlign="left center">
+        <button [disabled]="!submitPipelineForm.valid || saving || saved" mat-button mat-raised-button
+                color="primary" (click)="migratePipelineProccesors()" style="margin-right:10px;">
+            Start Migration
+        </button>
+        <button mat-button mat-raised-button class="mat-basic" (click)="hide()">
+            {{saved ? 'Close' : 'Cancel'}}
+        </button>
+    </div>
+</div>
\ No newline at end of file
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.scss b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.scss
new file mode 100644
index 0000000..fe91666
--- /dev/null
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.scss
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+@import '../../../../scss/sp/sp-dialog.scss';
+
+.sp-dialog-container {
+  width: 520px;
+}
+
+.customize-section {
+  display:flex;
+  flex: 1 1 auto;
+  padding: 20px;
+}
+
+.padding-20 {
+  padding: 20px;
+}
+
+.mb-10 {
+  margin-bottom: 10px;
+}
+
+::ng-deep .pipeline-radio-group .mat-radio-label {
+  padding: 0;
+}
+
+.status-text {
+  font-size: 14pt;
+  margin-top:10px;
+}
+
+.status-subtext {
+  font-size: 12pt;
+}
+
+mat-slider {
+  width: 300px;
+}
+
+.execution-policy-radio-group {
+  display: flex;
+  flex-direction: column;
+  //margin: 15px 0;
+}
+
+.execution-policy-radio-button {
+  margin: 5px;
+}
+
+
+.execution-policy-action-buttons {
+  padding-bottom: 20px;
+}
+
+.example-headers-align .mat-expansion-panel-header-title,
+.example-headers-align .mat-expansion-panel-header-description {
+  flex-basis: 0;
+}
+
+.example-headers-align .mat-expansion-panel-header-description {
+  justify-content: space-between;
+  align-items: center;
+}
+
+.example-headers-align .mat-form-field + .mat-form-field {
+  margin-left: 8px;
+}
+
+.mat-form-field[dense] {
+  .mat-form-field-flex > .mat-form-field-infix {
+    padding: 0.4em 0px !important;
+  }
+  .mat-form-field-label-wrapper {
+    top: -1.5em;
+  }
+  &.mat-form-field-appearance-outline.mat-form-field-can-float.mat-form-field-should-float .mat-form-field-label {
+    transform: translateY(-1.1em) scale(.75);
+  }
+  .mat-form-field-wrapper{
+    padding-bottom: 0;
+  }
+}
\ No newline at end of file
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
new file mode 100644
index 0000000..ce15aed
--- /dev/null
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
@@ -0,0 +1,203 @@
+import {Component, Input, OnInit} from '@angular/core';
+import {
+  DataProcessorInvocation, Message,
+  NodeInfoDescription,
+  Pipeline,
+  StaticNodeMedata
+} from "../../../core-model/gen/streampipes-model";
+import {FormControl, FormGroup, Validators} from "@angular/forms";
+import {EditorService} from "../../services/editor.service";
+import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
+import {ObjectProvider} from "../../services/object-provider.service";
+import {PipelineService} from "../../../platform-services/apis/pipeline.service";
+
+@Component({
+  selector: 'migrate-pipeline-processors',
+  templateUrl: './migrate-pipeline-processors.component.html',
+  styleUrls: ['./migrate-pipeline-processors.component.scss']
+})
+export class MigratePipelineProcessorsComponent implements OnInit {
+
+  submitPipelineForm: FormGroup = new FormGroup({});
+  saving: boolean = false;
+  saved: boolean = false;
+  storageError: boolean = false;
+  errorMessage: string = '';
+  edgeNodes: NodeInfoDescription[];
+  advancedSettings: boolean;
+  deploymentOptions: Array<any> = new Array<any>();
+  selectedRelayStrategyVal: string;
+  selectedPipelineExecutionPolicy: string;
+  disableNodeSelectionForProcessors = new FormControl(false);
+  disableNodeSelectionForSinks = new FormControl(true);
+  tmpPipeline: Pipeline;
+  panelOpenState: boolean;
+  pipelineExecutionPolicies: string[] = ['default', 'locality-aware', 'custom'];
+
+  @Input()
+  pipeline: Pipeline;
+
+  constructor(private editorService: EditorService,
+              private dialogRef: DialogRef<MigratePipelineProcessorsComponent>,
+              private objectProvider: ObjectProvider,
+              private pipelineService: PipelineService) {
+
+    this.advancedSettings = true;
+    this.panelOpenState = true;
+  }
+
+  ngOnInit() {
+    this.tmpPipeline = this.deepCopy(this.pipeline);
+
+    this.loadAndPrepareEdgeNodes();
+
+    this.submitPipelineForm.addControl("pipelineName", new FormControl(this.tmpPipeline.name,
+        [Validators.required,
+          Validators.maxLength(40)]))
+    this.submitPipelineForm.addControl("pipelineDescription", new FormControl(this.tmpPipeline.description,
+        [Validators.maxLength(80)]))
+
+    this.submitPipelineForm.controls["pipelineName"].valueChanges.subscribe(value => {
+      this.tmpPipeline.name = value;
+    });
+
+    this.submitPipelineForm.controls["pipelineDescription"].valueChanges.subscribe(value => {
+      this.tmpPipeline.description = value;
+    });
+
+    this.selectedRelayStrategyVal = "buffer";
+    this.selectedPipelineExecutionPolicy = "custom";
+
+  }
+
+  deepCopy<T>(source: T): T {
+    return Array.isArray(source)
+        ? source.map(item => this.deepCopy(item))
+        : source instanceof Date
+            ? new Date(source.getTime())
+            : source && typeof source === 'object'
+                ? Object.getOwnPropertyNames(source).reduce((o, prop) => {
+                  Object.defineProperty(o, prop, Object.getOwnPropertyDescriptor(source, prop));
+                  o[prop] = this.deepCopy(source[prop]);
+                  return o;
+                }, Object.create(Object.getPrototypeOf(source)))
+                : source as T;
+  }
+
+  displayErrors(data?: string) {
+    this.storageError = true;
+    this.errorMessage = data;
+  }
+
+  loadAndPrepareEdgeNodes() {
+    this.pipelineService.getAvailableEdgeNodes().subscribe(response => {
+      this.edgeNodes = response;
+      this.addAppIds(this.tmpPipeline.sepas, this.edgeNodes);
+      this.addAppIds(this.tmpPipeline.actions, this.edgeNodes);
+    });
+  }
+
+  addAppIds(pipelineElements, edgeNodes: Array<NodeInfoDescription>) {
+    pipelineElements.forEach(p => {
+      this.deploymentOptions[p.appId] = [];
+
+      if (p.deploymentTargetNodeId == null) {
+        p.deploymentTargetNodeId = "default";
+      }
+      this.deploymentOptions[p.appId].push(this.makeDefaultNodeInfo());
+
+      edgeNodes.forEach(nodeInfo => {
+        // only show nodes that actually have supported pipeline elements registered
+        if (nodeInfo.supportedElements.length != 0 &&
+            nodeInfo.supportedElements.some(appId => appId === p.appId)) {
+          this.deploymentOptions[p.appId].push(nodeInfo);
+        }
+      })
+    });
+  }
+
+  makeDefaultNodeInfo() {
+    let nodeInfo = {} as NodeInfoDescription;
+    nodeInfo.nodeControllerId = "default";
+    nodeInfo.hostname = "default";
+    nodeInfo.staticNodeMedata = {} as StaticNodeMedata;
+    nodeInfo.staticNodeMedata.type = "default";
+    nodeInfo.staticNodeMedata.model = "Default Node";
+    return nodeInfo;
+  }
+
+  modifyPipelineElementsDeployments(pipelineElements) {
+    pipelineElements.forEach(p => {
+      let selectedTargetNodeId = p.deploymentTargetNodeId
+
+      // Currently relay only for data processors
+      if (p instanceof DataProcessorInvocation) {
+        p.eventRelayStrategy = this.selectedRelayStrategyVal;
+      }
+
+      if(selectedTargetNodeId != "default") {
+        let selectedNode = this.edgeNodes
+            .filter(node => node.nodeControllerId === selectedTargetNodeId)
+
+        p.deploymentTargetNodeHostname = selectedNode
+            .map(node => node.hostname)[0]
+
+        p.deploymentTargetNodePort = selectedNode
+            .map(node => node.port)[0]
+      }
+      else {
+        p.deploymentTargetNodeHostname = null
+        p.deploymentTargetNodePort = null
+      }
+    })
+  }
+
+  migratePipelineProccesors() {
+    if (this.tmpPipeline.name == "") {
+      //this.showToast("error", "Please enter a name for your pipeline");
+      return false;
+    }
+
+    let migrationRequest;
+
+    this.modifyPipelineElementsDeployments(this.tmpPipeline.sepas)
+    this.modifyPipelineElementsDeployments(this.tmpPipeline.actions)
+    this.tmpPipeline.eventRelayStrategy = this.selectedRelayStrategyVal;
+    this.pipeline = this.tmpPipeline;
+
+    migrationRequest = this.pipelineService.migratePipeline(this.pipeline);
+
+    migrationRequest
+        .subscribe(statusMessage => {
+          if (statusMessage.success) {
+            this.afterMigration(statusMessage, this.pipeline._id);
+          } else {
+            this.displayErrors(statusMessage.notifications[0]);
+          }
+        }, data => {
+          this.displayErrors();
+        });
+  };
+
+  afterMigration(statusMessage: Message, pipelineId?: string) {
+    this.hide();
+    // TODO: show dialog with statusMessagge
+    // this.editorService.removePipelineFromCache().subscribe();
+  }
+
+  hide() {
+    this.dialogRef.close();
+  };
+
+  onSelectedRelayStrategyChange(value: string) {
+    this.selectedRelayStrategyVal = value;
+  }
+
+  onExecutionPolicyChange(value: any) {
+    this.selectedPipelineExecutionPolicy = value;
+  }
+
+  isExecutinoPolicyDisabled() {
+    return true;
+  }
+}
diff --git a/ui/src/app/editor/editor.module.ts b/ui/src/app/editor/editor.module.ts
index d44fc86..601591f 100644
--- a/ui/src/app/editor/editor.module.ts
+++ b/ui/src/app/editor/editor.module.ts
@@ -60,6 +60,7 @@ import {PropertySelectionComponent} from "./components/output-strategy/property-
 import {UserDefinedOutputStrategyComponent} from "./components/output-strategy/user-defined-output/user-defined-output.component";
 import {ConnectModule} from "../connect/connect.module";
 import {MatSliderModule} from "@angular/material/slider";
+import { MigratePipelineProcessorsComponent } from './dialog/migrate-pipeline-processors/migrate-pipeline-processors.component';
 
 @NgModule({
     imports: [
@@ -99,7 +100,8 @@ import {MatSliderModule} from "@angular/material/slider";
         PropertySelectionComponent,
         SavePipelineComponent,
         SafeCss,
-        WelcomeTourComponent
+        WelcomeTourComponent,
+        MigratePipelineProcessorsComponent
     ],
     providers: [
         EditorService,
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
index bc08501..1a99f9d 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
@@ -77,7 +77,7 @@
                     <button color="primary" mat-button mat-icon-button matTooltip="Live-Migrate processors"
                             matTooltipPosition="above"
                             [disabled]="!pipeline.running"
-                            (click)="pipelineOperationsService.migrateProcessors(pipeline)">
+                            (click)="pipelineOperationsService.migratePipelineProcessors(pipeline)">
                         <i class="material-icons">storage</i>
                     </button>
                 </span>
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts
index c848441..4381c61 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts
@@ -22,6 +22,7 @@ import {PipelineOperationsService} from "../../services/pipeline-operations.serv
 import {MatTableDataSource} from "@angular/material/table";
 import {MatPaginator} from "@angular/material/paginator";
 import {MatSort} from "@angular/material/sort";
+import {DialogService} from "../../../core-ui/dialog/base-dialog/base-dialog.service";
 
 
 @Component({
diff --git a/ui/src/app/pipelines/services/pipeline-operations.service.ts b/ui/src/app/pipelines/services/pipeline-operations.service.ts
index 29fd5ba..567ce40 100644
--- a/ui/src/app/pipelines/services/pipeline-operations.service.ts
+++ b/ui/src/app/pipelines/services/pipeline-operations.service.ts
@@ -26,6 +26,7 @@ import {Pipeline, PipelineOperationStatus} from "../../core-model/gen/streampipe
 import {DeletePipelineDialogComponent} from "../dialog/delete-pipeline/delete-pipeline-dialog.component";
 import {DialogRef} from "../../core-ui/dialog/base-dialog/dialog-ref";
 import {Router} from "@angular/router";
+import {MigratePipelineProcessorsComponent} from "../../editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component";
 
 declare const require: any;
 
@@ -117,7 +118,13 @@ export class PipelineOperationsService {
     //this.$state.go("streampipes.pipelinelogs", {pipeline: id});
   }
 
-    migrateProcessors(pipeline: Pipeline) {
-        console.log(pipeline);
-    }
+  migratePipelineProcessors(pipeline: Pipeline) {
+    this.DialogService.open(MigratePipelineProcessorsComponent,{
+      panelType: PanelType.SLIDE_IN_PANEL,
+      title: "Live-Migrate pipeline processors",
+      data: {
+        "pipeline": pipeline
+      }
+    });
+  }
 }
\ No newline at end of file
diff --git a/ui/src/app/platform-services/apis/pipeline.service.ts b/ui/src/app/platform-services/apis/pipeline.service.ts
index dde909f..a2f8de8 100644
--- a/ui/src/app/platform-services/apis/pipeline.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline.service.ts
@@ -81,6 +81,14 @@ export class PipelineService {
         }));
   }
 
+  migratePipeline(pipeline: Pipeline): Observable<Message> {
+    var pipelineId = pipeline._id;
+    return this.http.post(this.platformServicesCommons.authUserBasePath() + "/pipelines/migrate/" + pipelineId, pipeline)
+        .pipe(map(response => {
+          return Message.fromData(response as Message);
+        }));
+  }
+
   getOwnPipelines(): Observable<Pipeline[]> {
     return this.http.get(this.platformServicesCommons.authUserBasePath() + "/pipelines/own").pipe(map(response => {
       return (response as any[]).map(p => Pipeline.fromData(p));