You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/24 15:23:46 UTC
[incubator-streampipes] branch edge-extensions updated: [WIP] add
ui part for processor live-migration, add backend migration endpoint
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new b94f09b [WIP] add ui part for processor live-migration, add backend migration endpoint
b94f09b is described below
commit b94f09b7ddf9272e886dbb04fbc0943903fe6f1a
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Dec 24 16:23:29 2020 +0100
[WIP] add ui part for processor live-migration, add backend migration endpoint
---
.../org/apache/streampipes/rest/api/IPipeline.java | 2 +
.../rest/impl/PipelineWithUserResource.java | 19 ++
.../migrate-pipeline-processors.component.html | 172 +++++++++++++++++
.../migrate-pipeline-processors.component.scss | 98 ++++++++++
.../migrate-pipeline-processors.component.ts | 203 +++++++++++++++++++++
ui/src/app/editor/editor.module.ts | 4 +-
.../pipeline-overview.component.html | 2 +-
.../pipeline-overview.component.ts | 1 +
.../services/pipeline-operations.service.ts | 13 +-
.../app/platform-services/apis/pipeline.service.ts | 8 +
10 files changed, 517 insertions(+), 5 deletions(-)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java
index 14a1292..d40afe6 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipeline.java
@@ -40,4 +40,6 @@ public interface IPipeline extends IPipelineElement {
Response overwritePipeline(String username, String pipelineId, Pipeline pipeline);
Response getPipelineStatus(String username, String pipelineId);
+
+ Response migratePipelineProcessors(String username, String pipelineId, Pipeline pipeline); // migration operation: identified by user name and pipeline id, receives a pipeline as payload
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
index 6be4d59..b7316a9 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
@@ -248,4 +248,23 @@ public class PipelineWithUserResource extends AbstractRestInterface implements I
return statusMessage(Notifications.success("Pipeline modified"));
}
+ @POST
+ @Path("/migrate/{pipelineId}") // the id of the pipeline to migrate is taken from the URL
+ @Produces(MediaType.APPLICATION_JSON)
+ @GsonWithIds
+ @Override
+ public Response migratePipelineProcessors(@PathParam("username") String username,
+ @PathParam("pipelineId") String pipelineId,
+ Pipeline pipeline) { // request body; not read yet because the update steps below are commented out
+ Pipeline storedPipeline = getPipelineStorage().getPipeline(pipelineId); // NOTE(WIP): loaded but never used so far
+
+// storedPipeline.setActions(pipeline.getActions());
+// storedPipeline.setSepas(pipeline.getSepas());
+// storedPipeline.setActions(pipeline.getActions());
+// storedPipeline.setCreatedAt(System.currentTimeMillis());
+// storedPipeline.setPipelineCategories(pipeline.getPipelineCategories());
+// Operations.updatePipeline(storedPipeline);
+ return statusMessage(Notifications.success("Pipeline processors migrated")); // NOTE(review): always reports success although nothing is stored or redeployed yet
+ }
+
}
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.html b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.html
new file mode 100644
index 0000000..88cf5bd
--- /dev/null
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.html
@@ -0,0 +1,172 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<div class="sp-dialog-container">
+ <div class="sp-dialog-content padding-20">
+ <div fxFlex="100" fxLayout="column">
+ <div fxFlex="100" fxLayout="column" *ngIf="!saved && !saving && !storageError">
+ <!-- Pipeline name and description are displayed read-only for reference. -->
+ <form [formGroup]="submitPipelineForm">
+   <div fxFlex="100" fxLayout="column">
+     <mat-form-field fxFlex><mat-label>Pipeline Name</mat-label>
+       <input [formControlName]="'pipelineName'" matInput name="pipelineName" readonly/>
+     </mat-form-field>
+     <mat-form-field fxFlex><mat-label>Description</mat-label>
+       <input [formControlName]="'pipelineDescription'" matInput readonly/>
+     </mat-form-field>
+   </div>
+ </form>
+ <mat-slide-toggle color="primary" [(ngModel)]="advancedSettings">
+ Configure deployment options
+ </mat-slide-toggle>
+ <mat-divider *ngIf="advancedSettings" style="margin: 1em 0 1em 0;"></mat-divider>
+ <div *ngIf="advancedSettings">
+ <div>
+ <b>Pipeline Operation Policies</b>
+ </div>
+
+ <div style="margin-top: 2em">
+ <div fxFlex="100" fxLayout="row">
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="left top">
+ <span>Relay Policy</span>
+ </div>
+ </div>
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+ <mat-button-toggle-group #relayGroup="matButtonToggleGroup" aria-label="Relay strategy"
+ [disabled]="true"
+ [value]="selectedRelayStrategyVal"
+ (change)="onSelectedRelayStrategyChange(relayGroup.value)">
+ <mat-button-toggle value="purge">purge</mat-button-toggle>
+ <mat-button-toggle value="buffer">buffer</mat-button-toggle>
+ </mat-button-toggle-group>
+ </div>
+ </div>
+
+ <div style="margin-top: 1em">
+ <div fxFlex="100" fxLayout="row">
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="left top">
+ <span>Execution Policy</span>
+ </div>
+ </div>
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+ <mat-radio-group
+ aria-labelledby="execution-policy-radio-group-label"
+ class="execution-policy-radio-group"
+ [(ngModel)]="selectedPipelineExecutionPolicy"
+ (change)="onExecutionPolicyChange($event.value)">
+ <mat-radio-button class="execution-policy-radio-button"
+ *ngFor="let policy of pipelineExecutionPolicies"
+ [disabled]="isExecutinoPolicyDisabled()"
+ [value]="policy">{{policy}}
+ </mat-radio-button>
+ </mat-radio-group>
+ </div>
+ </div>
+
+ <div style="margin: 1em 0 1em 0;">
+ <div>
+ <b>Node Execution Targets</b>
+ </div>
+ <div style="margin-top: 2em">
+ <mat-accordion class="example-headers-align">
+
+ <mat-expansion-panel (opened)="panelOpenState = true"
+ (closed)="panelOpenState = false"
+ [expanded]="panelOpenState">
+ <mat-expansion-panel-header>
+ <mat-panel-description>
+ Modify deployment targets: <b>{{selectedPipelineExecutionPolicy}}</b>
+ <mat-icon>storage</mat-icon>
+ </mat-panel-description>
+ </mat-expansion-panel-header>
+
+ <div *ngFor="let processors of tmpPipeline.sepas">
+ <div fxFlex="100" fxLayout="row">
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">
+ <span>{{processors.name}}</span>
+ </div>
+ </div>
+
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+ <mat-form-field dense>
+ <mat-select [(ngModel)]="processors.deploymentTargetNodeId"
+ [disabled]="disableNodeSelectionForProcessors.value"
+ required>
+ <mat-option [value]="nodeInfo.nodeControllerId"
+ *ngFor="let nodeInfo of deploymentOptions[processors.appId]">
+ <em>{{nodeInfo.hostname}}</em>
+ </mat-option>
+ </mat-select>
+ </mat-form-field>
+ </div>
+
+ </div>
+ <div *ngFor="let sinks of tmpPipeline.actions">
+ <div fxFlex="100" fxLayout="row">
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">
+ <span>{{sinks.name}}</span>
+ </div>
+ </div>
+
+ <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">
+ <mat-form-field dense>
+ <mat-select [(ngModel)]="sinks.deploymentTargetNodeId"
+ [disabled]="disableNodeSelectionForSinks.value"
+ required>
+ <mat-option [value]="nodeInfo.nodeControllerId"
+ *ngFor="let nodeInfo of deploymentOptions[sinks.appId]">
+ <em>{{nodeInfo.hostname}}</em>
+ </mat-option>
+ </mat-select>
+ </mat-form-field>
+ </div>
+ </div>
+
+ </mat-expansion-panel>
+ </mat-accordion>
+ </div>
+
+ </div>
+
+ </div>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="saving">
+ <mat-spinner [mode]="'indeterminate'" [diameter]="50"></mat-spinner>
+ <span class="status-text">Saving pipeline...</span>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="saved">
+ <mat-icon color="primary" style="font-size:50pt;height:60px;width:60px;">check_circle</mat-icon>
+ <span class="status-text">Pipeline successfully stored.</span>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="storageError">
+ <mat-icon color="primary" style="font-size:50pt;height:60px;width:60px;">error</mat-icon>
+ <span class="status-text">Your pipeline could not be stored.</span>
+ <span class="status-subtext">{{errorMessage}}</span>
+ </div>
+ </div>
+ </div>
+ <mat-divider></mat-divider>
+ <div class="sp-dialog-actions" fxLayoutAlign="left center">
+ <button [disabled]="!submitPipelineForm.valid || saving || saved" mat-button mat-raised-button
+ color="primary" (click)="migratePipelineProccesors()" style="margin-right:10px;">
+ Start Migration
+ </button>
+ <button mat-button mat-raised-button class="mat-basic" (click)="hide()">
+ {{saved ? 'Close' : 'Cancel'}}
+ </button>
+ </div>
+</div>
\ No newline at end of file
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.scss b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.scss
new file mode 100644
index 0000000..fe91666
--- /dev/null
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.scss
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+@import '../../../../scss/sp/sp-dialog.scss';
+
+.sp-dialog-container {
+ width: 520px;
+}
+
+.customize-section {
+ display:flex;
+ flex: 1 1 auto;
+ padding: 20px;
+}
+
+.padding-20 {
+ padding: 20px;
+}
+
+.mb-10 {
+ margin-bottom: 10px;
+}
+
+::ng-deep .pipeline-radio-group .mat-radio-label {
+ padding: 0;
+}
+
+.status-text {
+ font-size: 14pt;
+ margin-top:10px;
+}
+
+.status-subtext {
+ font-size: 12pt;
+}
+
+mat-slider {
+ width: 300px;
+}
+
+.execution-policy-radio-group {
+ display: flex;
+ flex-direction: column;
+ //margin: 15px 0;
+}
+
+.execution-policy-radio-button {
+ margin: 5px;
+}
+
+
+.execution-policy-action-buttons {
+ padding-bottom: 20px;
+}
+
+.example-headers-align .mat-expansion-panel-header-title,
+.example-headers-align .mat-expansion-panel-header-description {
+ flex-basis: 0;
+}
+
+.example-headers-align .mat-expansion-panel-header-description {
+ justify-content: space-between;
+ align-items: center;
+}
+
+.example-headers-align .mat-form-field + .mat-form-field {
+ margin-left: 8px;
+}
+
+// Compact layout for <mat-form-field dense>.
+// The inner wrapper/infix/label elements are rendered by Angular Material's own
+// template, so component-scoped selectors do not reach them; ::ng-deep lifts the
+// scoping for these rules while :host keeps them confined to this dialog.
+:host ::ng-deep .mat-form-field[dense] {
+  .mat-form-field-flex > .mat-form-field-infix {
+    padding: 0.4em 0px !important;
+  }
+  .mat-form-field-label-wrapper {
+    top: -1.5em;
+  }
+  &.mat-form-field-appearance-outline.mat-form-field-can-float.mat-form-field-should-float .mat-form-field-label {
+    transform: translateY(-1.1em) scale(.75);
+  }
+  .mat-form-field-wrapper {
+    padding-bottom: 0;
+  }
+}
\ No newline at end of file
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
new file mode 100644
index 0000000..ce15aed
--- /dev/null
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
@@ -0,0 +1,203 @@
+import {Component, Input, OnInit} from '@angular/core';
+import {
+ DataProcessorInvocation, Message,
+ NodeInfoDescription,
+ Pipeline,
+ StaticNodeMedata
+} from "../../../core-model/gen/streampipes-model";
+import {FormControl, FormGroup, Validators} from "@angular/forms";
+import {EditorService} from "../../services/editor.service";
+import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
+import {ObjectProvider} from "../../services/object-provider.service";
+import {PipelineService} from "../../../platform-services/apis/pipeline.service";
+
+@Component({
+ selector: 'migrate-pipeline-processors',
+ templateUrl: './migrate-pipeline-processors.component.html',
+ styleUrls: ['./migrate-pipeline-processors.component.scss']
+})
+export class MigratePipelineProcessorsComponent implements OnInit {
+
+ submitPipelineForm: FormGroup = new FormGroup({});
+ saving: boolean = false;
+ saved: boolean = false;
+ storageError: boolean = false;
+ errorMessage: string = '';
+ edgeNodes: NodeInfoDescription[];
+ advancedSettings: boolean;
+ deploymentOptions: Array<any> = new Array<any>();
+ selectedRelayStrategyVal: string;
+ selectedPipelineExecutionPolicy: string;
+ disableNodeSelectionForProcessors = new FormControl(false);
+ disableNodeSelectionForSinks = new FormControl(true);
+ tmpPipeline: Pipeline;
+ panelOpenState: boolean;
+ pipelineExecutionPolicies: string[] = ['default', 'locality-aware', 'custom'];
+
+ @Input()
+ pipeline: Pipeline;
+
+ /**
+  * Stores the injected services as private members and sets the initial
+  * view state: deployment options switched on, target panel expanded.
+  */
+ constructor(private editorService: EditorService,
+             private dialogRef: DialogRef<MigratePipelineProcessorsComponent>,
+             private objectProvider: ObjectProvider,
+             private pipelineService: PipelineService) {
+
+   this.panelOpenState = true;
+   this.advancedSettings = true;
+ }
+
+ /**
+  * Prepares the dialog state:
+  *  - works on a deep copy of the input pipeline,
+  *  - requests the available edge nodes,
+  *  - registers the name/description form controls and mirrors their values
+  *    into the working copy,
+  *  - preselects the "buffer" relay strategy and the "custom" execution policy.
+  */
+ ngOnInit() {
+   this.tmpPipeline = this.deepCopy(this.pipeline);
+
+   this.loadAndPrepareEdgeNodes();
+
+   const nameValidators = [Validators.required, Validators.maxLength(40)];
+   const descriptionValidators = [Validators.maxLength(80)];
+
+   this.submitPipelineForm.addControl("pipelineName",
+     new FormControl(this.tmpPipeline.name, nameValidators));
+   this.submitPipelineForm.addControl("pipelineDescription",
+     new FormControl(this.tmpPipeline.description, descriptionValidators));
+
+   // keep the working copy in sync with the form
+   this.submitPipelineForm.get("pipelineName").valueChanges.subscribe(name => {
+     this.tmpPipeline.name = name;
+   });
+   this.submitPipelineForm.get("pipelineDescription").valueChanges.subscribe(description => {
+     this.tmpPipeline.description = description;
+   });
+
+   this.selectedRelayStrategyVal = "buffer";
+   this.selectedPipelineExecutionPolicy = "custom";
+ }
+
+ /**
+  * Recursively clones a value.
+  *
+  * Arrays are cloned element by element, Date instances are re-created from
+  * their timestamp, and other objects are rebuilt on the same prototype with
+  * every own property (enumerable or not) redefined from its descriptor.
+  * Data properties get a deep-copied value; accessor properties are carried
+  * over as they are, without invoking their getter or setter. Read-only and
+  * getter-only properties are therefore copied instead of raising a TypeError
+  * on assignment. Primitives, null, undefined and functions are returned
+  * unchanged.
+  *
+  * Cyclic structures are not supported: there is no bookkeeping of visited
+  * objects, so a cycle recurses until the call stack overflows.
+  *
+  * @param source value to clone
+  * @returns a structurally independent copy of `source`
+  */
+ deepCopy<T>(source: T): T {
+   if (Array.isArray(source)) {
+     return source.map(item => this.deepCopy(item)) as T;
+   }
+
+   if (source instanceof Date) {
+     return new Date(source.getTime()) as T;
+   }
+
+   if (source && typeof source === 'object') {
+     const copy = Object.create(Object.getPrototypeOf(source));
+     for (const prop of Object.getOwnPropertyNames(source)) {
+       const descriptor = Object.getOwnPropertyDescriptor(source, prop);
+       if (descriptor === undefined) {
+         continue;
+       }
+       // only data descriptors carry a value that needs cloning
+       if ('value' in descriptor) {
+         descriptor.value = this.deepCopy(descriptor.value);
+       }
+       Object.defineProperty(copy, prop, descriptor);
+     }
+     return copy;
+   }
+
+   return source;
+ }
+
+ /**
+  * Switches the dialog into its error state.
+  *
+  * @param data optional text for the error view; when omitted the message
+  *             becomes undefined
+  */
+ displayErrors(data?: string) {
+   this.errorMessage = data;
+   this.storageError = true;
+ }
+
+ /**
+  * Requests the available edge nodes from the pipeline service, remembers
+  * them, and then builds the deployment options for the processors and the
+  * sinks of the working copy (in that order).
+  */
+ loadAndPrepareEdgeNodes() {
+   this.pipelineService.getAvailableEdgeNodes().subscribe(availableNodes => {
+     this.edgeNodes = availableNodes;
+
+     const processors = this.tmpPipeline.sepas;
+     const sinks = this.tmpPipeline.actions;
+     this.addAppIds(processors, this.edgeNodes);
+     this.addAppIds(sinks, this.edgeNodes);
+   });
+ }
+
+ /**
+  * Fills `deploymentOptions` for every given pipeline element, keyed by the
+  * element's appId. Each entry starts with the synthetic "default" node and
+  * is followed by the edge nodes that list the appId among their supported
+  * elements. Elements without a deployment target are set to "default".
+  *
+  * @param pipelineElements pipeline elements to prepare
+  * @param edgeNodes        candidate nodes
+  */
+ addAppIds(pipelineElements, edgeNodes: Array<NodeInfoDescription>) {
+   for (const element of pipelineElements) {
+     const options = [];
+     this.deploymentOptions[element.appId] = options;
+
+     if (element.deploymentTargetNodeId == null) {
+       element.deploymentTargetNodeId = "default";
+     }
+     options.push(this.makeDefaultNodeInfo());
+
+     // only show nodes that actually have supported pipeline elements registered
+     for (const candidate of edgeNodes) {
+       const supportsElement = candidate.supportedElements
+         .some(supportedAppId => supportedAppId === element.appId);
+       if (supportsElement) {
+         options.push(candidate);
+       }
+     }
+   }
+ }
+
+ /**
+  * Builds the placeholder node description offered as the "default"
+  * deployment target.
+  *
+  * @returns a node description whose controller id, hostname and static
+  *          type are "default" and whose model is "Default Node"
+  */
+ makeDefaultNodeInfo() {
+   const staticMetadata = {
+     type: "default",
+     model: "Default Node"
+   } as StaticNodeMedata;
+
+   return {
+     nodeControllerId: "default",
+     hostname: "default",
+     staticNodeMedata: staticMetadata
+   } as NodeInfoDescription;
+ }
+
+ /**
+  * Writes the selected deployment settings onto the given pipeline elements.
+  *
+  * Data processors receive the currently selected relay strategy. For every
+  * element the target hostname and port are resolved from the known edge
+  * nodes; the "default" target clears both to null. A target that matches no
+  * known node leaves both undefined.
+  *
+  * @param pipelineElements processors or sinks of the pipeline to update
+  */
+ modifyPipelineElementsDeployments(pipelineElements) {
+   // edgeNodes is only assigned once the asynchronous node request has
+   // answered; fall back to an empty list instead of failing on undefined
+   const knownNodes: NodeInfoDescription[] = this.edgeNodes || [];
+
+   pipelineElements.forEach(p => {
+     const selectedTargetNodeId = p.deploymentTargetNodeId;
+
+     // Currently relay only for data processors
+     if (p instanceof DataProcessorInvocation) {
+       p.eventRelayStrategy = this.selectedRelayStrategyVal;
+     }
+
+     if (selectedTargetNodeId != "default") {
+       const selectedNode = knownNodes
+         .find(node => node.nodeControllerId === selectedTargetNodeId);
+
+       p.deploymentTargetNodeHostname = selectedNode ? selectedNode.hostname : undefined;
+       p.deploymentTargetNodePort = selectedNode ? selectedNode.port : undefined;
+     } else {
+       p.deploymentTargetNodeHostname = null;
+       p.deploymentTargetNodePort = null;
+     }
+   });
+ }
+
+ /**
+  * Applies the chosen deployment targets to the working copy and sends it to
+  * the migration endpoint.
+  *
+  * While the request is in flight `saving` is set, which the template uses
+  * to show the spinner and to disable the submit button, so the migration
+  * cannot be triggered twice. On success the dialog is closed; otherwise the
+  * error view is shown with the first notification of the response.
+  *
+  * @returns false when the pipeline has no name and nothing was sent,
+  *          otherwise nothing
+  */
+ migratePipelineProccesors() {
+   if (this.tmpPipeline.name == "") {
+     //this.showToast("error", "Please enter a name for your pipeline");
+     return false;
+   }
+
+   this.modifyPipelineElementsDeployments(this.tmpPipeline.sepas);
+   this.modifyPipelineElementsDeployments(this.tmpPipeline.actions);
+   this.tmpPipeline.eventRelayStrategy = this.selectedRelayStrategyVal;
+   this.pipeline = this.tmpPipeline;
+
+   this.saving = true;
+   this.pipelineService.migratePipeline(this.pipeline)
+     .subscribe(statusMessage => {
+       this.saving = false;
+       if (statusMessage.success) {
+         this.afterMigration(statusMessage, this.pipeline._id);
+       } else {
+         const notifications: unknown[] = statusMessage.notifications || [];
+         this.displayErrors(this.describeNotification(notifications[0]));
+       }
+     }, () => {
+       this.saving = false;
+       this.displayErrors();
+     });
+ }
+
+ /**
+  * Turns a notification of a status message into display text.
+  *
+  * NOTE(review): presumably notifications are objects with `title` and
+  * `description` fields - confirm against the generated model. Anything else
+  * is shown as JSON so no information is lost.
+  */
+ private describeNotification(notification: unknown): string {
+   if (typeof notification === 'string') {
+     return notification;
+   }
+   if (notification && typeof notification === 'object') {
+     const { title, description } = notification as { title?: unknown, description?: unknown };
+     const parts = [title, description]
+       .filter(part => typeof part === 'string' && part !== '');
+     return parts.length > 0 ? parts.join(': ') : JSON.stringify(notification);
+   }
+   return '';
+ }
+
+ afterMigration(statusMessage: Message, pipelineId?: string) { // called after a successful migration response; both parameters are currently unused
+ this.hide();
+ // TODO: show dialog with statusMessage
+ // this.editorService.removePipelineFromCache().subscribe();
+ }
+
+ hide() { // closes this dialog through the injected DialogRef; bound to the Cancel/Close button
+ this.dialogRef.close();
+ };
+
+ onSelectedRelayStrategyChange(value: string) { // change handler of the relay strategy toggle group
+ this.selectedRelayStrategyVal = value; // later written to the pipeline and its data processors on submit
+ }
+
+ onExecutionPolicyChange(value: any) { // change handler of the execution policy radio group
+ this.selectedPipelineExecutionPolicy = value;
+ }
+
+ isExecutinoPolicyDisabled() { // NOTE: name is misspelled ("Executino") but the template binds to it; rename both together
+ return true; // the execution policy radio buttons are always disabled in this dialog
+ }
+}
diff --git a/ui/src/app/editor/editor.module.ts b/ui/src/app/editor/editor.module.ts
index d44fc86..601591f 100644
--- a/ui/src/app/editor/editor.module.ts
+++ b/ui/src/app/editor/editor.module.ts
@@ -60,6 +60,7 @@ import {PropertySelectionComponent} from "./components/output-strategy/property-
import {UserDefinedOutputStrategyComponent} from "./components/output-strategy/user-defined-output/user-defined-output.component";
import {ConnectModule} from "../connect/connect.module";
import {MatSliderModule} from "@angular/material/slider";
+import { MigratePipelineProcessorsComponent } from './dialog/migrate-pipeline-processors/migrate-pipeline-processors.component';
@NgModule({
imports: [
@@ -99,7 +100,8 @@ import {MatSliderModule} from "@angular/material/slider";
PropertySelectionComponent,
SavePipelineComponent,
SafeCss,
- WelcomeTourComponent
+ WelcomeTourComponent,
+ MigratePipelineProcessorsComponent
],
providers: [
EditorService,
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
index bc08501..1a99f9d 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
@@ -77,7 +77,7 @@
<button color="primary" mat-button mat-icon-button matTooltip="Live-Migrate processors"
matTooltipPosition="above"
[disabled]="!pipeline.running"
- (click)="pipelineOperationsService.migrateProcessors(pipeline)">
+ (click)="pipelineOperationsService.migratePipelineProcessors(pipeline)">
<i class="material-icons">storage</i>
</button>
</span>
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts
index c848441..4381c61 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.ts
@@ -22,6 +22,7 @@ import {PipelineOperationsService} from "../../services/pipeline-operations.serv
import {MatTableDataSource} from "@angular/material/table";
import {MatPaginator} from "@angular/material/paginator";
import {MatSort} from "@angular/material/sort";
+import {DialogService} from "../../../core-ui/dialog/base-dialog/base-dialog.service";
@Component({
diff --git a/ui/src/app/pipelines/services/pipeline-operations.service.ts b/ui/src/app/pipelines/services/pipeline-operations.service.ts
index 29fd5ba..567ce40 100644
--- a/ui/src/app/pipelines/services/pipeline-operations.service.ts
+++ b/ui/src/app/pipelines/services/pipeline-operations.service.ts
@@ -26,6 +26,7 @@ import {Pipeline, PipelineOperationStatus} from "../../core-model/gen/streampipe
import {DeletePipelineDialogComponent} from "../dialog/delete-pipeline/delete-pipeline-dialog.component";
import {DialogRef} from "../../core-ui/dialog/base-dialog/dialog-ref";
import {Router} from "@angular/router";
+import {MigratePipelineProcessorsComponent} from "../../editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component";
declare const require: any;
@@ -117,7 +118,13 @@ export class PipelineOperationsService {
//this.$state.go("streampipes.pipelinelogs", {pipeline: id});
}
- migrateProcessors(pipeline: Pipeline) {
- console.log(pipeline);
- }
+ /**
+  * Opens the live-migration dialog as a slide-in panel for the given pipeline.
+  *
+  * @param pipeline pipeline handed to the dialog as its `pipeline` input
+  */
+ migratePipelineProcessors(pipeline: Pipeline) {
+   const dialogConfig = {
+     panelType: PanelType.SLIDE_IN_PANEL,
+     title: "Live-Migrate pipeline processors",
+     data: {
+       "pipeline": pipeline
+     }
+   };
+   this.DialogService.open(MigratePipelineProcessorsComponent, dialogConfig);
+ }
}
\ No newline at end of file
diff --git a/ui/src/app/platform-services/apis/pipeline.service.ts b/ui/src/app/platform-services/apis/pipeline.service.ts
index dde909f..a2f8de8 100644
--- a/ui/src/app/platform-services/apis/pipeline.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline.service.ts
@@ -81,6 +81,14 @@ export class PipelineService {
}));
}
+ /**
+  * Posts the pipeline to the migration endpoint of the authenticated user.
+  *
+  * @param pipeline pipeline to migrate; its `_id` is appended to the URL and
+  *                 the whole object is sent as request body
+  * @returns the backend response converted to a Message
+  */
+ migratePipeline(pipeline: Pipeline): Observable<Message> {
+   const pipelineId = pipeline._id;
+   const migrationUrl = `${this.platformServicesCommons.authUserBasePath()}/pipelines/migrate/${pipelineId}`;
+
+   return this.http
+     .post(migrationUrl, pipeline)
+     .pipe(map(response => Message.fromData(response as Message)));
+ }
+
getOwnPipelines(): Observable<Pipeline[]> {
return this.http.get(this.platformServicesCommons.authUserBasePath() + "/pipelines/own").pipe(map(response => {
return (response as any[]).map(p => Pipeline.fromData(p));