You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/15 20:05:59 UTC

[incubator-streampipes] 01/03: [STREAMPIPES-494] Update endpoint styles

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c53d2f233f87c84d389b0a8401e193e2d164eef5
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Jan 14 09:52:26 2022 +0100

    [STREAMPIPES-494] Update endpoint styles
---
 .../v2/pipeline/CheckCompletedVisitor.java         |  10 +
 .../v2/pipeline/UpdateStaticPropertiesVisitor.java |  10 +
 .../pipeline-assembly.component.ts                 |   1 +
 .../components/pipeline/pipeline.component.ts      |  32 ++-
 ui/src/app/editor/editor.module.ts                 | 128 +++++-----
 .../app/editor/services/jsplumb-bridge.service.ts  |   4 +-
 .../app/editor/services/jsplumb-config.service.ts  | 270 ++++++++++++---------
 .../app/editor/services/pipeline-style.service.ts  |  82 +++++++
 8 files changed, 343 insertions(+), 194 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
index 885ab77..113c8c6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
@@ -107,6 +107,16 @@ public class CheckCompletedVisitor extends DefaultStaticPropertyVisitor {
 
   }
 
+  @Override
+  public void visit(SlideToggleStaticProperty slideToggleStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(RuntimeResolvableTreeInputStaticProperty treeInputStaticProperty) {
+
+  }
+
   public List<PipelineElementValidationInfo> getValidationInfos() {
     return this.validationInfos;
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateStaticPropertiesVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateStaticPropertiesVisitor.java
index 18cb7a2..4787a69 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateStaticPropertiesVisitor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateStaticPropertiesVisitor.java
@@ -96,6 +96,16 @@ public class UpdateStaticPropertiesVisitor extends DefaultStaticPropertyVisitor
 
   }
 
+  @Override
+  public void visit(SlideToggleStaticProperty slideToggleStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(RuntimeResolvableTreeInputStaticProperty treeInputStaticProperty) {
+
+  }
+
   private void updateMappingProperty(MappingProperty mappingProperty) {
     AbstractRequirementsSelectorGenerator generator = RequirementsSelectorGeneratorFactory
             .getRequirementsSelector(
diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts
index 28f6b40..bc137ba 100644
--- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts
+++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts
@@ -282,6 +282,7 @@ export class PipelineAssemblyComponent implements OnInit, AfterViewInit {
                 this.pipelineValid = this.pipelineValidationService
                     .isValidPipeline(this.rawPipelineModel.filter(pe => !(pe.settings.disabled)), false);
             });
+            this.pipelineComponent.triggerPipelineModification();
         });
     }
 
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index de159ca..05d860d 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -46,6 +46,7 @@ import { JsplumbFactoryService } from '../../services/jsplumb-factory.service';
 import { PipelinePositioningService } from '../../services/pipeline-positioning.service';
 import { EVENT_CONNECTION_ABORT, EVENT_CONNECTION_DRAG } from '@jsplumb/browser-ui';
 import { EVENT_CONNECTION, EVENT_CONNECTION_DETACHED, EVENT_CONNECTION_MOVED } from '@jsplumb/core';
