You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/22 12:05:44 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-367] Allow forced stop of pipelines that fail to be gracefully stopped

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d04faace8554442f3b1a06b7d8d53a16437969b3
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat May 22 14:05:27 2021 +0200

    [STREAMPIPES-367] Allow forced stop of pipelines that fail to be gracefully stopped
---
 .../backend/StreamPipesBackendApplication.java     |  2 +-
 .../manager/execution/http/GraphSubmitter.java     |  5 +-
 .../manager/execution/http/PipelineExecutor.java   | 18 ++++--
 .../streampipes/manager/operations/Operations.java | 20 ++++---
 .../streampipes/rest/impl/PipelineResource.java    |  6 +-
 .../rest/impl/nouser/PipelineNoUserResource.java   |  2 +-
 .../rest/management/PipelineManagement.java        |  5 +-
 .../pipeline-overview.component.scss               |  2 +-
 .../pipeline-status-dialog.component.html          | 65 +++++++++++++++------
 .../pipeline-status-dialog.component.scss          | 32 +++++++++-
 .../pipeline-status-dialog.component.ts            | 68 ++++++++++++++++++++--
 .../src/app/pipelines/model/pipeline-model.ts      | 17 +-----
 .../services/pipeline-operations.service.ts        | 60 ++++++++++---------
 .../app/platform-services/apis/pipeline.service.ts | 13 ++++-
 14 files changed, 222 insertions(+), 93 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 790ee2b..048ffaa 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -95,7 +95,7 @@ public class StreamPipesBackendApplication {
     });
 
     LOG.info("Gracefully stopping all running pipelines...");
