You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/17 07:07:32 UTC

[incubator-streampipes] branch STREAMPIPES-494 updated (0e45db0 -> 98fe985)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 0e45db0  [STREAMPIPES-494] Improve auto-update of pipeline element configurations
     new c1e6d61  [STREAMPIPES-494] Refactor element recommendation
     new 98fe985  [STREAMPIPES-494] Properly place recommended elements

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../model/pipeline/PipelineModification.java       |   9 +
 .../execution/http/PipelineStorageService.java     |   3 +
 .../manager/matching/ConnectionStorageHandler.java |  49 +-
 .../matching/PipelineModificationGenerator.java    |   3 +-
 .../matching/PipelineVerificationHandler.java      |   5 -
 .../matching/PipelineVerificationHandlerV2.java    |   2 +-
 .../streampipes/manager/operations/Operations.java |   5 +-
 .../manager/recommender/AllElementsProvider.java   |  28 +-
 .../manager/recommender/ElementRecommender.java    | 103 ++--
 .../recommender/PartialPipelineGenerator.java      |  69 +++
 .../streampipes/rest/impl/PipelineResource.java    |   7 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  14 +-
 .../pipeline-element-options.component.ts          |   2 +-
 .../pipeline-element-recommendation.component.ts   |  84 ++--
 .../components/pipeline/pipeline.component.ts      |   9 +-
 ui/src/app/editor/services/editor.service.ts       |   5 +-
 .../app/editor/services/jsplumb-bridge.service.ts  |  28 +-
 ui/src/app/editor/services/jsplumb.service.ts      | 522 +++++++++++----------
 18 files changed, 513 insertions(+), 434 deletions(-)
 copy streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Filter.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java (55%)
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/PartialPipelineGenerator.java

[incubator-streampipes] 01/02: [STREAMPIPES-494] Refactor element recommendation

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c1e6d61535b4296d8c5f261c9dde7b2f8227790a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Jan 16 23:57:15 2022 +0100

    [STREAMPIPES-494] Refactor element recommendation
---
 .../model/pipeline/PipelineModification.java       |   9 ++
 .../execution/http/PipelineStorageService.java     |   3 +
 .../manager/matching/ConnectionStorageHandler.java |  49 +++++++---
 .../matching/PipelineModificationGenerator.java    |   3 +-
 .../matching/PipelineVerificationHandler.java      |   5 -
 .../matching/PipelineVerificationHandlerV2.java    |   2 +-
 .../streampipes/manager/operations/Operations.java |   5 +-
 .../AllElementsProvider.java}                      |  36 ++++---
 .../manager/recommender/ElementRecommender.java    | 103 +++++++--------------
 .../recommender/PartialPipelineGenerator.java      |  69 ++++++++++++++
 .../streampipes/rest/impl/PipelineResource.java    |   7 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  14 +--
 .../pipeline-element-options.component.ts          |   2 +-
 .../components/pipeline/pipeline.component.ts      |   9 +-
 ui/src/app/editor/services/editor.service.ts       |   5 +-
 ui/src/app/editor/services/jsplumb.service.ts      |  31 ++++---
 16 files changed, 220 insertions(+), 132 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
