You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/22 12:05:44 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-367] Allow forced stop of pipelines that fail to be gracefully stopped
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit d04faace8554442f3b1a06b7d8d53a16437969b3
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat May 22 14:05:27 2021 +0200
[STREAMPIPES-367] Allow forced stop of pipelines that fail to be gracefully stopped
---
.../backend/StreamPipesBackendApplication.java | 2 +-
.../manager/execution/http/GraphSubmitter.java | 5 +-
.../manager/execution/http/PipelineExecutor.java | 18 ++++--
.../streampipes/manager/operations/Operations.java | 20 ++++---
.../streampipes/rest/impl/PipelineResource.java | 6 +-
.../rest/impl/nouser/PipelineNoUserResource.java | 2 +-
.../rest/management/PipelineManagement.java | 5 +-
.../pipeline-overview.component.scss | 2 +-
.../pipeline-status-dialog.component.html | 65 +++++++++++++++------
.../pipeline-status-dialog.component.scss | 32 +++++++++-
.../pipeline-status-dialog.component.ts | 68 ++++++++++++++++++++--
.../src/app/pipelines/model/pipeline-model.ts | 17 +-----
.../services/pipeline-operations.service.ts | 60 ++++++++++---------
.../app/platform-services/apis/pipeline.service.ts | 13 ++++-
14 files changed, 222 insertions(+), 93 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 790ee2b..048ffaa 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -95,7 +95,7 @@ public class StreamPipesBackendApplication {
});
LOG.info("Gracefully stopping all running pipelines...");
- List<PipelineOperationStatus> status = Operations.stopAllPipelines();
+ List<PipelineOperationStatus> status = Operations.stopAllPipelines(true);
status.forEach(s -> {
if (s.isSuccess()) {
LOG.info("Pipeline {} successfully stopped", s.getPipelineName());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 82919a6..3eddef1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -42,10 +43,10 @@ public class GraphSubmitter {
String pipelineName,
List<InvocableStreamPipesEntity> graphs,
List<SpDataSet> dataSets) {
- this.graphs = graphs;
+ this.graphs = graphs != null ? graphs : new ArrayList<>();
this.pipelineId = pipelineId;
this.pipelineName = pipelineName;
- this.dataSets = dataSets;
+ this.dataSets = dataSets != null ? dataSets : new ArrayList<>();
}
public PipelineOperationStatus invokeGraphs() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index c283bad..582b6b1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -45,13 +45,18 @@ public class PipelineExecutor {
private boolean visualize;
private boolean storeStatus;
private boolean monitor;
+ private boolean forceStop;
- public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
- boolean monitor) {
+ public PipelineExecutor(Pipeline pipeline,
+ boolean visualize,
+ boolean storeStatus,
+ boolean monitor,
+ boolean forceStop) {
this.pipeline = pipeline;
this.visualize = visualize;
this.storeStatus = storeStatus;
this.monitor = monitor;
+ this.forceStop = forceStop;
}
public PipelineOperationStatus startPipeline() {
@@ -146,9 +151,6 @@ public class PipelineExecutor {
.getVisualizationStorageApi()
.deleteVisualization(pipeline.getPipelineId());
}
- if (storeStatus) {
- setPipelineStopped(pipeline);
- }
PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
new PipelineStatusMessage(pipeline.getPipelineId(),
@@ -157,6 +159,12 @@ public class PipelineExecutor {
PipelineStatusMessageType.PIPELINE_STOPPED.description()));
}
+
+ if (status.isSuccess() || forceStop) {
+ if (storeStatus) {
+ setPipelineStopped(pipeline);
+ }
+ }
return status;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index a3696a3..c7f0295 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -121,31 +121,33 @@ public class Operations {
public static PipelineOperationStatus startPipeline(
Pipeline pipeline, boolean visualize, boolean storeStatus,
boolean monitor) {
- return new PipelineExecutor(pipeline, visualize, storeStatus, monitor).startPipeline();
+ return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, false).startPipeline();
}
public static PipelineOperationStatus stopPipeline(
- Pipeline pipeline) {
- return stopPipeline(pipeline, true, true, false);
+ Pipeline pipeline, boolean forceStop) {
+ return stopPipeline(pipeline, true, true, false, forceStop);
}
- public static List<PipelineOperationStatus> stopAllPipelines() {
+ public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) {
List<PipelineOperationStatus> status = new ArrayList<>();
List<Pipeline> pipelines =
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
pipelines.forEach(p -> {
if (p.isRunning()) {
- status.add(Operations.stopPipeline(p));
+ status.add(Operations.stopPipeline(p, forceStop));
}
});
return status;
}
- public static PipelineOperationStatus stopPipeline(
- Pipeline pipeline, boolean visualize, boolean storeStatus,
- boolean monitor) {
- return new PipelineExecutor(pipeline, visualize, storeStatus, monitor).stopPipeline();
+ public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
+ boolean visualize,
+ boolean storeStatus,
+ boolean monitor,
+ boolean forceStop) {
+ return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, forceStop).stopPipeline();
}
public static List<RdfEndpointItem> getEndpointUriContents(List<RdfEndpoint> endpoints) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 093f9c5..1850c09 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -154,10 +154,12 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
@JacksonSerialized
@Operation(summary = "Stop the pipeline with the given id",
tags = {"Pipeline"})
- public Response stop(@PathParam("username") String username, @PathParam("pipelineId") String pipelineId) {
+ public Response stop(@PathParam("username") String username,
+ @PathParam("pipelineId") String pipelineId,
+ @QueryParam("forceStop") @DefaultValue("false") boolean forceStop) {
logger.info("User: " + username + " stopped pipeline: " + pipelineId);
PipelineManagement pm = new PipelineManagement();
- return pm.stopPipeline(pipelineId);
+ return pm.stopPipeline(pipelineId, forceStop);
}
@POST
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java
index dfc0d79..8a31a1c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java
@@ -45,6 +45,6 @@ public class PipelineNoUserResource extends AbstractRestResource {
public Response stop(@PathParam("pipelineId") String pipelineId) {
logger.info("Pipeline: " + pipelineId + " was stopped by the system");
PipelineManagement pm = new PipelineManagement();
- return pm.stopPipeline(pipelineId);
+ return pm.stopPipeline(pipelineId, false);
}
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java
index 64d1176..1b80671 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java
@@ -29,10 +29,11 @@ import javax.ws.rs.core.Response;
public class PipelineManagement extends AbstractRestResource {
- public Response stopPipeline(String pipelineId) {
+ public Response stopPipeline(String pipelineId,
+ boolean forceStop) {
try {
Pipeline pipeline = getPipelineStorage().getPipeline(pipelineId);
- PipelineOperationStatus status = Operations.stopPipeline(pipeline);
+ PipelineOperationStatus status = Operations.stopPipeline(pipeline, forceStop);
return ok(status);
} catch
(Exception e) {
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss
index 13ac91e..52ee77f 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss
@@ -63,5 +63,5 @@
.light-neutral {
background-color: #b4b4b4;
- box-shadow: 0 0 5px 2px #b4b4b4;
+ box-shadow: 0 0 2px 1px #b4b4b4;
}
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html
index 2ef39f5..6a05325 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html
@@ -18,28 +18,57 @@
<div class="sp-dialog-container">
<div class="sp-dialog-content p-15">
- <div>
- <div class="success-text" fxLayout="row" fxLayoutAlign="start center">
- <mat-icon color="accent" *ngIf="pipelineOperationStatus.success">done</mat-icon>
- <mat-icon color="accent" *ngIf="!pipelineOperationStatus.success">warning</mat-icon>
- <span> </span>
- {{pipelineOperationStatus.title}}
+ <div fxFlex="100" fxLayoutAlign="center center" fxLayout="column" *ngIf="operationInProgress">
+ <div fxLayoutAlign="center" >
+ <mat-spinner [diameter]="50"
+ fxLayoutAlign="center"
+ style="margin: 10px 0 5px 0" >{{action == 0 ? "Starting..." : "Stopping"}}
+ </mat-spinner>
+ </div>
+ <div fxLayoutAlign="center">
+ <h3>Please wait while the pipeline is {{action == 0 ? "starting..." : "stopping"}}...</h3>
+ </div>
+ </div>
+ <div *ngIf="!operationInProgress && forceStopActive" fxLayout="column" fxLayoutAlign="center center" fxFlex="100">
+ <div class="success-message" fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
+ <div fxLayout="row">
+ <mat-icon color="accent">done</mat-icon>
+ <span> Forced stop successful</span>
+ </div>
+ </div>
+ </div>
+ <div *ngIf="!operationInProgress && !forceStopActive" fxLayout="column" fxLayoutAlign="center center" fxFlex="100">
+ <div class="success-message" fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
+ <div fxLayout="row">
+ <mat-icon color="accent" *ngIf="pipelineOperationStatus.success">done</mat-icon>
+ <mat-icon style="color: red;" *ngIf="!pipelineOperationStatus.success">error</mat-icon>
+ <span> {{pipelineOperationStatus.title}}.</span>
+ </div>
+ <span *ngIf="action == 1 && !pipelineOperationStatus.success" class="message-small">You can perform a forced stop, which will stop and reset the pipeline status.</span>
+ </div>
+ <div fxLayout="row">
+ <button mat-button mat-raised-button class="mat-basic" (click)="toggleStatusDetailsVisible()">
+ <div *ngIf="!statusDetailsVisible">Show Details</div>
+ <div *ngIf="statusDetailsVisible">Hide Details</div>
+ </button>
+ <button mat-button mat-raised-button class="ml-10" color="primary" (click)="forceStopPipeline()" *ngIf="action == 1 && !pipelineOperationStatus.success">
+ <div *ngIf="!statusDetailsVisible">Force stop</div>
+ </button>
</div>
- <button mat-button mat-raised-button (click)="toggleStatusDetailsVisible()">
- <div *ngIf="!statusDetailsVisible">Show Details</div>
- <div *ngIf="statusDetailsVisible">Hide Details</div>
- </button>
- <div fxLayout="column" *ngIf="statusDetailsVisible">
- <div fxFlex="100" fxLayout="column" class="md-whiteframe-z2" *ngFor="let msg of pipelineOperationStatus.elementStatus">
- <div fxFlex="100" fxLayout="column" class="md-padding">
- <div>
- <b>{{msg.elementName}} </b> at URL <b> {{msg.elementId}}</b>
+ <div fxFlex="100" fxLayout="column" *ngIf="statusDetailsVisible" class="w-100">
+ <div fxFlex="100" fxLayout="column" class="mat-elevation-z1 mt-10" *ngFor="let msg of pipelineOperationStatus.elementStatus">
+ <div fxFlex="100" fxLayout="column" class="p-15">
+ <div fxFlex="100" fxLayout="row" fxLayoutAlign="start center">
<mat-icon color="accent" *ngIf="msg.success">done</mat-icon>
- <mat-icon color="accent" *ngIf="!msg.success">warning</mat-icon>
+ <mat-icon style="color:red;" *ngIf="!msg.success">warning</mat-icon>
+ <div fxFlex="100" fxLayout="column" class="ml-5">
+ <span><b>{{msg.elementName}}</b></span>
+ <small>{{msg.elementId.substr(0, msg.elementId.lastIndexOf("/"))}}</small>
+ </div>
</div>
<div>
- <div fxFlex="100" fxLayout="column" *ngIf="msg.optionalMessage">
+ <div fxFlex="100" fxLayout="column" *ngIf="msg.optionalMessage" class="mt-10">
<div class="error-message">
{{msg.optionalMessage}}
</div>
@@ -56,4 +85,4 @@
Close
</button>
</div>
-</div>
\ No newline at end of file
+</div>
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss
index e11debc..e16093a 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss
@@ -28,4 +28,34 @@
color: white;
padding: 10px;
width: 100%;
-}
\ No newline at end of file
+}
+
+.success-message {
+ text-align: center;
+ font-size: 14pt;
+ margin-top: 20px;
+ margin-bottom: 20px;
+}
+
+.message-small {
+ text-align: center;
+ font-size: 11pt;
+ margin-top: 10px;
+ margin-bottom: 10px;
+}
+
+.ml-5 {
+ margin-left: 5px;
+}
+
+.mt-10 {
+ margin-top: 10px;
+}
+
+.w-100 {
+ width: 100%;
+}
+
+.ml-10 {
+ margin-left: 10px;
+}
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
index cd5d103..ed65a5f 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
@@ -18,7 +18,10 @@
import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
import {PipelineOperationStatus} from "../../../core-model/gen/streampipes-model";
-import {Component, Input} from "@angular/core";
+import {Component, Input, OnInit} from "@angular/core";
+import {PipelineAction} from "../../model/pipeline-model";
+import {PipelineService} from "../../../platform-services/apis/pipeline.service";
+import {ShepherdService} from "../../../services/tour/shepherd.service";
@Component({
@@ -26,22 +29,75 @@ import {Component, Input} from "@angular/core";
templateUrl: './pipeline-status-dialog.component.html',
styleUrls: ['./pipeline-status-dialog.component.scss']
})
-export class PipelineStatusDialogComponent {
+export class PipelineStatusDialogComponent implements OnInit {
statusDetailsVisible: any;
+ operationInProgress: boolean = true;
+ forceStopActive: boolean = false;
+ pipelineOperationStatus: PipelineOperationStatus;
@Input()
- pipelineOperationStatus: PipelineOperationStatus;
+ pipelineId: string;
+
+ @Input()
+ action: PipelineAction;
- constructor(private DialogRef: DialogRef<PipelineStatusDialogComponent>) {
+ constructor(private dialogRef: DialogRef<PipelineStatusDialogComponent>,
+ private pipelineService: PipelineService,
+ private shepherdService: ShepherdService) {
this.statusDetailsVisible = false;
}
+ ngOnInit(): void {
+ if (this.action == PipelineAction.Start) {
+ this.startPipeline();
+ } else {
+ this.stopPipeline();
+ }
+ }
+
close() {
- this.DialogRef.close();
+ this.dialogRef.close();
};
toggleStatusDetailsVisible() {
this.statusDetailsVisible = !(this.statusDetailsVisible);
}
-}
\ No newline at end of file
+
+ startPipeline() {
+ this.pipelineService.startPipeline(this.pipelineId).subscribe(msg => {
+ this.pipelineOperationStatus = msg;
+ this.operationInProgress = false;
+ if (this.shepherdService.isTourActive()) {
+ this.shepherdService.trigger("pipeline-started");
+ }
+ }, error => {
+ this.operationInProgress = false;
+ this.pipelineOperationStatus = {title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []};
+ });
+ }
+
+ stopPipeline() {
+ this.pipelineService.stopPipeline(this.pipelineId).subscribe(msg => {
+ this.pipelineOperationStatus = msg;
+ this.operationInProgress = false;
+ }, error => {
+ this.operationInProgress = false;
+ this.pipelineOperationStatus = {title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []};
+ });
+ }
+
+ forceStopPipeline() {
+ this.operationInProgress = true;
+ this.forceStopActive = true;
+ this.pipelineService.stopPipeline(this.pipelineId, true).subscribe(msg => {
+ this.pipelineOperationStatus = msg;
+ this.operationInProgress = false;
+ }, error => {
+ this.operationInProgress = false;
+ this.pipelineOperationStatus = {title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []};
+ });
+ }
+
+
+}
diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/StopPipeline.java b/ui/src/app/pipelines/model/pipeline-model.ts
similarity index 60%
rename from streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/StopPipeline.java
rename to ui/src/app/pipelines/model/pipeline-model.ts
index c781fb3..d3bed94 100644
--- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/StopPipeline.java
+++ b/ui/src/app/pipelines/model/pipeline-model.ts
@@ -16,18 +16,7 @@
*
*/
-package org.apache.streampipes.manager.pipeline;
-
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-public class StopPipeline {
-
- public static void main(String[] args) {
- Pipeline pipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline
- ("2720e901-73d8-4d9e-8508-6a89551fe6fe");
-
- Operations.stopPipeline(pipeline);
- }
+export enum PipelineAction {
+ Start,
+ Stop
}
diff --git a/ui/src/app/pipelines/services/pipeline-operations.service.ts b/ui/src/app/pipelines/services/pipeline-operations.service.ts
index 8cd8ef1..cd91d12 100644
--- a/ui/src/app/pipelines/services/pipeline-operations.service.ts
+++ b/ui/src/app/pipelines/services/pipeline-operations.service.ts
@@ -17,17 +17,16 @@
*/
import {ShepherdService} from "../../services/tour/shepherd.service";
-import {EventEmitter, Inject, Injectable} from "@angular/core";
+import {EventEmitter, Injectable} from "@angular/core";
import {PipelineService} from "../../platform-services/apis/pipeline.service";
import {PanelType} from "../../core-ui/dialog/base-dialog/base-dialog.model";
import {DialogService} from "../../core-ui/dialog/base-dialog/base-dialog.service";
import {PipelineStatusDialogComponent} from "../dialog/pipeline-status/pipeline-status-dialog.component";
-import {Pipeline, PipelineOperationStatus} from "../../core-model/gen/streampipes-model";
+import {Pipeline} from "../../core-model/gen/streampipes-model";
import {DeletePipelineDialogComponent} from "../dialog/delete-pipeline/delete-pipeline-dialog.component";
import {DialogRef} from "../../core-ui/dialog/base-dialog/dialog-ref";
import {Router} from "@angular/router";
-
-declare const require: any;
+import {PipelineAction} from "../model/pipeline-model";
@Injectable()
export class PipelineOperationsService {
@@ -48,32 +47,35 @@ export class PipelineOperationsService {
if (toggleRunningOperation) {
toggleRunningOperation('starting');
}
- this.PipelineService.startPipeline(pipelineId).subscribe(msg => {
+ let dialogRef = this.showPipelineOperationsDialog(pipelineId, PipelineAction.Start);
+ this.afterPipelineOperationsDialogClosed(dialogRef, refreshPipelinesEmitter, "starting", toggleRunningOperation);
+ }
+
+ stopPipeline(pipelineId: string,
+ refreshPipelinesEmitter: EventEmitter<boolean>,
+ toggleRunningOperation?) {
+ if (toggleRunningOperation) {
+ toggleRunningOperation('stopping');
+ }
+ let dialogRef = this.showPipelineOperationsDialog(pipelineId, PipelineAction.Stop);
+ this.afterPipelineOperationsDialogClosed(dialogRef, refreshPipelinesEmitter, "stopping", toggleRunningOperation);
+ }
+
+ afterPipelineOperationsDialogClosed(dialogRef: DialogRef<PipelineStatusDialogComponent>,
+ refreshPipelinesEmitter: EventEmitter<boolean>,
+ toggleAction: string,
+ toggleRunningOperation?) {
+ dialogRef.afterClosed().subscribe(msg => {
refreshPipelinesEmitter.emit(true);
if (toggleRunningOperation) {
- toggleRunningOperation('starting');
+ toggleRunningOperation(toggleAction);
}
- if (this.ShepherdService.isTourActive()) {
- this.ShepherdService.trigger("pipeline-started");
- }
- this.showDialog(msg);
- }, error => {
- this.showDialog({title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []})
- });
- };
-
- stopPipeline(pipelineId: string, refreshPipelinesEmitter: EventEmitter<boolean>, toggleRunningOperation?) {
- toggleRunningOperation('stopping');
- this.PipelineService.stopPipeline(pipelineId).subscribe(msg => {
- refreshPipelinesEmitter.emit(true);
- toggleRunningOperation('stopping');
- this.showDialog(msg);
- }, error => {
- this.showDialog({title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []})
});
- };
+ }
- showDeleteDialog(pipeline: Pipeline, refreshPipelinesEmitter: EventEmitter<boolean>, switchToPipelineView?: any) {
+ showDeleteDialog(pipeline: Pipeline,
+ refreshPipelinesEmitter: EventEmitter<boolean>,
+ switchToPipelineView?: any) {
let dialogRef: DialogRef<DeletePipelineDialogComponent> = this.DialogService.open(DeletePipelineDialogComponent, {
panelType: PanelType.STANDARD_PANEL,
title: "Delete Pipeline",
@@ -94,13 +96,15 @@ export class PipelineOperationsService {
})
};
- showDialog(data: PipelineOperationStatus) {
- this.DialogService.open(PipelineStatusDialogComponent, {
+ showPipelineOperationsDialog(pipelineId: string,
+ action: PipelineAction): DialogRef<PipelineStatusDialogComponent> {
+ return this.DialogService.open(PipelineStatusDialogComponent, {
panelType: PanelType.STANDARD_PANEL,
title: "Pipeline Status",
width: "70vw",
data: {
- "pipelineOperationStatus": data
+ "pipelineId": pipelineId,
+ "action": action
}
});
};
diff --git a/ui/src/app/platform-services/apis/pipeline.service.ts b/ui/src/app/platform-services/apis/pipeline.service.ts
index 7335496..060c911 100644
--- a/ui/src/app/platform-services/apis/pipeline.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline.service.ts
@@ -27,6 +27,7 @@ import {
PipelineOperationStatus, PipelineStatusMessage
} from "../../core-model/gen/streampipes-model";
import {map} from "rxjs/operators";
+import {query} from "@angular/animations";
@Injectable()
export class PipelineService {
@@ -56,8 +57,14 @@ export class PipelineService {
.pipe(map(result => PipelineOperationStatus.fromData(result as PipelineOperationStatus)));
}
- stopPipeline(pipelineId): Observable<PipelineOperationStatus> {
- return this.http.get(this.platformServicesCommons.authUserBasePath() + "/pipelines/" + pipelineId + "/stop")
+ stopPipeline(pipelineId: string, forceStop?: boolean): Observable<PipelineOperationStatus> {
+ let queryAppendix = "";
+ if (forceStop) {
+ queryAppendix = "?forceStop=" + forceStop;
+ }
+ return this.http.get(this.platformServicesCommons.authUserBasePath()
+ + "/pipelines/" + pipelineId
+ + "/stop" + queryAppendix)
.pipe(map(result => PipelineOperationStatus.fromData(result as PipelineOperationStatus)));
}
@@ -104,4 +111,4 @@ export class PipelineService {
}));
}
-}
\ No newline at end of file
+}