+import { PipelineStyleService } from "../../services/pipeline-style.service";
 
 @Component({
   selector: 'pipeline',
@@ -104,6 +105,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
               private objectProvider: ObjectProvider,
               private editorService: EditorService,
               private shepherdService: ShepherdService,
+              private pipelineStyleService: PipelineStyleService,
               private pipelineValidationService: PipelineValidationService,
               private dialogService: DialogService,
               private dialog: MatDialog,
@@ -325,9 +327,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
             pe.settings.loadingStatus = false;
             const edgeValidations = this.getTargetEdgeValidations(pipelineModificationMessage, info.target.id);
             const currentConnectionValid = this.currentConnectionValid(pe, edgeValidations);
-            this.processEdgeValidations(pipelineModificationMessage, info.target.id);
             if (currentConnectionValid) {
-              info.targetEndpoint.setType('token');
               this.validatePipeline();
               this.modifyPipeline(pipelineModificationMessage);
               if (this.jsplumbService.isFullyConnected(pe, this.preview)) {
@@ -343,12 +343,12 @@ export class PipelineComponent implements OnInit, OnDestroy {
             } else {
               this.JsplumbBridge.detach(info.connection);
               const invalidEdgeValidation = edgeValidations.find(e => e.sourceId === info.source.id);
-              console.log(invalidEdgeValidation);
               if (invalidEdgeValidation) {
                 this.showMatchingErrorDialog(invalidEdgeValidation.status.notifications);
               }
             }
           }, status => {
+            pe.settings.loadingStatus = false;
             this.showErrorDialog(status.error.title, status.error.description);
           });
       }
@@ -371,25 +371,29 @@ export class PipelineComponent implements OnInit, OnDestroy {
     return edgeValidations.filter(ev => ev.targetId === targetDomId);
   }
 
-  processEdgeValidations(pipelineModificationMessage: PipelineModificationMessage,
-                         targetDomId: string) {
-
-  }
-
   modifyPipeline(pm: PipelineModificationMessage) {
     if (pm.pipelineModifications) {
       pm.pipelineModifications.forEach(modification => {
         const id = modification.domId;
         if (id !== 'undefined') {
           const pe = this.objectProvider.findElement(id, this.rawPipelineModel);
-          (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
-          if (pe.payload instanceof DataProcessorInvocation) {
+          if (modification.staticProperties) {
+            (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
+          }
+          if (pe.payload instanceof DataProcessorInvocation && modification.outputStrategies) {
             (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
           }
-          (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
+          if (modification.inputStreams) {
+            (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
+          }
         }
       });
     }
+    if (pm.edgeValidations) {
+      this.pipelineStyleService.updateAllConnectorStyles(pm.edgeValidations);
+      this.pipelineStyleService.updateAllEndpointStyles(pm.edgeValidations);
+      this.JsplumbBridge.repaintEverything();
+    }
   }
 
   isCustomOutput(pe) {
@@ -460,7 +464,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
           if (!(pipelineElementConfig.payload instanceof DataSinkInvocation)) {
             this.JsplumbBridge.activateEndpoint('out-' + pipelineElementConfig.payload.dom, pipelineElementConfig.settings.completed);
           }
-          this.JsplumbBridge.getSourceEndpoint(pipelineElementConfig.payload.dom).toggleType('token');
           this.triggerPipelineCacheUpdate();
           this.announceConfiguredElement(pipelineElementConfig);
           if (this.previewModeActive) {
@@ -500,4 +503,9 @@ export class PipelineComponent implements OnInit, OnDestroy {
       });
     }
   }
+
+  triggerPipelineModification() {
+    this.currentPipelineModel = this.objectProvider.makePipeline(this.rawPipelineModel);
+    this.objectProvider.updatePipeline(this.currentPipelineModel).subscribe(pm => this.modifyPipeline(pm));
+  }
 }
diff --git a/ui/src/app/editor/editor.module.ts b/ui/src/app/editor/editor.module.ts
index 3ff9973..8725e3a 100644
--- a/ui/src/app/editor/editor.module.ts
+++ b/ui/src/app/editor/editor.module.ts
@@ -65,78 +65,80 @@ import { JsplumbEndpointService } from './services/jsplumb-endpoint.service';
 import { JsplumbFactoryService } from './services/jsplumb-factory.service';
 import { PipelineElementPreviewComponent } from './components/pipeline-element-preview/pipeline-element-preview.component';
 import { PipelineElementDiscoveryComponent } from './dialog/pipeline-element-discovery/pipeline-element-discovery.component';
+import { PipelineStyleService } from './services/pipeline-style.service';
 
 @NgModule({
-    imports: [
-        CoreUiModule,
-        CommonModule,
-        ConnectModule,
-        MatTabsModule,
-        MatListModule,
-        FlexLayoutModule,
-        GridsterModule,
-        CommonModule,
-        FlexLayoutModule,
-        CustomMaterialModule,
-        FormsModule,
-        MatProgressSpinnerModule,
-        ShowdownModule,
-        ReactiveFormsModule,
-    ],
-    declarations: [
-        CompatibleElementsComponent,
-        CustomizeComponent,
-        CustomOutputStrategyComponent,
-        EditorComponent,
-        EnabledPipelineElementFilter,
-        HelpComponent,
-        MatchingErrorComponent,
-        MissingElementsForTutorialComponent,
-        OutputStrategyComponent,
-        UserDefinedOutputStrategyComponent,
-        PipelineAssemblyComponent,
-        PipelineElementComponent,
-        PipelineElementDiscoveryComponent,
-        PipelineElementDocumentationComponent,
-        PipelineElementIconStandComponent,
-        PipelineElementOptionsComponent,
-        PipelineElementPreviewComponent,
-        PipelineElementRecommendationComponent,
-        PipelineElementTemplateConfigComponent,
-        PipelineComponent,
-        PropertySelectionComponent,
-        SavePipelineComponent,
-        SafeCss,
-        WelcomeTourComponent
-    ],
-    providers: [
-        EditorService,
-        SemanticTypeUtilsService,
-        JsplumbFactoryService,
-        JsplumbEndpointService,
-        JsplumbService,
-        JsplumbConfigService,
-        ObjectProvider,
-        PipelineCanvasScrollingService,
-        PipelineElementDraggedService,
-        PipelineEditorService,
-        PipelinePositioningService,
-        PipelineValidationService,
-        PipelineElementRecommendationService,
-        SafeCss,
-    ],
+  imports: [
+    CoreUiModule,
+    CommonModule,
+    ConnectModule,
+    MatTabsModule,
+    MatListModule,
+    FlexLayoutModule,
+    GridsterModule,
+    CommonModule,
+    FlexLayoutModule,
+    CustomMaterialModule,
+    FormsModule,
+    MatProgressSpinnerModule,
+    ShowdownModule,
+    ReactiveFormsModule,
+  ],
+  declarations: [
+    CompatibleElementsComponent,
+    CustomizeComponent,
+    CustomOutputStrategyComponent,
+    EditorComponent,
+    EnabledPipelineElementFilter,
+    HelpComponent,
+    MatchingErrorComponent,
+    MissingElementsForTutorialComponent,
+    OutputStrategyComponent,
+    UserDefinedOutputStrategyComponent,
+    PipelineAssemblyComponent,
+    PipelineElementComponent,
+    PipelineElementDiscoveryComponent,
+    PipelineElementDocumentationComponent,
+    PipelineElementIconStandComponent,
+    PipelineElementOptionsComponent,
+    PipelineElementPreviewComponent,
+    PipelineElementRecommendationComponent,
+    PipelineElementTemplateConfigComponent,
+    PipelineComponent,
+    PropertySelectionComponent,
+    SavePipelineComponent,
+    SafeCss,
+    WelcomeTourComponent
+  ],
+  providers: [
+    EditorService,
+    SemanticTypeUtilsService,
+    JsplumbFactoryService,
+    JsplumbEndpointService,
+    JsplumbService,
+    JsplumbConfigService,
+    ObjectProvider,
+    PipelineCanvasScrollingService,
+    PipelineElementDraggedService,
+    PipelineEditorService,
+    PipelinePositioningService,
+    PipelineStyleService,
+    PipelineValidationService,
+    PipelineElementRecommendationService,
+    SafeCss,
+  ],
   exports: [
     EditorComponent,
     PipelineComponent,
     PipelineElementComponent
   ],
-    entryComponents: [
-        EditorComponent
-    ]
+  entryComponents: [
+    EditorComponent
+  ]
 })
 export class EditorModule {
 
-    constructor() {
-    }
+  constructor() {
+  }
 
 }
diff --git a/ui/src/app/editor/services/jsplumb-bridge.service.ts b/ui/src/app/editor/services/jsplumb-bridge.service.ts
index cc607c0..d0a63f8 100644
--- a/ui/src/app/editor/services/jsplumb-bridge.service.ts
+++ b/ui/src/app/editor/services/jsplumb-bridge.service.ts
@@ -30,11 +30,13 @@ export class JsplumbBridge {
     }
 
     activateEndpointWithType(endpointId: string, endpointEnabled: boolean, endpointType: string) {
+        console.log("activate endpoint");
         this.activateEndpoint(endpointId, endpointEnabled);
         this.setEndpointType(endpointId, endpointType);
     }
 
     setEndpointType(endpointId: string, endpointType: string) {
+        console.log("set endpoint type");
         const endpoint = this.getEndpointById(endpointId);
         // @ts-ignore
         endpoint.setType(endpointType);
@@ -91,7 +93,7 @@ export class JsplumbBridge {
 
     getTargetEndpoint(id) {
         // @ts-ignore
-        return this.jsPlumbInstance.selectEndpoints({target: id});
+        return this.jsPlumbInstance.selectEndpoints({target: document.getElementById(id)});
     }
 
     getEndpointCount(id) {
diff --git a/ui/src/app/editor/services/jsplumb-config.service.ts b/ui/src/app/editor/services/jsplumb-config.service.ts
index 77735a7..d6a2c16 100644
--- a/ui/src/app/editor/services/jsplumb-config.service.ts
+++ b/ui/src/app/editor/services/jsplumb-config.service.ts
@@ -19,127 +19,161 @@
 import { Injectable } from '@angular/core';
 import { JsplumbSettings } from '../model/jsplumb.model';
 import { BezierConnector } from '@jsplumb/connector-bezier';
-import { EndpointTypeDescriptor } from '@jsplumb/core';
+import { ArrowOverlay, EndpointTypeDescriptor } from '@jsplumb/core';
+import { ArrowOverlayOptions } from "@jsplumb/common";
 
 @Injectable()
 export class JsplumbConfigService {
 
-    constructor() {
-    }
-
-    getEditorConfig() {
-        return this.makeConfig(this.makeSettings(12, 5, 30, 30, 2, 80));
-    }
-
-    getPreviewConfig() {
-        return this.makeConfig(this.makeSettings(6, 2, 15, 15, 1, 40));
-    }
-
-    getEndpointTypeConfig(): Record<string, EndpointTypeDescriptor> {
-        return {
-            'empty': {
-                paintStyle: {
-                    fill: 'white',
-                    stroke: '#9E9E9E',
-                    strokeWidth: 2,
-                }
-            },
-            'token': {
-                paintStyle: {
-                    fill: '#BDBDBD',
-                    stroke: '#9E9E9E',
-                    strokeWidth: 2
-                },
-                hoverPaintStyle: {
-                    fill: '#BDBDBD',
-                    stroke: '#4CAF50',
-                    strokeWidth: 4,
-                }
-            },
-            'highlight': {
-                paintStyle: {
-                    fill: 'white',
-                    stroke: '#4CAF50',
-                    strokeWidth: 4
-                }
-            }
-        };
-    }
-
-    makeConfig(settings) {
-        const config = {} as any;
-        config.streamEndpointOptions = this.makeStreamEndpointOptions(settings);
-        config.sepaEndpointOptions = this.makeSepaEndpointOptions(settings);
-        config.leftTargetPointOptions = this.makeLeftTargetPointOptions(settings);
-        return config;
-    }
-
-    makeSettings(dotRadius: number,
-                 lineWidth: number,
-                 arrowWidth: number,
-                 arrowLength: number,
-                 arrowLineWidth: number,
-                 curviness: number) {
-        const settings = {} as JsplumbSettings;
-        settings.dotRadius = dotRadius;
-        settings.lineWidth = lineWidth;
-        settings.arrowWidth = arrowWidth;
-        settings.arrowLength = arrowLength;
-        settings.arrowLineWidth = arrowLineWidth;
-        settings.curviness = curviness;
-        return settings;
-    }
-
-    makeStreamEndpointOptions(settings: JsplumbSettings) {
-        return {
-            endpoint: {type: 'Dot', options: {radius: settings.dotRadius}},
-            connectorStyle: {stroke: '#BDBDBD', outlineStroke: '#BDBDBD', strokeWidth: settings.lineWidth},
-            connector: {type: BezierConnector.type, options: {curviness: settings.curviness}},
-            source: true,
-            type: 'token',
-            maxConnections: -1,
-            anchor: 'Right',
-            connectorOverlays: this.defaultConnectorOverlay(settings)
-        };
-    }
-
-    makeSepaEndpointOptions(settings) {
-        return {
-            endpoint: {type: 'Dot', options: {radius: settings.dotRadius}},
-            connectorStyle: {
-                stroke: '#BDBDBD', outlineStroke: '#9E9E9E', strokeWidth: settings.lineWidth
-            },
-            connector: {type: BezierConnector.type, options: {curviness: settings.curviness}},
-            source: true,
-            maxConnections: -1,
-            anchor: 'Right',
-            type: 'empty',
-            connectorOverlays: this.defaultConnectorOverlay(settings),
-            parameters: {
-                endpointType: 'output'
-            }
-        };
-    }
-
-    makeLeftTargetPointOptions(settings) {
-        return {
-            endpoint: {type: 'Dot', options: {radius: settings.dotRadius}},
-            type: 'empty',
-            anchor: 'Left',
-            target: true
-        };
-    }
-
-    defaultConnectorOverlay(settings) {
-        return [{
-            type: 'Arrow', options: {
-                width: settings.arrowWidth, length: settings.arrowLength, location: 0.5, id: 'arrow', paintStyle: {
-                    fillStyle: '#BDBDBD',
-                    stroke: '#9E9E9E',
-                    strokeWidth: settings.arrowLineWidth
-                }
-            }
-        }];
-    }
+  constructor() {
+  }
+
+  getEditorConfig() {
+    return this.makeConfig(this.makeSettings(12, 5, 30, 30, 2, 80));
+  }
+
+  getPreviewConfig() {
+    return this.makeConfig(this.makeSettings(6, 2, 15, 15, 1, 40));
+  }
+
+  getEndpointTypeConfig(): Record<string, EndpointTypeDescriptor> {
+    return {
+      'empty': {
+        paintStyle: {
+          fill: 'white',
+          stroke: '#9E9E9E',
+          strokeWidth: 1,
+        }
+      },
+      'token': {
+        paintStyle: {
+          fill: '#BDBDBD',
+          stroke: '#9E9E9E',
+          strokeWidth: 1
+        },
+        hoverPaintStyle: {
+          fill: '#9E9E9E',
+          stroke: '#9E9E9E',
+          strokeWidth: 2,
+        }
+      },
+      'highlight': {
+        paintStyle: {
+          fill: 'white',
+          stroke: '#4CAF50',
+          strokeWidth: 2
+        }
+      }
+    };
+  }
+
+  getConnectorStyleSuccess() {
+    return {
+      stroke: '#6ab26c',
+      outlineStroke: '#6ab26c',
+      strokeWidth: 5
+    };
+  }
+
+  getConnectorStyleError() {
+    return {
+      stroke: '#b74e4e',
+      outlineStroke: '#b74e4e',
+      strokeWidth: 5
+    };
+  }
+
+  getConnectorStyleWarning() {
+    return {
+      outlineStroke: '#d3c545',
+      stroke: '#d3c545',
+      strokeWidth: 5
+    };
+  }
+
+  getDefaultConnectorStyle(settings) {
+    return {stroke: '#BDBDBD', outlineStroke: '#BDBDBD', strokeWidth: settings.lineWidth};
+  }
+
+  getDefaultConnector(settings) {
+    return {type: BezierConnector.type, options: {curviness: settings.curviness}};
+  }
+
+  getDefaultEndpoint(settings) {
+    return {type: 'Dot', options: {radius: settings.dotRadius}};
+  }
+
+  makeConfig(settings: JsplumbSettings) {
+    const config = {} as any;
+    config.streamEndpointOptions = this.makeStreamEndpointOptions(settings);
+    config.sepaEndpointOptions = this.makeSepaEndpointOptions(settings);
+    config.leftTargetPointOptions = this.makeLeftTargetPointOptions(settings);
+    return config;
+  }
+
+  makeSettings(dotRadius: number,
+               lineWidth: number,
+               arrowWidth: number,
+               arrowLength: number,
+               arrowLineWidth: number,
+               curviness: number) {
+    const settings = {} as JsplumbSettings;
+    settings.dotRadius = dotRadius;
+    settings.lineWidth = lineWidth;
+    settings.arrowWidth = arrowWidth;
+    settings.arrowLength = arrowLength;
+    settings.arrowLineWidth = arrowLineWidth;
+    settings.curviness = curviness;
+    return settings;
+  }
+
+  makeStreamEndpointOptions(settings: JsplumbSettings) {
+    return {
+      ...this.makeDefaultOutputPortOptions(settings),
+      type: 'token',
+    };
+  }
+
+  makeSepaEndpointOptions(settings) {
+    return {
+      ...this.makeDefaultOutputPortOptions(settings),
+      type: 'token',
+      parameters: {
+        endpointType: 'output'
+      }
+    };
+  }
+
+  makeDefaultOutputPortOptions(settings) {
+    return {
+      endpoint: this.getDefaultEndpoint(settings),
+      connectorStyle: this.getDefaultConnectorStyle(settings),
+      connector: this.getDefaultConnector(settings),
+      source: true,
+      maxConnections: -1,
+      anchor: 'Right',
+      connectorOverlays: [this.defaultConnectorOverlay(settings)],
+    };
+  }
+
+  makeLeftTargetPointOptions(settings) {
+    return {
+      endpoint: {type: 'Dot', options: {radius: settings.dotRadius}},
+      type: 'empty',
+      anchor: 'Left',
+      target: true
+    };
+  }
+
+  defaultConnectorOverlay(settings): ArrowOverlayOptions {
+    return {
+      type: 'Arrow', options: {
+        width: settings.arrowWidth,
+        length: settings.arrowLength,
+        location: 0.5,
+        id: 'arrow'
+      }
+    };
+  }
 
 }
diff --git a/ui/src/app/editor/services/pipeline-style.service.ts b/ui/src/app/editor/services/pipeline-style.service.ts
new file mode 100644
index 0000000..a949f54
--- /dev/null
+++ b/ui/src/app/editor/services/pipeline-style.service.ts
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { Injectable } from '@angular/core';
+import { JsplumbConfigService } from './jsplumb-config.service';
+import { EdgeValidationStatus, PipelineEdgeValidation } from '../../core-model/gen/streampipes-model';
+import { ArrowOverlay, Endpoint } from '@jsplumb/core';
+import { JsplumbFactoryService } from './jsplumb-factory.service';
+
+@Injectable()
+export class PipelineStyleService {
+
+  constructor(private jsPlumbConfigService: JsplumbConfigService,
+              private jsplumbFactoryService: JsplumbFactoryService) {
+
+  }
+
+  updateAllConnectorStyles(edgeValidations: PipelineEdgeValidation[]) {
+    edgeValidations.forEach(edgeValidation => this.updateConnectorStyle(edgeValidation));
+  }
+
+  updateAllEndpointStyles(edgeValidations: PipelineEdgeValidation[]) {
+    const jsplumbBridge = this.jsplumbFactoryService.getJsplumbBridge(false);
+    edgeValidations.forEach(value => {
+      const endpoints = jsplumbBridge.getTargetEndpoint(value.targetId);
+      endpoints.each(endpoint => {
+        if (endpoint.connections.length > 0) {
+          endpoint.setType('token');
+        }
+      });
+    });
+  }
+
+  updateConnectorStyle(validation: PipelineEdgeValidation) {
+    const jsplumbBridge = this.jsplumbFactoryService.getJsplumbBridge(false);
+    const connections = jsplumbBridge.getConnections({
+      source: this.byId(validation.sourceId),
+      target: this.byId(validation.targetId)
+    });
+    const connectorStyle = this.getConnectorStyleConfig(validation.status);
+
+    if (Array.isArray(connections)) {
+      connections.forEach(connection => {
+        connection.setPaintStyle(connectorStyle);
+      });
+    }
+  }
+
+  getConnectorStyleConfig(status: EdgeValidationStatus) {
+    if (status.validationStatusType === 'COMPLETE') {
+      return this.jsPlumbConfigService.getConnectorStyleSuccess();
+    } else if (status.validationStatusType === 'INCOMPLETE') {
+      return this.jsPlumbConfigService.getConnectorStyleWarning();
+    } else {
+      return this.jsPlumbConfigService.getConnectorStyleError();
+    }
+  }
+
+  updateEndpointStyle(endpoint: Endpoint,
+                      endpointType: string) {
+    endpoint.setType(endpointType);
+  }
+
+  byId(id: string) {
+    return document.getElementById(id);
+  }
+}