index 1546e81..fc2ce9c 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
@@ -35,6 +35,7 @@ public class PipelineModification {
   private List<StaticProperty> staticProperties;
   private List<OutputStrategy> outputStrategies;
   private List<SpDataStream> inputStreams;
+  private SpDataStream outputStream;
 
   public PipelineModification(String domId, String elementId,
                               List<StaticProperty> staticProperties) {
@@ -110,4 +111,12 @@ public class PipelineModification {
   public void setValidationInfos(List<PipelineElementValidationInfo> validationInfos) {
     this.validationInfos = validationInfos;
   }
+
+  public SpDataStream getOutputStream() {
+    return outputStream;
+  }
+
+  public void setOutputStream(SpDataStream outputStream) {
+    this.outputStream = outputStream;
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
index 7a0d764..7603776 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.manager.execution.http;
 
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.matching.ConnectionStorageHandler;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -62,6 +63,8 @@ public class PipelineStorageService {
 
     pipeline.setSepas(sepas);
     pipeline.setActions(secs);
+
+    new ConnectionStorageHandler(pipeline).storeConnections();
   }
 
   private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java
index 4b5c1e7..afa21cb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionStorageHandler.java
@@ -17,35 +17,54 @@
  */
 package org.apache.streampipes.manager.matching;
 
-import org.apache.streampipes.manager.util.TreeUtils;
+import org.apache.streampipes.manager.recommender.AllElementsProvider;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.client.connection.Connection;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class ConnectionStorageHandler {
 
+  private AllElementsProvider elementsProvider;
   private Pipeline pipeline;
-  private InvocableStreamPipesEntity rootPipelineElement;
 
-  public ConnectionStorageHandler(Pipeline pipeline,
-                                  InvocableStreamPipesEntity rootPipelineElement) {
+  public ConnectionStorageHandler(Pipeline pipeline) {
     this.pipeline = pipeline;
-    this.rootPipelineElement = rootPipelineElement;
+    this.elementsProvider = new AllElementsProvider(pipeline);
+  }
+
+  public void storeConnections() {
+    List<Connection> connections = new ArrayList<>();
+    pipeline.getActions().forEach(sink -> findConnections(sink, connections));
+
+    connections.forEach(connection -> StorageDispatcher.INSTANCE
+            .getNoSqlStore()
+            .getConnectionStorageApi()
+            .addConnection(connection));
   }
 
-  public void storeConnection() {
-    String fromId = rootPipelineElement.getConnectedTo().get(rootPipelineElement.getConnectedTo().size() - 1);
-    NamedStreamPipesEntity sepaElement = TreeUtils.findSEPAElement(fromId, pipeline.getSepas(), pipeline.getStreams());
-    String sourceId;
-    if (sepaElement instanceof SpDataStream) {
-      sourceId = sepaElement.getElementId();
-    } else {
-      sourceId = ((InvocableStreamPipesEntity) sepaElement).getBelongsTo();
+  private void findConnections(NamedStreamPipesEntity target,
+                               List<Connection> connections) {
+    if (target instanceof DataSinkInvocation || target instanceof DataProcessorInvocation) {
+      InvocableStreamPipesEntity pipelineElement = (InvocableStreamPipesEntity) target;
+      pipelineElement.getConnectedTo().forEach(conn -> {
+        NamedStreamPipesEntity source = this.elementsProvider.findElement(conn);
+        String sourceId;
+        if (source instanceof SpDataStream) {
+          sourceId = source.getElementId();
+        } else {
+          sourceId = ((InvocableStreamPipesEntity) source).getBelongsTo();
+        }
+        connections.add(new Connection(sourceId, ((InvocableStreamPipesEntity) target).getBelongsTo()));
+        findConnections(source, connections);
+      });
     }
-    Connection connection = new Connection(sourceId, rootPipelineElement.getBelongsTo());
-    StorageDispatcher.INSTANCE.getNoSqlStore().getConnectionStorageApi().addConnection(connection);
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index 78ec367..cf8fd9d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -22,7 +22,7 @@ public class PipelineModificationGenerator {
 
   private final PipelineGraph pipelineGraph;
   private final Map<String, PipelineModification> pipelineModifications;
-  private Map<String, PipelineEdgeValidation> edgeValidations;
+  private final Map<String, PipelineEdgeValidation> edgeValidations;
   private final PipelineValidator pipelineValidator;
 
   public PipelineModificationGenerator(PipelineGraph pipelineGraph) {
@@ -95,6 +95,7 @@ public class PipelineModificationGenerator {
                                  InvocableStreamPipesEntity t) {
     if (t instanceof DataProcessorInvocation) {
       modification.setOutputStrategies(((DataProcessorInvocation) t).getOutputStrategies());
+      modification.setOutputStream(((DataProcessorInvocation) t).getOutputStream());
     }
     modification.setInputStreams(t.getInputStreams());
     modification.setStaticProperties(t.getStaticProperties());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
index 7e63678..6b63cf6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
@@ -182,11 +182,6 @@ public class PipelineVerificationHandler {
     }
   }
 
-  public PipelineVerificationHandler storeConnection() {
-    new ConnectionStorageHandler(pipeline, rootPipelineElement).storeConnection();
-    return this;
-  }
-
   public PipelineModificationMessage getPipelineModificationMessage() {
     return pipelineModificationMessage;
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index 0152dca..e7d5025 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.model.pipeline.Pipeline;
 
 public class PipelineVerificationHandlerV2 {
 
-  private Pipeline pipeline;
+  private final Pipeline pipeline;
 
   public PipelineVerificationHandlerV2(Pipeline pipeline) {
     this.pipeline = pipeline;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 1afd5cf..3b1e5fa 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -102,8 +102,9 @@ public class Operations {
     return new TypeExtractor(graphData).getTypeVerifier().verifyAndUpdate();
   }
 
-  public static PipelineElementRecommendationMessage findRecommendedElements(Pipeline partialPipeline) throws NoSuitableSepasAvailableException {
-    return new ElementRecommender(partialPipeline).findRecommendedElements();
+  public static PipelineElementRecommendationMessage findRecommendedElements(Pipeline partialPipeline,
+                                                                             String baseRecId) throws NoSuitableSepasAvailableException {
+    return new ElementRecommender(partialPipeline, baseRecId).findRecommendedElements();
   }
 
   public static void storePipeline(Pipeline pipeline) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
similarity index 50%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
index 0152dca..f8a04bd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
@@ -15,23 +15,37 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.matching;
 
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
+
+package org.apache.streampipes.manager.recommender;
+
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 
-public class PipelineVerificationHandlerV2 {
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-  private Pipeline pipeline;
+public class AllElementsProvider {
 
-  public PipelineVerificationHandlerV2(Pipeline pipeline) {
-    this.pipeline = pipeline;
+  private final List<NamedStreamPipesEntity> allElements;
+
+  public AllElementsProvider(Pipeline pipeline) {
+    this.allElements = Stream.of(
+                    pipeline.getStreams(),
+                    pipeline.getSepas(),
+                    pipeline.getActions())
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
   }
 
-  public PipelineModificationMessage verifyPipeline() {
-    PipelineGraph graph = new PipelineGraphBuilder(pipeline).buildGraph();
-    return new PipelineModificationGenerator(graph).buildPipelineModificationMessage();
+  public NamedStreamPipesEntity findElement(String domId) {
+    return this.allElements
+            .stream()
+            .filter(p -> p.getDOM().equals(domId))
+            .findFirst()
+            .orElseThrow(IllegalArgumentException::new);
   }
+
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java
index 8ce5afc..9b8dfab 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java
@@ -18,34 +18,25 @@
 
 package org.apache.streampipes.manager.recommender;
 
-import org.apache.streampipes.commons.exceptions.NoSepaInPipelineException;
 import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.matching.v2.StreamMatch;
-import org.apache.streampipes.manager.storage.UserService;
-import org.apache.streampipes.manager.util.PipelineVerificationUtils;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.client.matching.MatchingResultMessage;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.message.PipelineModificationMessage;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementRecommendation;
 import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
+import org.apache.streampipes.model.pipeline.PipelineModification;
 import org.apache.streampipes.resource.management.SpResourceManager;
 import org.apache.streampipes.storage.api.INoSqlStorage;
 import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.storage.management.StorageManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -54,22 +45,24 @@ import java.util.stream.Collectors;
 
 public class ElementRecommender {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ElementRecommender.class);
+  private final Pipeline pipeline;
+  private final String baseRecDomId;
+  private final PipelineElementRecommendationMessage recommendationMessage;
 
-  private Pipeline pipeline;
-  private PipelineElementRecommendationMessage recommendationMessage;
-
-  public ElementRecommender(Pipeline partialPipeline) {
+  public ElementRecommender(Pipeline partialPipeline,
+                            String baseRecDomId) {
     this.pipeline = partialPipeline;
+    this.baseRecDomId = baseRecDomId;
     this.recommendationMessage = new PipelineElementRecommendationMessage();
   }
 
   public PipelineElementRecommendationMessage findRecommendedElements() throws NoSuitableSepasAvailableException {
+    AllElementsProvider elementsProvider = new AllElementsProvider(this.pipeline);
 
-    String rootNodeElementId;
+    String rootNodeId;
     try {
-      rootNodeElementId = getRootNodeElementId(getRootNode());
-      Optional<SpDataStream> outputStream = getOutputStream();
+      rootNodeId = getRootNodeId(elementsProvider);
+      Optional<SpDataStream> outputStream = getOutputStream(elementsProvider);
       outputStream.ifPresent(spDataStream -> validate(spDataStream, getAll()));
     } catch (Exception e) {
       e.printStackTrace();
@@ -83,11 +76,16 @@ public class ElementRecommender {
               .setRecommendedElements(calculateWeights(
                       filterOldElements(getNoSqlStorage()
                               .getConnectionStorageApi()
-                              .getRecommendedElements(rootNodeElementId))));
+                              .getRecommendedElements(rootNodeId))));
       return recommendationMessage;
     }
   }
 
+  private String getRootNodeId(AllElementsProvider elementsProvider) {
+    NamedStreamPipesEntity pe = elementsProvider.findElement(this.baseRecDomId);
+    return pe instanceof InvocableStreamPipesEntity ? ((InvocableStreamPipesEntity) pe).getBelongsTo() : pe.getElementId();
+  }
+
   private List<PipelineElementRecommendation> filterOldElements(List<PipelineElementRecommendation> recommendedElements) {
     return recommendedElements
             .stream()
@@ -111,7 +109,6 @@ public class ElementRecommender {
             });
 
     return recommendedElements;
-
   }
 
   private String getName(String elementId) {
@@ -139,8 +136,7 @@ public class ElementRecommender {
     for (ConsumableStreamPipesEntity sepa : entities) {
       SpDataStream requirement = sepa.getSpDataStreams().get(0);
       requirement.setEventGrounding(sepa.getSupportedGrounding());
-      List<MatchingResultMessage> messages = new ArrayList<>();
-      Boolean matches = new StreamMatch().match(offer, requirement, messages);
+      boolean matches = new StreamMatch().match(offer, requirement, new ArrayList<>());
       if (matches) {
         addPossibleElements(sepa);
       }
@@ -151,7 +147,7 @@ public class ElementRecommender {
     recommendationMessage.addPossibleElement(new PipelineElementRecommendation(sepa.getElementId(), sepa.getName(), sepa.getDescription()));
   }
 
-  private List<ConsumableStreamPipesEntity> getAllSepas() {
+  private List<ConsumableStreamPipesEntity> getAllDataProcessors() {
     List<String> userObjects = new SpResourceManager().manageDataProcessors().findAllIdsOnly();
     return getTripleStore()
             .getAllDataProcessors()
@@ -162,7 +158,7 @@ public class ElementRecommender {
   }
 
 
-  private List<ConsumableStreamPipesEntity> getAllSecs() {
+  private List<ConsumableStreamPipesEntity> getAllDataSinks() {
     List<String> userObjects = new SpResourceManager().manageDataSinks().findAllIdsOnly();
     return getTripleStore()
             .getAllDataSinks()
@@ -174,8 +170,8 @@ public class ElementRecommender {
 
   private List<ConsumableStreamPipesEntity> getAll() {
     List<ConsumableStreamPipesEntity> allElements = new ArrayList<>();
-    allElements.addAll(getAllSepas());
-    allElements.addAll(getAllSecs());
+    allElements.addAll(getAllDataProcessors());
+    allElements.addAll(getAllDataSinks());
     return allElements;
   }
 
@@ -183,53 +179,24 @@ public class ElementRecommender {
     return StorageManager.INSTANCE.getPipelineElementStorage();
   }
 
-  private NamedStreamPipesEntity getRootNode() throws NoSepaInPipelineException {
-    if (pipeline.getSepas().size() == 0 && pipeline.getActions().size() == 0) {
-      return pipeline.getStreams().get(pipeline.getStreams().size() - 1);
-    } else {
-      return PipelineVerificationUtils.getRootNode(pipeline);
-    }
-  }
-
-  private String getRootNodeElementId(NamedStreamPipesEntity entity) {
-    if (entity instanceof InvocableStreamPipesEntity) {
-      return ((InvocableStreamPipesEntity) entity).getBelongsTo();
-    } else {
-      return entity.getElementId();
-    }
-  }
-
   private INoSqlStorage getNoSqlStorage() {
     return StorageDispatcher.INSTANCE.getNoSqlStore();
   }
 
-  private Optional<SpDataStream> getOutputStream() throws NoSepaInPipelineException, InvalidConnectionException {
-    NamedStreamPipesEntity rootNode = getRootNode();
-    if (rootNode instanceof SpDataStream) {
-      return Optional.of((SpDataStream) rootNode);
-    } else if (rootNode instanceof DataSinkInvocation) {
-      return Optional.empty();
-    } else {
-      ((DataProcessorInvocation) rootNode).setConfigured(true);
-      PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-      List<InvocableStreamPipesEntity> graphs = new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
+  private Optional<SpDataStream> getOutputStream(AllElementsProvider elementsProvider) {
 
-      Optional<InvocableStreamPipesEntity> rootElementWithOutputStream = graphs
+    NamedStreamPipesEntity entity = elementsProvider.findElement(this.baseRecDomId);
+
+    if (entity instanceof SpDataStream) {
+      return Optional.of((SpDataStream) entity);
+    } else {
+      Pipeline partialPipeline = new PartialPipelineGenerator(this.baseRecDomId, elementsProvider).makePartialPipeline();
+      PipelineModificationMessage modifications = new PipelineVerificationHandlerV2(partialPipeline).verifyPipeline();
+      return modifications.getPipelineModifications()
               .stream()
-              .filter(g -> g.getElementId().equals(rootNode.getElementId()))
+              .filter(m -> m.getDomId().equals(this.baseRecDomId))
+              .map(PipelineModification::getOutputStream)
               .findFirst();
-
-      if (rootElementWithOutputStream.isPresent() && (rootElementWithOutputStream.get()
-              instanceof DataProcessorInvocation)) {
-        return Optional.of(((DataProcessorInvocation) rootElementWithOutputStream.get())
-                .getOutputStream());
-      } else {
-        return Optional.empty();
-      }
     }
   }
-
-  private UserService getUserService() {
-    return new UserService(StorageDispatcher.INSTANCE.getNoSqlStore().getUserStorageAPI());
-  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/PartialPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/PartialPipelineGenerator.java
new file mode 100644
index 0000000..b0f4698
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/PartialPipelineGenerator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.manager.recommender;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PartialPipelineGenerator {
+
+  private final String baseRecDomId;
+  private final AllElementsProvider elementsProvider;
+
+  public PartialPipelineGenerator(String baseRecDomId,
+                                  AllElementsProvider elementsProvider) {
+    this.baseRecDomId = baseRecDomId;
+    this.elementsProvider = elementsProvider;
+  }
+
+  public Pipeline makePartialPipeline() {
+    Pipeline pipeline = new Pipeline();
+    List<SpDataStream> streams = new ArrayList<>();
+    List<DataProcessorInvocation> processors = new ArrayList<>();
+
+    findConnectedElements(elementsProvider.findElement(this.baseRecDomId), streams, processors);
+
+    pipeline.setStreams(streams);
+    pipeline.setSepas(processors);
+    return pipeline;
+  }
+
+  private void findConnectedElements(NamedStreamPipesEntity pipelineElement,
+                                     List<SpDataStream> streams,
+                                     List<DataProcessorInvocation> processors) {
+    if (pipelineElement instanceof SpDataStream) {
+      streams.add((SpDataStream) pipelineElement);
+    } else if (pipelineElement instanceof DataProcessorInvocation) {
+      DataProcessorInvocation processor = (DataProcessorInvocation) pipelineElement;
+      processors.add(processor);
+      processor.getConnectedTo().forEach(p -> {
+        NamedStreamPipesEntity connectedElement = elementsProvider.findElement(p);
+        findConnectedElements(connectedElement, streams, processors);
+      });
+    }
+  }
+
+
+}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 7e4a8b7..c23fa00 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -169,16 +169,17 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
     return ok(message);
   }
 
-  @Path("/recommend")
+  @Path("/recommend/{recId}")
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
   @Hidden
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   @PostAuthorize("hasPermission(returnObject, 'READ')")
-  public PipelineElementRecommendationMessage recommend(Pipeline pipeline) {
+  public PipelineElementRecommendationMessage recommend(Pipeline pipeline,
+                                                        @PathParam("recId") String baseRecElement) {
     try {
-      return Operations.findRecommendedElements(pipeline);
+      return Operations.findRecommendedElements(pipeline, baseRecElement);
     } catch (JsonSyntaxException e) {
       throw new WebApplicationException(badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
               e.getMessage())));
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index ac57f56..e797917 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -18,7 +18,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2022-01-13 21:47:46.
+// Generated using typescript-generator version 2.27.744 on 2022-01-16 20:35:08.
 
 export class AbstractStreamPipesEntity {
   "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStream [...]
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
     instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
     instance.internallyManaged = data.internallyManaged;
     instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-    instance.uri = data.uri;
     instance.dom = data.dom;
+    instance.uri = data.uri;
     instance._rev = data._rev;
     return instance;
   }
@@ -192,9 +192,9 @@ export class AdapterDescription extends NamedStreamPipesEntity {
     instance.selectedEndpointUrl = data.selectedEndpointUrl;
     instance.correspondingServiceGroup = data.correspondingServiceGroup;
     instance.correspondingDataStreamElementId = data.correspondingDataStreamElementId;
-    instance.valueRules = __getCopyArrayFn(__identity<any>())(data.valueRules);
     instance.streamRules = __getCopyArrayFn(__identity<any>())(data.streamRules);
     instance.schemaRules = __getCopyArrayFn(__identity<any>())(data.schemaRules);
+    instance.valueRules = __getCopyArrayFn(__identity<any>())(data.valueRules);
     return instance;
   }
 
@@ -1666,9 +1666,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
     }
     const instance = target || new GenericAdapterSetDescription();
     super.fromData(data, instance);
-    instance.eventSchema = EventSchema.fromData(data.eventSchema);
     instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
     instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+    instance.eventSchema = EventSchema.fromData(data.eventSchema);
     return instance;
   }
 }
@@ -1685,9 +1685,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
     }
     const instance = target || new GenericAdapterStreamDescription();
     super.fromData(data, instance);
-    instance.eventSchema = EventSchema.fromData(data.eventSchema);
     instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
     instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+    instance.eventSchema = EventSchema.fromData(data.eventSchema);
     return instance;
   }
 }
@@ -2370,6 +2370,7 @@ export class PipelineModification {
   elementId: string;
   inputStreams: SpDataStreamUnion[];
   outputStrategies: OutputStrategyUnion[];
+  outputStream: SpDataStreamUnion;
   pipelineElementValid: boolean;
   staticProperties: StaticPropertyUnion[];
   validationInfos: PipelineElementValidationInfo[];
@@ -2386,6 +2387,7 @@ export class PipelineModification {
     instance.staticProperties = __getCopyArrayFn(StaticProperty.fromDataUnion)(data.staticProperties);
     instance.outputStrategies = __getCopyArrayFn(OutputStrategy.fromDataUnion)(data.outputStrategies);
     instance.inputStreams = __getCopyArrayFn(SpDataStream.fromDataUnion)(data.inputStreams);
+    instance.outputStream = SpDataStream.fromDataUnion(data.outputStream);
     return instance;
   }
 }
@@ -2497,8 +2499,8 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity {
     super.fromData(data, instance);
     instance.boundTo = __getCopyArrayFn(BoundPipelineElement.fromData)(data.boundTo);
     instance.pipelineTemplateId = data.pipelineTemplateId;
-    instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
     instance.pipelineTemplateName = data.pipelineTemplateName;
+    instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
     return instance;
   }
 }
diff --git a/ui/src/app/editor/components/pipeline-element-options/pipeline-element-options.component.ts b/ui/src/app/editor/components/pipeline-element-options/pipeline-element-options.component.ts
index b26b2b5..8fea480 100644
--- a/ui/src/app/editor/components/pipeline-element-options/pipeline-element-options.component.ts
+++ b/ui/src/app/editor/components/pipeline-element-options/pipeline-element-options.component.ts
@@ -142,7 +142,7 @@ export class PipelineElementOptionsComponent implements OnInit, OnDestroy {
       }
     });
     const currentPipeline = this.objectProvider.makePipeline(clonedModel);
-    this.editorService.recommendPipelineElement(currentPipeline).subscribe((result) => {
+    this.editorService.recommendPipelineElement(currentPipeline, pipelineElementDomId).subscribe((result) => {
       if (result.success) {
         this.possibleElements = cloneDeep(this.pipelineElementRecommendationService.collectPossibleElements(this.allElements, result.possibleElements));
         this.recommendedElements = cloneDeep(this.pipelineElementRecommendationService.populateRecommendedList(this.allElements, result.recommendedElements));
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 534fe8a..1d39e6a 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -385,8 +385,13 @@ export class PipelineComponent implements OnInit, OnDestroy {
           if (modification.staticProperties) {
             (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
           }
-          if (pe.payload instanceof DataProcessorInvocation && modification.outputStrategies) {
-            (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
+          if (pe.payload instanceof DataProcessorInvocation) {
+            if (modification.outputStrategies) {
+              (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
+            }
+            if (modification.outputStream) {
+              (pe.payload as DataProcessorInvocation).outputStream = modification.outputStream;
+            }
           }
           if (modification.inputStreams) {
             (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
diff --git a/ui/src/app/editor/services/editor.service.ts b/ui/src/app/editor/services/editor.service.ts
index c990a46..560a350 100644
--- a/ui/src/app/editor/services/editor.service.ts
+++ b/ui/src/app/editor/services/editor.service.ts
@@ -56,8 +56,9 @@ export class EditorService {
       return this.platformServicesCommons.apiBasePath;
     }
 
-    recommendPipelineElement(pipeline): Observable<PipelineElementRecommendationMessage> {
-        return this.http.post(this.pipelinesResourceUrl + '/recommend', pipeline)
+    recommendPipelineElement(pipeline: Pipeline,
+                             currentDomId: string): Observable<PipelineElementRecommendationMessage> {
+        return this.http.post(this.pipelinesResourceUrl + '/recommend/' + currentDomId, pipeline)
             .pipe(map(data => PipelineElementRecommendationMessage.fromData(data as any)));
     }
 
diff --git a/ui/src/app/editor/services/jsplumb.service.ts b/ui/src/app/editor/services/jsplumb.service.ts
index 47d180f..b7f75be 100644
--- a/ui/src/app/editor/services/jsplumb.service.ts
+++ b/ui/src/app/editor/services/jsplumb.service.ts
@@ -120,39 +120,40 @@ export class JsplumbService {
 
     createAssemblyElement(pipelineElementDomId: string,
                           pipelineElement: InvocablePipelineElementUnion,
-                          $parentElement,
+                          sourceElement,
                           previewConfig: boolean) {
-        let $target;
+        let targetElementId;
         if (pipelineElement instanceof DataProcessorInvocation) {
-            $target = this.dataProcessorDropped(pipelineElementDomId, pipelineElement as DataProcessorInvocation, true, false);
-            this.connectNodes($parentElement, $target, previewConfig);
+            targetElementId = this.dataProcessorDropped(pipelineElementDomId, pipelineElement as DataProcessorInvocation, true, false);
+            this.connectNodes(sourceElement, targetElementId, previewConfig);
         } else {
-            $target = this.dataSinkDropped(pipelineElementDomId, pipelineElement, true, false);
-            this.connectNodes($parentElement, $target, previewConfig);
+            targetElementId = this.dataSinkDropped(pipelineElementDomId, pipelineElement, true, false);
+            this.connectNodes(sourceElement, targetElementId, previewConfig);
         }
     }
 
-    connectNodes($parentElement,
-                 $target,
+    connectNodes(sourceElement,
+                 targetElementId,
                  previewConfig: boolean) {
         let options;
         const jsplumbBridge = this.getBridge(previewConfig);
         // TODO: getJsplumbConfig depends on isPreview. Not implemented yet
         const jsplumbConfig = this.jsplumbEndpointService.getJsplumbConfig(true);
-        options = $parentElement.hasClass('stream') ?
+        options = sourceElement.hasClass('stream') ?
           jsplumbConfig.streamEndpointOptions : jsplumbConfig.sepaEndpointOptions;
         let sourceEndPoint;
-        if (jsplumbBridge.selectEndpoints({source: $parentElement}).length > 0) {
-            if (!(jsplumbBridge.selectEndpoints({source: $parentElement}).get(0).isFull())) {
-                sourceEndPoint = jsplumbBridge.selectEndpoints({source: $parentElement}).get(0);
+        if (jsplumbBridge.selectEndpoints({source: sourceElement}).length > 0) {
+            if (!(jsplumbBridge.selectEndpoints({source: sourceElement}).get(0).isFull())) {
+                sourceEndPoint = jsplumbBridge.selectEndpoints({source: sourceElement}).get(0);
             } else {
-                sourceEndPoint = jsplumbBridge.addEndpoint($parentElement, options);
+                sourceEndPoint = jsplumbBridge.addEndpoint(sourceElement, options);
             }
         } else {
-            sourceEndPoint = jsplumbBridge.addEndpoint($parentElement, options);
+            sourceEndPoint = jsplumbBridge.addEndpoint(sourceElement, options);
         }
 
-        const targetEndPoint = jsplumbBridge.selectEndpoints({target: $target}).get(0);
+        const targetElement = document.getElementById(targetElementId);
+        const targetEndPoint = jsplumbBridge.selectEndpoints({target: targetElement}).get(0);
 
         jsplumbBridge.connect({source: sourceEndPoint, target: targetEndPoint, detachable: true});
         jsplumbBridge.repaintEverything();

[incubator-streampipes] 02/02: [STREAMPIPES-494] Properly place recommended elements

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 98fe985160ec160112593edd41b070b3f1f8d641
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Jan 17 08:07:12 2022 +0100

    [STREAMPIPES-494] Properly place recommended elements
---
 .../pipeline-element-recommendation.component.ts   |  84 ++--
 .../app/editor/services/jsplumb-bridge.service.ts  |  28 +-
 ui/src/app/editor/services/jsplumb.service.ts      | 523 +++++++++++----------
 3 files changed, 312 insertions(+), 323 deletions(-)

diff --git a/ui/src/app/editor/components/pipeline-element-recommendation/pipeline-element-recommendation.component.ts b/ui/src/app/editor/components/pipeline-element-recommendation/pipeline-element-recommendation.component.ts
index 2799c44..6d4fcd5 100644
--- a/ui/src/app/editor/components/pipeline-element-recommendation/pipeline-element-recommendation.component.ts
+++ b/ui/src/app/editor/components/pipeline-element-recommendation/pipeline-element-recommendation.component.ts
@@ -16,11 +16,11 @@
  *
  */
 
-import {JsplumbService} from "../../services/jsplumb.service";
-import {AfterViewInit, ChangeDetectorRef, Component, Input, OnInit} from "@angular/core";
-import {PipelineElementConfig} from "../../model/editor.model";
-import {DataProcessorInvocation} from "../../../core-model/gen/streampipes-model";
-import {SafeCss} from "../../utils/style-sanitizer";
+import { JsplumbService } from '../../services/jsplumb.service';
+import { AfterViewInit, Component, Input, OnInit } from '@angular/core';
+import { InvocablePipelineElementUnion, PipelineElementConfig } from '../../model/editor.model';
+import { DataProcessorInvocation } from '../../../core-model/gen/streampipes-model';
+import { SafeCss } from '../../utils/style-sanitizer';
 
 @Component({
   selector: 'pipeline-element-recommendation',
@@ -40,9 +40,9 @@ export class PipelineElementRecommendationComponent implements OnInit, AfterView
 
   _recommendedElements: any;
 
-  recommendationsPrepared: boolean = false;
+  recommendationsPrepared = false;
 
-  constructor(private JsplumbService: JsplumbService,
+  constructor(private jsplumbService: JsplumbService,
               public safeCss: SafeCss) {
 
   }
@@ -63,65 +63,65 @@ export class PipelineElementRecommendationComponent implements OnInit, AfterView
   setLayoutSettings(element, index, recommendedElements) {
     element.layoutSettings = {
       skewStyle: element.name ? this.getSkewStyle(index, recommendedElements) : {'opacity': 0},
-      unskewStyle: this.getUnskewStyle(element, index,recommendedElements),
+      unskewStyle: this.getUnskewStyle(element, index, recommendedElements),
       unskewStyleLabel: this.getUnskewStyleLabel(index, recommendedElements),
-      type: element instanceof DataProcessorInvocation ? "sepa" : "action"
+      type: element instanceof DataProcessorInvocation ? 'sepa' : 'action'
     };
   }
 
-  create(recommendedElement) {
+  create(recommendedElement: InvocablePipelineElementUnion) {
     this.recommendationsShown = false;
-    this.JsplumbService.createElement(this.rawPipelineModel, recommendedElement, this.pipelineElementDomId);
+    this.jsplumbService.createElement(this.rawPipelineModel, recommendedElement, this.pipelineElementDomId);
   }
 
   getUnskewStyle(recommendedElement, index, recommendedElements) {
-    var unskew = -(this.getSkew(recommendedElements));
-    var rotate = -(90 - (this.getSkew(recommendedElements) / 2));
+    const unskew = -(this.getSkew(recommendedElements));
+    const rotate = -(90 - (this.getSkew(recommendedElements) / 2));
 
-    return "transform: skew(" + unskew + "deg)" + " rotate(" + rotate + "deg)" + " scale(1);"
-       +"background-color: " +this.getBackgroundColor(recommendedElement, index);
+    return 'transform: skew(' + unskew + 'deg)' + ' rotate(' + rotate + 'deg)' + ' scale(1);'
+       + 'background-color: ' + this.getBackgroundColor(recommendedElement, index);
   }
 
   getBackgroundColor(recommendedElement, index) {
-    var alpha = (recommendedElement.weight < 0.2 ? 0.2 : (recommendedElement.weight - 0.2));
+    let alpha = (recommendedElement.weight < 0.2 ? 0.2 : (recommendedElement.weight - 0.2));
     alpha = Math.round((alpha * 10)) / 10;
-    var rgb = recommendedElement instanceof DataProcessorInvocation ? this.getSepaColor(index) : this.getActionColor(index);
-    return "rgba(" +rgb +"," +alpha +")";
+    const rgb = recommendedElement instanceof DataProcessorInvocation ? this.getSepaColor(index) : this.getActionColor(index);
+    return 'rgba(' + rgb + ',' + alpha + ')';
   }
 
   getSepaColor(index) {
-    return (index % 2 === 0) ? "0, 150, 136" : "0, 164, 150";
+    return (index % 2 === 0) ? '0, 150, 136' : '0, 164, 150';
   }
 
   getActionColor(index) {
-    return (index % 2 === 0) ? "63, 81, 181" : "79, 101, 230";
+    return (index % 2 === 0) ? '63, 81, 181' : '79, 101, 230';
   }
 
   getSkewStyle(index, recommendedElements) {
     // transform: rotate(72deg) skew(18deg);
-    var skew = this.getSkew(recommendedElements);
-    var rotate = (index + 1) * this.getAngle(recommendedElements);
+    const skew = this.getSkew(recommendedElements);
+    const rotate = (index + 1) * this.getAngle(recommendedElements);
 
-    return "transform: rotate(" + rotate + "deg) skew(" + skew + "deg);";
+    return 'transform: rotate(' + rotate + 'deg) skew(' + skew + 'deg);';
   }
 
   getUnskewStyleLabel(index, recommendedElements) {
-    var unskew = -(this.getSkew(recommendedElements));
-    var rotate =  (index + 1) * this.getAngle(recommendedElements);
-    var unrotate = -360 + (rotate*-1);
-
-    return "transform: skew(" + unskew + "deg)" + " rotate(" + unrotate + "deg)" + " scale(1);"
-      +"z-index: -1;"
-      +"margin-left: 50%;"
-      +"margin-top: 50%;"
-      +"position: absolute;"
-      +"background: white;"
-      +"height: 50px;"
-      +"width: 50px;"
-      +"font-size: 16px;"
-      +"text-align: center;"
-      +"line-height: 50px;"
-      +"top: 0px;";
+    const unskew = -(this.getSkew(recommendedElements));
+    const rotate =  (index + 1) * this.getAngle(recommendedElements);
+    const unrotate = -360 + (rotate * -1);
+
+    return 'transform: skew(' + unskew + 'deg)' + ' rotate(' + unrotate + 'deg)' + ' scale(1);'
+      + 'z-index: -1;'
+      + 'margin-left: 50%;'
+      + 'margin-top: 50%;'
+      + 'position: absolute;'
+      + 'background: white;'
+      + 'height: 50px;'
+      + 'width: 50px;'
+      + 'font-size: 16px;'
+      + 'text-align: center;'
+      + 'line-height: 50px;'
+      + 'top: 0px;';
   }
 
   getSkew(recommendedElements) {
@@ -134,8 +134,8 @@ export class PipelineElementRecommendationComponent implements OnInit, AfterView
 
   fillRemainingItems(recommendedElements) {
     if (recommendedElements.length < 6) {
-      for (var i = recommendedElements.length; i < 6; i++) {
-        let element = {fakeElement: true, weight: 0};
+      for (let i = recommendedElements.length; i < 6; i++) {
+        const element = {fakeElement: true, weight: 0};
         recommendedElements.push(element);
       }
     }
@@ -152,4 +152,4 @@ export class PipelineElementRecommendationComponent implements OnInit, AfterView
     this._recommendedElements = recommendedElements;
     this.recommendationsPrepared = true;
   }
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/editor/services/jsplumb-bridge.service.ts b/ui/src/app/editor/services/jsplumb-bridge.service.ts
index d0a63f8..4652d85 100644
--- a/ui/src/app/editor/services/jsplumb-bridge.service.ts
+++ b/ui/src/app/editor/services/jsplumb-bridge.service.ts
@@ -17,7 +17,7 @@
  */
 
 import { BrowserJsPlumbInstance } from '@jsplumb/browser-ui';
-import { SelectOptions } from '@jsplumb/core';
+import { EndpointSelection, SelectOptions } from '@jsplumb/core';
 
 export class JsplumbBridge {
 
@@ -30,13 +30,11 @@ export class JsplumbBridge {
     }
 
     activateEndpointWithType(endpointId: string, endpointEnabled: boolean, endpointType: string) {
-        console.log("activate endpoint");
         this.activateEndpoint(endpointId, endpointEnabled);
         this.setEndpointType(endpointId, endpointType);
     }
 
     setEndpointType(endpointId: string, endpointType: string) {
-        console.log("set endpoint type");
         const endpoint = this.getEndpointById(endpointId);
         // @ts-ignore
         endpoint.setType(endpointType);
@@ -71,33 +69,19 @@ export class JsplumbBridge {
         return this.jsPlumbInstance.bind(event, fn);
     }
 
-    // TODO: Overloading Functions?
-    selectEndpoints(endpoint?) {
+    selectEndpoints(endpoint?): EndpointSelection  {
         if (endpoint === undefined) {
-            // @ts-ignore
             return this.jsPlumbInstance.selectEndpoints();
+        } else {
+            return this.jsPlumbInstance.selectEndpoints(endpoint);
         }
-        // @ts-ignore
-        return this.jsPlumbInstance.selectEndpoints(endpoint);
-    }
-
-    selectEndpointsById(id) {
-        // @ts-ignore
-        return this.jsPlumbInstance.selectEndpoints({source: id});
     }
 
-    getSourceEndpoint(id) {
-        // @ts-ignore
-        return this.jsPlumbInstance.selectEndpoints({source: id});
-    }
-
-    getTargetEndpoint(id) {
-        // @ts-ignore
+    getTargetEndpoint(id: string): EndpointSelection {
         return this.jsPlumbInstance.selectEndpoints({target: document.getElementById(id)});
     }
 
-    getEndpointCount(id) {
-        // @ts-ignore
+    getEndpointCount(id: string): number {
         return this.jsPlumbInstance.selectEndpoints({element: document.getElementById(id)}).length;
     }
 
diff --git a/ui/src/app/editor/services/jsplumb.service.ts b/ui/src/app/editor/services/jsplumb.service.ts
index b7f75be..4c43750 100644
--- a/ui/src/app/editor/services/jsplumb.service.ts
+++ b/ui/src/app/editor/services/jsplumb.service.ts
@@ -20,19 +20,19 @@ import { JsplumbConfigService } from './jsplumb-config.service';
 import { JsplumbBridge } from './jsplumb-bridge.service';
 import { Injectable } from '@angular/core';
 import {
-    InvocablePipelineElementUnion,
-    PipelineElementConfig,
-    PipelineElementConfigurationStatus,
-    PipelineElementUnion
+  InvocablePipelineElementUnion,
+  PipelineElementConfig,
+  PipelineElementConfigurationStatus,
+  PipelineElementUnion
 } from '../model/editor.model';
 import { PipelineElementTypeUtils } from '../utils/editor.utils';
 import {
-    DataProcessorInvocation,
-    DataSinkInvocation,
-    Pipeline,
-    SpDataSet,
-    SpDataStream,
-    SpDataStreamUnion
+  DataProcessorInvocation,
+  DataSinkInvocation,
+  Pipeline,
+  SpDataSet,
+  SpDataStream,
+  SpDataStreamUnion
 } from '../../core-model/gen/streampipes-model';
 import { JsplumbEndpointService } from './jsplumb-endpoint.service';
 import { JsplumbFactoryService } from './jsplumb-factory.service';
@@ -41,277 +41,282 @@ import { EditorService } from './editor.service';
 @Injectable()
 export class JsplumbService {
 
-    idCounter = 0;
+  idCounter = 0;
+
+  constructor(private jsplumbConfigService: JsplumbConfigService,
+              private jsplumbFactory: JsplumbFactoryService,
+              private jsplumbEndpointService: JsplumbEndpointService,
+              private editorService: EditorService) {
+  }
+
+  isFullyConnected(pipelineElementConfig: PipelineElementConfig,
+                   previewConfig: boolean) {
+    const jsplumbBridge = this.jsplumbFactory.getJsplumbBridge(previewConfig);
+    const payload = pipelineElementConfig.payload as InvocablePipelineElementUnion;
+    return payload.inputStreams == null ||
+      jsplumbBridge.getConnections({target: document.getElementById(payload.dom)}).length == payload.inputStreams.length;
+  }
+
+  makeRawPipeline(pipelineModel: Pipeline,
+                  isPreview: boolean) {
+    return pipelineModel
+      .streams
+      .map(s => this.toConfig(s, 'stream', isPreview))
+      .concat(pipelineModel.sepas.map(s => this.toConfig(s, 'sepa', isPreview)))
+      .concat(pipelineModel.actions.map(s => this.toConfig(s, 'action', isPreview)));
+  }
+
+  toConfig(pe: PipelineElementUnion,
+           type: string,
+           isPreview: boolean) {
+    (pe as any).type = type;
+    return this.createNewPipelineElementConfig(pe, {x: 100, y: 100}, isPreview, true);
+  }
+
+  createElementWithoutConnection(pipelineModel: PipelineElementConfig[],
+                                 pipelineElement: PipelineElementUnion,
+                                 x: number,
+                                 y: number) {
+    const pipelineElementConfig = this.createNewPipelineElementConfigAtPosition(x, y, pipelineElement, false);
+    pipelineModel.push(pipelineElementConfig);
+
+    if (pipelineElementConfig.payload instanceof SpDataSet) {
+      this.editorService.updateDataSet(pipelineElement).subscribe(data => {
+        (pipelineElementConfig.payload as SpDataSet).eventGrounding = data.eventGrounding;
+        (pipelineElementConfig.payload as SpDataSet).datasetInvocationId = data.invocationId;
 
-    constructor(private jsplumbConfigService: JsplumbConfigService,
-                private jsplumbFactory: JsplumbFactoryService,
-                private jsplumbEndpointService: JsplumbEndpointService,
-                private editorService: EditorService) {
-    }
-
-    isFullyConnected(pipelineElementConfig: PipelineElementConfig,
-                     previewConfig: boolean) {
-        const jsplumbBridge = this.jsplumbFactory.getJsplumbBridge(previewConfig);
-        const payload = pipelineElementConfig.payload as InvocablePipelineElementUnion;
-        return payload.inputStreams == null ||
-          jsplumbBridge.getConnections({target: document.getElementById(payload.dom)}).length == payload.inputStreams.length;
-    }
-
-    makeRawPipeline(pipelineModel: Pipeline,
-                    isPreview: boolean) {
-        return pipelineModel
-          .streams
-          .map(s => this.toConfig(s, 'stream', isPreview))
-          .concat(pipelineModel.sepas.map(s => this.toConfig(s, 'sepa', isPreview)))
-          .concat(pipelineModel.actions.map(s => this.toConfig(s, 'action', isPreview)));
-    }
-
-    toConfig(pe: PipelineElementUnion,
-             type: string,
-             isPreview: boolean) {
-        (pe as any).type = type;
-        return this.createNewPipelineElementConfig(pe, {x: 100, y: 100}, isPreview, true);
-    }
-
-    createElementWithoutConnection(pipelineModel: PipelineElementConfig[],
-                                   pipelineElement: PipelineElementUnion,
-                                   x: number,
-                                   y: number) {
-        const pipelineElementConfig = this.createNewPipelineElementConfigAtPosition(x, y, pipelineElement, false);
-        pipelineModel.push(pipelineElementConfig);
-
-        if (pipelineElementConfig.payload instanceof SpDataSet) {
-            this.editorService.updateDataSet(pipelineElement).subscribe(data => {
-                (pipelineElementConfig.payload as SpDataSet).eventGrounding = data.eventGrounding;
-                (pipelineElementConfig.payload as SpDataSet).datasetInvocationId = data.invocationId;
-
-                setTimeout(() => {
-                    this.elementDropped(pipelineElementConfig.payload.dom,
-                      pipelineElementConfig.payload,
-                      true,
-                      false);
-                }, 1);
-            });
-        } else {
-            setTimeout(() => {
-                this.elementDropped(pipelineElementConfig.payload.dom,
-                  pipelineElementConfig.payload,
-                  true,
-                  false);
-            }, 100);
-        }
-
-    }
-
-    createElement(pipelineModel: PipelineElementConfig[],
-                  pipelineElement: InvocablePipelineElementUnion,
-                  pipelineElementDomId: string) {
-        const pipelineElementDom = $('#' + pipelineElementDomId);
-
-        const pipelineElementConfig = this.createNewPipelineElementConfigWithFixedCoordinates(pipelineElementDom, pipelineElement, false);
-        pipelineModel.push(pipelineElementConfig);
         setTimeout(() => {
-            this.createAssemblyElement(pipelineElementConfig.payload.dom,
-              pipelineElementConfig.payload as InvocablePipelineElementUnion,
-              pipelineElementDom,
-              false);
-        });
+          this.elementDropped(pipelineElementConfig.payload.dom,
+            pipelineElementConfig.payload,
+            true,
+            false);
+        }, 1);
+      });
+    } else {
+      setTimeout(() => {
+        this.elementDropped(pipelineElementConfig.payload.dom,
+          pipelineElementConfig.payload,
+          true,
+          false);
+      }, 100);
     }
 
-    createAssemblyElement(pipelineElementDomId: string,
-                          pipelineElement: InvocablePipelineElementUnion,
-                          sourceElement,
-                          previewConfig: boolean) {
-        let targetElementId;
-        if (pipelineElement instanceof DataProcessorInvocation) {
-            targetElementId = this.dataProcessorDropped(pipelineElementDomId, pipelineElement as DataProcessorInvocation, true, false);
-            this.connectNodes(sourceElement, targetElementId, previewConfig);
-        } else {
-            targetElementId = this.dataSinkDropped(pipelineElementDomId, pipelineElement, true, false);
-            this.connectNodes(sourceElement, targetElementId, previewConfig);
-        }
+  }
+
+  createElement(pipelineModel: PipelineElementConfig[],
+                pipelineElement: InvocablePipelineElementUnion,
+                sourceElementDomId: string) {
+    const sourceElement = $('#' + sourceElementDomId);
+
+    const pipelineElementConfig = this.createNewPipelineElementConfigWithFixedCoordinates(sourceElement, pipelineElement, false);
+    pipelineModel.push(pipelineElementConfig);
+    setTimeout(() => {
+      this.createAssemblyElement(pipelineElementConfig.payload.dom,
+        pipelineElementConfig.payload as InvocablePipelineElementUnion,
+        sourceElement,
+        false);
+    });
+  }
+
+  createAssemblyElement(pipelineElementDomId: string,
+                        pipelineElement: InvocablePipelineElementUnion,
+                        sourceElement,
+                        previewConfig: boolean) {
+    let targetElementId;
+    if (pipelineElement instanceof DataProcessorInvocation) {
+      targetElementId = this.dataProcessorDropped(pipelineElementDomId, pipelineElement as DataProcessorInvocation, true, false);
+      this.connectNodes(sourceElement, targetElementId, previewConfig);
+    } else {
+      targetElementId = this.dataSinkDropped(pipelineElementDomId, pipelineElement, true, false);
+      this.connectNodes(sourceElement, targetElementId, previewConfig);
     }
-
-    connectNodes(sourceElement,
-                 targetElementId,
-                 previewConfig: boolean) {
-        let options;
-        const jsplumbBridge = this.getBridge(previewConfig);
-        // TODO: getJsplumbConfig depends on isPreview. Not implemented yet
-        const jsplumbConfig = this.jsplumbEndpointService.getJsplumbConfig(true);
-        options = sourceElement.hasClass('stream') ?
-          jsplumbConfig.streamEndpointOptions : jsplumbConfig.sepaEndpointOptions;
-        let sourceEndPoint;
-        if (jsplumbBridge.selectEndpoints({source: sourceElement}).length > 0) {
-            if (!(jsplumbBridge.selectEndpoints({source: sourceElement}).get(0).isFull())) {
-                sourceEndPoint = jsplumbBridge.selectEndpoints({source: sourceElement}).get(0);
-            } else {
-                sourceEndPoint = jsplumbBridge.addEndpoint(sourceElement, options);
-            }
-        } else {
-            sourceEndPoint = jsplumbBridge.addEndpoint(sourceElement, options);
-        }
-
-        const targetElement = document.getElementById(targetElementId);
-        const targetEndPoint = jsplumbBridge.selectEndpoints({target: targetElement}).get(0);
-
-        jsplumbBridge.connect({source: sourceEndPoint, target: targetEndPoint, detachable: true});
-        jsplumbBridge.repaintEverything();
+  }
+
+  connectNodes(sourceElementSelector,
+               targetElementId,
+               previewConfig: boolean) {
+    let options;
+    const sourceElement = sourceElementSelector.get()[0];
+    const jsplumbBridge = this.getBridge(previewConfig);
+    const jsplumbConfig = this.jsplumbEndpointService.getJsplumbConfig(true);
+    options = sourceElementSelector.hasClass('stream') ?
+      jsplumbConfig.streamEndpointOptions : jsplumbConfig.sepaEndpointOptions;
+    let sourceEndPoint;
+    const selectedEndpoints = jsplumbBridge.selectEndpoints({source: sourceElement});
+    if (selectedEndpoints.length > 0) {
+      if (!(selectedEndpoints.get(0).isFull())) {
+        sourceEndPoint = jsplumbBridge.selectEndpoints({source: sourceElement}).get(0);
+      } else {
+        sourceEndPoint = jsplumbBridge.addEndpoint(sourceElement, options);
+      }
+    } else {
+      sourceEndPoint = jsplumbBridge.addEndpoint(sourceElement, options);
     }
 
-    createNewPipelineElementConfigWithFixedCoordinates($parentElement, json, isPreview): PipelineElementConfig {
-        const x = $parentElement.position().left;
-        const y = $parentElement.position().top;
-        return this.createNewPipelineElementConfigAtPosition(x, y, json, isPreview);
+    const targetElement = document.getElementById(targetElementId);
+    const targetEndPoint = jsplumbBridge.selectEndpoints({target: targetElement}).get(0);
+
+    jsplumbBridge.connect({source: sourceEndPoint, target: targetEndPoint, detachable: true});
+    jsplumbBridge.repaintEverything();
+  }
+
+  createNewPipelineElementConfigWithFixedCoordinates(sourceElement,
+                                                     pipelineElement: InvocablePipelineElementUnion,
+                                                     isPreview): PipelineElementConfig {
+    const x = sourceElement.position().left;
+    const y = sourceElement.position().top;
+    return this.createNewPipelineElementConfigAtPosition(x, y, pipelineElement, isPreview);
+  }
+
+  createNewPipelineElementConfigAtPosition(x: number,
+                                           y: number,
+                                           json: any,
+                                           isPreview: boolean): PipelineElementConfig {
+    const coord = {'x': x + 200, 'y': y};
+    return this.createNewPipelineElementConfig(json, coord, isPreview, false);
+  }
+
+  createNewPipelineElementConfig(pipelineElement: PipelineElementUnion,
+                                 coordinates,
+                                 isPreview: boolean,
+                                 isCompleted: boolean,
+                                 newElementId?: string): PipelineElementConfig {
+    const displaySettings = isPreview ? 'connectable-preview' : 'connectable-editor';
+    const connectable = 'connectable';
+    const pipelineElementConfig = {} as PipelineElementConfig;
+    pipelineElementConfig.type = PipelineElementTypeUtils
+      .toCssShortHand(PipelineElementTypeUtils.fromType(pipelineElement));
+    pipelineElementConfig.payload = this.clone(pipelineElement, newElementId);
+    pipelineElementConfig.settings = {
+      connectable,
+      openCustomize: !(pipelineElement as any).configured,
+      preview: isPreview,
+      completed: (pipelineElement instanceof SpDataStream || pipelineElement instanceof SpDataSet || isPreview || isCompleted) ? PipelineElementConfigurationStatus.OK : PipelineElementConfigurationStatus.INCOMPLETE,
+      disabled: false,
+      loadingStatus: false,
+      displaySettings,
+      position: {
+        x: coordinates.x,
+        y: coordinates.y
+      }
+    };
+    if (!pipelineElementConfig.payload.dom) {
+      pipelineElementConfig.payload.dom = 'jsplumb_' + this.idCounter + '_' + this.makeId(4);
+      this.idCounter++;
     }
 
-    createNewPipelineElementConfigAtPosition(x: number,
-                                             y: number,
-                                             json: any,
-                                             isPreview: boolean): PipelineElementConfig {
-        const coord = {'x': x + 200, 'y': y};
-        return this.createNewPipelineElementConfig(json, coord, isPreview, false);
+    return pipelineElementConfig;
+  }
+
+  clone(pipelineElement: PipelineElementUnion, newElementId?: string) {
+    if (pipelineElement instanceof SpDataSet) {
+      return SpDataSet.fromData(pipelineElement, new SpDataSet());
+    } else if (pipelineElement instanceof SpDataStream) {
+      return SpDataStream.fromData(pipelineElement, new SpDataStream());
+    } else if (pipelineElement instanceof DataProcessorInvocation) {
+      const clonedPe = DataProcessorInvocation.fromData(pipelineElement, new DataProcessorInvocation());
+      if (newElementId) {
+        this.updateElementIds(clonedPe, newElementId);
+      }
+      return clonedPe;
+    } else {
+      const clonedPe = DataSinkInvocation.fromData(pipelineElement, new DataSinkInvocation());
+      if (newElementId) {
+        this.updateElementIds(clonedPe, newElementId);
+      }
+      return clonedPe;
     }
+  }
 
-    createNewPipelineElementConfig(pipelineElement: PipelineElementUnion,
-                                   coordinates,
-                                   isPreview: boolean,
-                                   isCompleted: boolean,
-                                   newElementId?: string): PipelineElementConfig {
-        const displaySettings = isPreview ? 'connectable-preview' : 'connectable-editor';
-        const connectable = 'connectable';
-        const pipelineElementConfig = {} as PipelineElementConfig;
-        pipelineElementConfig.type = PipelineElementTypeUtils
-          .toCssShortHand(PipelineElementTypeUtils.fromType(pipelineElement));
-        pipelineElementConfig.payload = this.clone(pipelineElement, newElementId);
-        pipelineElementConfig.settings = {connectable,
-            openCustomize: !(pipelineElement as any).configured,
-            preview: isPreview,
-            completed: (pipelineElement instanceof SpDataStream || pipelineElement instanceof SpDataSet || isPreview || isCompleted) ? PipelineElementConfigurationStatus.OK : PipelineElementConfigurationStatus.INCOMPLETE,
-            disabled: false,
-            loadingStatus: false,
-            displaySettings,
-            position: {
-                x: coordinates.x,
-                y: coordinates.y
-            }};
-        if (!pipelineElementConfig.payload.dom) {
-            pipelineElementConfig.payload.dom = 'jsplumb_' + this.idCounter + '_' + this.makeId(4);
-            this.idCounter++;
-        }
-
-        return pipelineElementConfig;
-    }
+  updateElementIds(pipelineElement: PipelineElementUnion, newElementId: string) {
+    pipelineElement.elementId = newElementId;
+    pipelineElement.uri = newElementId;
+  }
 
-    clone(pipelineElement: PipelineElementUnion, newElementId?: string) {
-        if (pipelineElement instanceof SpDataSet) {
-            return SpDataSet.fromData(pipelineElement, new SpDataSet());
-        } else if (pipelineElement instanceof SpDataStream) {
-            return SpDataStream.fromData(pipelineElement, new SpDataStream());
-        } else if (pipelineElement instanceof DataProcessorInvocation) {
-            const clonedPe = DataProcessorInvocation.fromData(pipelineElement, new DataProcessorInvocation());
-            if (newElementId) {
-                this.updateElementIds(clonedPe, newElementId);
-            }
-            return clonedPe;
-        } else {
-            const clonedPe = DataSinkInvocation.fromData(pipelineElement, new DataSinkInvocation());
-            if (newElementId) {
-                this.updateElementIds(clonedPe, newElementId);
-            }
-            return clonedPe;
-        }
-    }
+  makeId(count: number) {
+    let text = '';
+    const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
 
-    updateElementIds(pipelineElement: PipelineElementUnion, newElementId: string) {
-        pipelineElement.elementId = newElementId;
-        pipelineElement.uri = newElementId;
+    for (let i = 0; i < count; i++) {
+      text += possible.charAt(Math.floor(Math.random() * possible.length));
     }
 
-    makeId(count: number) {
-        let text = '';
-        const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
-
-        for (let i = 0; i < count; i++) {
-            text += possible.charAt(Math.floor(Math.random() * possible.length));
-        }
-
-        return text;
+    return text;
+  }
+
+  elementDropped(pipelineElementDomId: string,
+                 pipelineElement: PipelineElementUnion,
+                 endpoints: boolean,
+                 preview: boolean): string {
+    if (pipelineElement instanceof SpDataStream) {
+      return this.dataStreamDropped(pipelineElementDomId, pipelineElement as SpDataStream, endpoints, preview);
+    } else if (pipelineElement instanceof SpDataSet) {
+      return this.dataSetDropped(pipelineElementDomId, pipelineElement as SpDataSet, endpoints, preview);
+    } else if (pipelineElement instanceof DataProcessorInvocation) {
+      return this.dataProcessorDropped(pipelineElementDomId, pipelineElement, endpoints, preview);
+    } else if (pipelineElement instanceof DataSinkInvocation) {
+      return this.dataSinkDropped(pipelineElementDomId, pipelineElement, endpoints, preview);
     }
-
-    elementDropped(pipelineElementDomId: string,
-                   pipelineElement: PipelineElementUnion,
-                   endpoints: boolean,
-                   preview: boolean): string {
-        if (pipelineElement instanceof SpDataStream) {
-            return this.dataStreamDropped(pipelineElementDomId, pipelineElement as SpDataStream, endpoints, preview);
-        } else if (pipelineElement instanceof SpDataSet) {
-            return this.dataSetDropped(pipelineElementDomId, pipelineElement as SpDataSet, endpoints, preview);
-        } else if (pipelineElement instanceof DataProcessorInvocation) {
-            return this.dataProcessorDropped(pipelineElementDomId, pipelineElement, endpoints, preview);
-        } else if (pipelineElement instanceof DataSinkInvocation) {
-            return this.dataSinkDropped(pipelineElementDomId, pipelineElement, endpoints, preview);
-        }
+  }
+
+  dataSetDropped(pipelineElementDomId: string,
+                 pipelineElement: SpDataSet,
+                 endpoints: boolean,
+                 preview: boolean) {
+    const jsplumbBridge = this.getBridge(preview);
+    if (endpoints) {
+      const endpointOptions = this.jsplumbEndpointService.getStreamEndpoint(preview, pipelineElementDomId);
+      jsplumbBridge.addEndpoint(pipelineElementDomId, endpointOptions);
     }
+    return pipelineElementDomId;
+  }
 
-    dataSetDropped(pipelineElementDomId: string,
-                   pipelineElement: SpDataSet,
-                   endpoints: boolean,
-                   preview: boolean) {
-        const jsplumbBridge = this.getBridge(preview);
-        if (endpoints) {
-            const endpointOptions = this.jsplumbEndpointService.getStreamEndpoint(preview, pipelineElementDomId);
-            jsplumbBridge.addEndpoint(pipelineElementDomId, endpointOptions);
-        }
-        return pipelineElementDomId;
-    }
 
-
-    dataStreamDropped(pipelineElementDomId: string,
-                      pipelineElement: SpDataStreamUnion,
-                      endpoints: boolean,
-                      preview: boolean) {
-        const jsplumbBridge = this.getBridge(preview);
-        if (endpoints) {
-            const endpointOptions = this.jsplumbEndpointService.getStreamEndpoint(preview, pipelineElementDomId);
-            jsplumbBridge.addEndpoint(pipelineElementDomId, endpointOptions);
-        }
-        return pipelineElementDomId;
+  dataStreamDropped(pipelineElementDomId: string,
+                    pipelineElement: SpDataStreamUnion,
+                    endpoints: boolean,
+                    preview: boolean) {
+    const jsplumbBridge = this.getBridge(preview);
+    if (endpoints) {
+      const endpointOptions = this.jsplumbEndpointService.getStreamEndpoint(preview, pipelineElementDomId);
+      jsplumbBridge.addEndpoint(pipelineElementDomId, endpointOptions);
     }
-
-    dataProcessorDropped(pipelineElementDomId: string,
-                         pipelineElement: DataProcessorInvocation,
-                         endpoints: boolean,
-                         preview: boolean): string {
-        const jsplumbBridge = this.getBridge(preview);
-        this.dataSinkDropped(pipelineElementDomId, pipelineElement, endpoints, preview);
-        if (endpoints) {
-            jsplumbBridge.addEndpoint(pipelineElementDomId,
-              this.jsplumbEndpointService.getOutputEndpoint(preview, pipelineElementDomId));
-        }
-        return pipelineElementDomId;
+    return pipelineElementDomId;
+  }
+
+  dataProcessorDropped(pipelineElementDomId: string,
+                       pipelineElement: DataProcessorInvocation,
+                       endpoints: boolean,
+                       preview: boolean): string {
+    const jsplumbBridge = this.getBridge(preview);
+    this.dataSinkDropped(pipelineElementDomId, pipelineElement, endpoints, preview);
+    if (endpoints) {
+      jsplumbBridge.addEndpoint(pipelineElementDomId,
+        this.jsplumbEndpointService.getOutputEndpoint(preview, pipelineElementDomId));
     }
+    return pipelineElementDomId;
+  }
 
-    dataSinkDropped(pipelineElementDomId: string,
-                    pipelineElement: InvocablePipelineElementUnion,
-                    endpoints: boolean,
-                    preview: boolean): string {
-        const jsplumbBridge = this.getBridge(preview);
-        if (endpoints) {
-            if (pipelineElement.inputStreams.length < 2) { // 1 InputNode
-                jsplumbBridge.addEndpoint(pipelineElementDomId,
-                  this.jsplumbEndpointService.getInputEndpoint(preview, pipelineElementDomId, 0));
-            } else {
-                jsplumbBridge.addEndpoint(pipelineElementDomId,
-                  this.jsplumbEndpointService.getNewTargetPoint(preview, 0, 0.25, pipelineElementDomId, 0));
-                jsplumbBridge.addEndpoint(pipelineElementDomId,
-                  this.jsplumbEndpointService.getNewTargetPoint(preview, 0, 0.75, pipelineElementDomId, 1));
-            }
-        }
-        return pipelineElementDomId;
+  dataSinkDropped(pipelineElementDomId: string,
+                  pipelineElement: InvocablePipelineElementUnion,
+                  endpoints: boolean,
+                  preview: boolean): string {
+    const jsplumbBridge = this.getBridge(preview);
+    if (endpoints) {
+      if (pipelineElement.inputStreams.length < 2) { // 1 InputNode
+        jsplumbBridge.addEndpoint(pipelineElementDomId,
+          this.jsplumbEndpointService.getInputEndpoint(preview, pipelineElementDomId, 0));
+      } else {
+        jsplumbBridge.addEndpoint(pipelineElementDomId,
+          this.jsplumbEndpointService.getNewTargetPoint(preview, 0, 0.25, pipelineElementDomId, 0));
+        jsplumbBridge.addEndpoint(pipelineElementDomId,
+          this.jsplumbEndpointService.getNewTargetPoint(preview, 0, 0.75, pipelineElementDomId, 1));
+      }
     }
+    return pipelineElementDomId;
+  }
 
-    getBridge(previewConfig: boolean): JsplumbBridge {
-        return this.jsplumbFactory.getJsplumbBridge(previewConfig);
-    }
+  getBridge(previewConfig: boolean): JsplumbBridge {
+    return this.jsplumbFactory.getJsplumbBridge(previewConfig);
+  }
 }