-    List<PipelineOperationStatus> status = Operations.stopAllPipelines();
+    List<PipelineOperationStatus> status = Operations.stopAllPipelines(true);
     status.forEach(s -> {
       if (s.isSuccess()) {
         LOG.info("Pipeline {} successfully stopped", s.getPipelineName());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 82919a6..3eddef1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
@@ -42,10 +43,10 @@ public class GraphSubmitter {
                         String pipelineName,
                         List<InvocableStreamPipesEntity> graphs,
                         List<SpDataSet> dataSets) {
-    this.graphs = graphs;
+    this.graphs = graphs != null ? graphs : new ArrayList<>();
     this.pipelineId = pipelineId;
     this.pipelineName = pipelineName;
-    this.dataSets = dataSets;
+    this.dataSets = dataSets != null ? dataSets : new ArrayList<>();
   }
 
   public PipelineOperationStatus invokeGraphs() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index c283bad..582b6b1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -45,13 +45,18 @@ public class PipelineExecutor {
   private boolean visualize;
   private boolean storeStatus;
   private boolean monitor;
+  private boolean forceStop;
 
-  public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
-                          boolean monitor) {
+  public PipelineExecutor(Pipeline pipeline,
+                          boolean visualize,
+                          boolean storeStatus,
+                          boolean monitor,
+                          boolean forceStop) {
     this.pipeline = pipeline;
     this.visualize = visualize;
     this.storeStatus = storeStatus;
     this.monitor = monitor;
+    this.forceStop = forceStop;
   }
 
   public PipelineOperationStatus startPipeline() {
@@ -146,9 +151,6 @@ public class PipelineExecutor {
                 .getVisualizationStorageApi()
                 .deleteVisualization(pipeline.getPipelineId());
       }
-      if (storeStatus) {
-        setPipelineStopped(pipeline);
-      }
 
       PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
               new PipelineStatusMessage(pipeline.getPipelineId(),
@@ -157,6 +159,12 @@ public class PipelineExecutor {
                       PipelineStatusMessageType.PIPELINE_STOPPED.description()));
 
     }
+
+    if (status.isSuccess() || forceStop) {
+      if (storeStatus) {
+        setPipelineStopped(pipeline);
+      }
+    }
     return status;
   }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index a3696a3..c7f0295 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -121,31 +121,33 @@ public class Operations {
   public static PipelineOperationStatus startPipeline(
           Pipeline pipeline, boolean visualize, boolean storeStatus,
           boolean monitor) {
-    return new PipelineExecutor(pipeline, visualize, storeStatus, monitor).startPipeline();
+    return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, false).startPipeline();
   }
 
   public static PipelineOperationStatus stopPipeline(
-          Pipeline pipeline) {
-    return stopPipeline(pipeline, true, true, false);
+          Pipeline pipeline, boolean forceStop) {
+    return stopPipeline(pipeline, true, true, false, forceStop);
   }
 
-  public static List<PipelineOperationStatus> stopAllPipelines() {
+  public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) {
     List<PipelineOperationStatus> status = new ArrayList<>();
     List<Pipeline> pipelines =
             StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
 
     pipelines.forEach(p -> {
       if (p.isRunning()) {
-        status.add(Operations.stopPipeline(p));
+        status.add(Operations.stopPipeline(p, forceStop));
       }
     });
     return status;
   }
 
-  public static PipelineOperationStatus stopPipeline(
-          Pipeline pipeline, boolean visualize, boolean storeStatus,
-          boolean monitor) {
-    return new PipelineExecutor(pipeline, visualize, storeStatus, monitor).stopPipeline();
+  public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
+                                                     boolean visualize,
+                                                     boolean storeStatus,
+                                                     boolean monitor,
+                                                     boolean forceStop) {
+    return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, forceStop).stopPipeline();
   }
 
   public static List<RdfEndpointItem> getEndpointUriContents(List<RdfEndpoint> endpoints) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 093f9c5..1850c09 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -154,10 +154,12 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @JacksonSerialized
   @Operation(summary = "Stop the pipeline with the given id",
           tags = {"Pipeline"})
-  public Response stop(@PathParam("username") String username, @PathParam("pipelineId") String pipelineId) {
+  public Response stop(@PathParam("username") String username,
+                       @PathParam("pipelineId") String pipelineId,
+                       @QueryParam("forceStop") @DefaultValue("false") boolean forceStop) {
     logger.info("User: " + username + " stopped pipeline: " + pipelineId);
     PipelineManagement pm = new PipelineManagement();
-    return pm.stopPipeline(pipelineId);
+    return pm.stopPipeline(pipelineId, forceStop);
   }
 
   @POST
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java
index dfc0d79..8a31a1c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineNoUserResource.java
@@ -45,6 +45,6 @@ public class PipelineNoUserResource extends AbstractRestResource {
     public Response stop(@PathParam("pipelineId") String pipelineId) {
         logger.info("Pipeline: " + pipelineId + " was stopped by the system");
         PipelineManagement pm = new PipelineManagement();
-        return pm.stopPipeline(pipelineId);
+        return pm.stopPipeline(pipelineId, false);
     }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java
index 64d1176..1b80671 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/management/PipelineManagement.java
@@ -29,10 +29,11 @@ import javax.ws.rs.core.Response;
 
 public class PipelineManagement extends AbstractRestResource {
 
-    public Response stopPipeline(String pipelineId) {
+    public Response stopPipeline(String pipelineId,
+                                 boolean forceStop) {
         try {
             Pipeline pipeline = getPipelineStorage().getPipeline(pipelineId);
-            PipelineOperationStatus status = Operations.stopPipeline(pipeline);
+            PipelineOperationStatus status = Operations.stopPipeline(pipeline, forceStop);
             return ok(status);
         } catch
                 (Exception e) {
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss
index 13ac91e..52ee77f 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss
@@ -63,5 +63,5 @@
 
 .light-neutral {
   background-color: #b4b4b4;
-  box-shadow: 0 0 5px 2px #b4b4b4;
+  box-shadow: 0 0 2px 1px #b4b4b4;
 }
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html
index 2ef39f5..6a05325 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.html
@@ -18,28 +18,57 @@
 
 <div class="sp-dialog-container">
     <div class="sp-dialog-content p-15">
-        <div>
-            <div class="success-text" fxLayout="row" fxLayoutAlign="start center">
-                <mat-icon color="accent" *ngIf="pipelineOperationStatus.success">done</mat-icon>
-                <mat-icon color="accent" *ngIf="!pipelineOperationStatus.success">warning</mat-icon>
-                <span>&nbsp;&nbsp;</span>
-                {{pipelineOperationStatus.title}}
+        <div fxFlex="100" fxLayoutAlign="center center" fxLayout="column" *ngIf="operationInProgress"> <!-- progress view; action 0 = PipelineAction.Start, 1 = PipelineAction.Stop -->
+            <div fxLayoutAlign="center" >
+                <mat-spinner [diameter]="50"
+                             fxLayoutAlign="center"
+                             style="margin: 10px 0 5px 0" >{{action == 0 ? "Starting..." : "Stopping..."}}
+                </mat-spinner>
+            </div>
+            <div fxLayoutAlign="center">
+                <h3>Please wait while the pipeline is {{action == 0 ? "starting" : "stopping"}}...</h3>
+            </div>
+        </div>
+        <div *ngIf="!operationInProgress && forceStopActive" fxLayout="column" fxLayoutAlign="center center" fxFlex="100">
+            <div class="success-message" fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
+                <div fxLayout="row">
+                    <mat-icon color="accent">done</mat-icon>
+                    <span>&nbsp;Forced stop successful</span>
+                </div>
+            </div>
+        </div>
+        <div *ngIf="!operationInProgress && !forceStopActive" fxLayout="column" fxLayoutAlign="center center" fxFlex="100">
+            <div class="success-message" fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
+                <div fxLayout="row">
+                    <mat-icon color="accent" *ngIf="pipelineOperationStatus.success">done</mat-icon>
+                    <mat-icon style="color: red;" *ngIf="!pipelineOperationStatus.success">error</mat-icon>
+                    <span>&nbsp;{{pipelineOperationStatus.title}}.</span>
+                </div>
+                <span *ngIf="action == 1 && !pipelineOperationStatus.success" class="message-small">You can perform a forced stop, which will stop and reset the pipeline status.</span>
+            </div>
+            <div fxLayout="row"> <!-- action row: details toggle plus, for a failed stop (action 1), the force-stop button -->
+                <button mat-button mat-raised-button class="mat-basic" (click)="toggleStatusDetailsVisible()">
+                    <div *ngIf="!statusDetailsVisible">Show Details</div>
+                    <div *ngIf="statusDetailsVisible">Hide Details</div>
+                </button>
+                <button mat-button mat-raised-button class="ml-10" color="primary" (click)="forceStopPipeline()" *ngIf="action == 1 && !pipelineOperationStatus.success">
+                    <div>Force stop</div> <!-- label must not depend on statusDetailsVisible, otherwise the button is empty while details are shown -->
+                </button>
             </div>
-            <button mat-button mat-raised-button (click)="toggleStatusDetailsVisible()">
-                <div *ngIf="!statusDetailsVisible">Show Details</div>
-                <div *ngIf="statusDetailsVisible">Hide Details</div>
-            </button>
 
-            <div fxLayout="column" *ngIf="statusDetailsVisible">
-                <div fxFlex="100" fxLayout="column" class="md-whiteframe-z2" *ngFor="let msg of pipelineOperationStatus.elementStatus">
-                    <div fxFlex="100" fxLayout="column" class="md-padding">
-                        <div>
-                            <b>{{msg.elementName}} </b> at URL <b> {{msg.elementId}}</b>
+            <div fxFlex="100" fxLayout="column" *ngIf="statusDetailsVisible" class="w-100">
+                <div fxFlex="100" fxLayout="column" class="mat-elevation-z1 mt-10" *ngFor="let msg of pipelineOperationStatus.elementStatus">
+                    <div fxFlex="100" fxLayout="column" class="p-15">
+                        <div fxFlex="100" fxLayout="row" fxLayoutAlign="start center">
                             <mat-icon color="accent" *ngIf="msg.success">done</mat-icon>
-                            <mat-icon color="accent" *ngIf="!msg.success">warning</mat-icon>
+                            <mat-icon style="color:red;" *ngIf="!msg.success">warning</mat-icon>
+                            <div fxFlex="100" fxLayout="column" class="ml-5">
+                                <span><b>{{msg.elementName}}</b></span>
+                                <small>{{msg.elementId.substr(0, msg.elementId.lastIndexOf("/"))}}</small>
+                            </div>
                         </div>
                         <div>
-                            <div fxFlex="100" fxLayout="column" *ngIf="msg.optionalMessage">
+                            <div fxFlex="100" fxLayout="column" *ngIf="msg.optionalMessage" class="mt-10">
                                 <div class="error-message">
                                     {{msg.optionalMessage}}
                                 </div>
@@ -56,4 +85,4 @@
             Close
         </button>
     </div>
-</div>
\ No newline at end of file
+</div>
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss
index e11debc..e16093a 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.scss
@@ -28,4 +28,34 @@
   color: white;
   padding: 10px;
   width: 100%;
-}
\ No newline at end of file
+}
+
+.success-message {
+  text-align: center;
+  font-size: 14pt;
+  margin-top: 20px;
+  margin-bottom: 20px;
+}
+
+.message-small {
+  text-align: center;
+  font-size: 11pt;
+  margin-top: 10px;
+  margin-bottom: 10px;
+}
+
+.ml-5 {
+  margin-left: 5px;
+}
+
+.mt-10 {
+  margin-top: 10px;
+}
+
+.w-100 {
+  width: 100%;
+}
+
+.ml-10 {
+  margin-left: 10px;
+}
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
index cd5d103..ed65a5f 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
@@ -18,7 +18,10 @@
 
 import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
 import {PipelineOperationStatus} from "../../../core-model/gen/streampipes-model";
-import {Component, Input} from "@angular/core";
+import {Component, Input, OnInit} from "@angular/core";
+import {PipelineAction} from "../../model/pipeline-model";
+import {PipelineService} from "../../../platform-services/apis/pipeline.service";
+import {ShepherdService} from "../../../services/tour/shepherd.service";
 
 
 @Component({
@@ -26,22 +29,75 @@ import {Component, Input} from "@angular/core";
     templateUrl: './pipeline-status-dialog.component.html',
     styleUrls: ['./pipeline-status-dialog.component.scss']
 })
-export class PipelineStatusDialogComponent {
+export class PipelineStatusDialogComponent implements OnInit {
 
     statusDetailsVisible: any;
+    operationInProgress: boolean = true;
+    forceStopActive: boolean = false;
+    pipelineOperationStatus: PipelineOperationStatus;
 
     @Input()
-    pipelineOperationStatus: PipelineOperationStatus;
+    pipelineId: string;
+
+    @Input()
+    action: PipelineAction;
 
-    constructor(private DialogRef: DialogRef<PipelineStatusDialogComponent>) {
+    constructor(private dialogRef: DialogRef<PipelineStatusDialogComponent>,
+                private pipelineService: PipelineService,
+                private shepherdService: ShepherdService) {
         this.statusDetailsVisible = false;
     }
 
+    ngOnInit(): void {
+        if (this.action == PipelineAction.Start) {
+            this.startPipeline();
+        } else {
+            this.stopPipeline();
+        }
+    }
+
     close() {
-        this.DialogRef.close();
+        this.dialogRef.close();
     };
 
     toggleStatusDetailsVisible() {
         this.statusDetailsVisible = !(this.statusDetailsVisible);
     }
-}
\ No newline at end of file
+
+    startPipeline() {
+        this.pipelineService.startPipeline(this.pipelineId).subscribe(msg => {
+           this.pipelineOperationStatus = msg;
+           this.operationInProgress = false;
+            if (this.shepherdService.isTourActive()) {
+                this.shepherdService.trigger("pipeline-started");
+            }
+        }, error => {
+            this.operationInProgress = false;
+            this.pipelineOperationStatus = {title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []};
+        });
+    }
+
+    stopPipeline() {
+        this.pipelineService.stopPipeline(this.pipelineId).subscribe(msg => {
+           this.pipelineOperationStatus = msg;
+           this.operationInProgress = false;
+        }, error => {
+            this.operationInProgress = false;
+            this.pipelineOperationStatus = {title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []};
+        });
+    }
+
+    forceStopPipeline() { // re-issues the stop request with forceStop=true after a graceful stop has failed
+        this.operationInProgress = true; // switches the dialog back to the spinner view while the request runs
+        this.pipelineService.stopPipeline(this.pipelineId, true).subscribe(msg => {
+            this.pipelineOperationStatus = msg;
+            this.forceStopActive = true; // set only after the backend answered, so the success view is never shown for a failed request
+            this.operationInProgress = false;
+        }, error => {
+            this.operationInProgress = false; // forceStopActive stays false, so the dialog shows the error status below
+            this.pipelineOperationStatus = {title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []};
+        });
+    }
+
+
+}
diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/StopPipeline.java b/ui/src/app/pipelines/model/pipeline-model.ts
similarity index 60%
rename from streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/StopPipeline.java
rename to ui/src/app/pipelines/model/pipeline-model.ts
index c781fb3..d3bed94 100644
--- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/StopPipeline.java
+++ b/ui/src/app/pipelines/model/pipeline-model.ts
@@ -16,18 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.pipeline;
-
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-public class StopPipeline {
-
-    public static void main(String[] args) {
-        Pipeline pipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline
-                ("2720e901-73d8-4d9e-8508-6a89551fe6fe");
-
-        Operations.stopPipeline(pipeline);
-    }
+export enum PipelineAction {
+  Start,
+  Stop
 }
diff --git a/ui/src/app/pipelines/services/pipeline-operations.service.ts b/ui/src/app/pipelines/services/pipeline-operations.service.ts
index 8cd8ef1..cd91d12 100644
--- a/ui/src/app/pipelines/services/pipeline-operations.service.ts
+++ b/ui/src/app/pipelines/services/pipeline-operations.service.ts
@@ -17,17 +17,16 @@
  */
 
 import {ShepherdService} from "../../services/tour/shepherd.service";
-import {EventEmitter, Inject, Injectable} from "@angular/core";
+import {EventEmitter, Injectable} from "@angular/core";
 import {PipelineService} from "../../platform-services/apis/pipeline.service";
 import {PanelType} from "../../core-ui/dialog/base-dialog/base-dialog.model";
 import {DialogService} from "../../core-ui/dialog/base-dialog/base-dialog.service";
 import {PipelineStatusDialogComponent} from "../dialog/pipeline-status/pipeline-status-dialog.component";
-import {Pipeline, PipelineOperationStatus} from "../../core-model/gen/streampipes-model";
+import {Pipeline} from "../../core-model/gen/streampipes-model";
 import {DeletePipelineDialogComponent} from "../dialog/delete-pipeline/delete-pipeline-dialog.component";
 import {DialogRef} from "../../core-ui/dialog/base-dialog/dialog-ref";
 import {Router} from "@angular/router";
-
-declare const require: any;
+import {PipelineAction} from "../model/pipeline-model";
 
 @Injectable()
 export class PipelineOperationsService {
@@ -48,32 +47,35 @@ export class PipelineOperationsService {
     if (toggleRunningOperation) {
       toggleRunningOperation('starting');
     }
-    this.PipelineService.startPipeline(pipelineId).subscribe(msg => {
+    let dialogRef = this.showPipelineOperationsDialog(pipelineId, PipelineAction.Start);
+    this.afterPipelineOperationsDialogClosed(dialogRef, refreshPipelinesEmitter, "starting", toggleRunningOperation);
+  }
+
+  stopPipeline(pipelineId: string,
+               refreshPipelinesEmitter: EventEmitter<boolean>,
+               toggleRunningOperation?) {
+    if (toggleRunningOperation) {
+      toggleRunningOperation('stopping');
+    }
+    let dialogRef = this.showPipelineOperationsDialog(pipelineId, PipelineAction.Stop);
+    this.afterPipelineOperationsDialogClosed(dialogRef, refreshPipelinesEmitter, "stopping", toggleRunningOperation);
+  }
+
+  afterPipelineOperationsDialogClosed(dialogRef: DialogRef<PipelineStatusDialogComponent>,
+                                      refreshPipelinesEmitter: EventEmitter<boolean>,
+                                      toggleAction: string,
+                                      toggleRunningOperation?) {
+    dialogRef.afterClosed().subscribe(msg => {
       refreshPipelinesEmitter.emit(true);
       if (toggleRunningOperation) {
-        toggleRunningOperation('starting');
+        toggleRunningOperation(toggleAction);
       }
-      if (this.ShepherdService.isTourActive()) {
-        this.ShepherdService.trigger("pipeline-started");
-      }
-      this.showDialog(msg);
-    }, error => {
-      this.showDialog({title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []})
-    });
-  };
-
-  stopPipeline(pipelineId: string, refreshPipelinesEmitter: EventEmitter<boolean>, toggleRunningOperation?) {
-    toggleRunningOperation('stopping');
-    this.PipelineService.stopPipeline(pipelineId).subscribe(msg => {
-      refreshPipelinesEmitter.emit(true);
-      toggleRunningOperation('stopping');
-      this.showDialog(msg);
-    }, error => {
-      this.showDialog({title: "Network Error", success: false, pipelineId: undefined, pipelineName: undefined, elementStatus: []})
     });
-  };
+  }
 
-  showDeleteDialog(pipeline: Pipeline, refreshPipelinesEmitter: EventEmitter<boolean>, switchToPipelineView?: any) {
+  showDeleteDialog(pipeline: Pipeline,
+                   refreshPipelinesEmitter: EventEmitter<boolean>,
+                   switchToPipelineView?: any) {
     let dialogRef: DialogRef<DeletePipelineDialogComponent> = this.DialogService.open(DeletePipelineDialogComponent, {
       panelType: PanelType.STANDARD_PANEL,
       title: "Delete Pipeline",
@@ -94,13 +96,15 @@ export class PipelineOperationsService {
     })
   };
 
-  showDialog(data: PipelineOperationStatus) {
-    this.DialogService.open(PipelineStatusDialogComponent, {
+  showPipelineOperationsDialog(pipelineId: string,
+                               action: PipelineAction): DialogRef<PipelineStatusDialogComponent> {
+    return this.DialogService.open(PipelineStatusDialogComponent, {
       panelType: PanelType.STANDARD_PANEL,
       title: "Pipeline Status",
       width: "70vw",
       data: {
-        "pipelineOperationStatus": data
+        "pipelineId": pipelineId,
+        "action": action
       }
     });
   };
diff --git a/ui/src/app/platform-services/apis/pipeline.service.ts b/ui/src/app/platform-services/apis/pipeline.service.ts
index 7335496..060c911 100644
--- a/ui/src/app/platform-services/apis/pipeline.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline.service.ts
@@ -27,6 +27,7 @@ import {
   PipelineOperationStatus, PipelineStatusMessage
 } from "../../core-model/gen/streampipes-model";
 import {map} from "rxjs/operators";
+import {query} from "@angular/animations";
 
 @Injectable()
 export class PipelineService {
@@ -56,8 +57,14 @@ export class PipelineService {
         .pipe(map(result => PipelineOperationStatus.fromData(result as PipelineOperationStatus)));
   }
 
-  stopPipeline(pipelineId): Observable<PipelineOperationStatus> {
-    return this.http.get(this.platformServicesCommons.authUserBasePath() + "/pipelines/" + pipelineId + "/stop")
+  stopPipeline(pipelineId: string, forceStop?: boolean): Observable<PipelineOperationStatus> {
+    let queryAppendix = "";
+    if (forceStop) {
+      queryAppendix = "?forceStop=" + forceStop;
+    }
+    return this.http.get(this.platformServicesCommons.authUserBasePath()
+        + "/pipelines/" + pipelineId
+        + "/stop" + queryAppendix)
         .pipe(map(result => PipelineOperationStatus.fromData(result as PipelineOperationStatus)));
   }
 
@@ -104,4 +111,4 @@ export class PipelineService {
         }));
   }
 
-}
\ No newline at end of file
+}