You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/28 21:39:50 UTC

[incubator-streampipes] branch dev updated: [hotfix] Fix bug in pipeline preview

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 34504a6  [hotfix] Fix bug in pipeline preview
34504a6 is described below

commit 34504a66dcae570a2b4f124239df9cad9d163e49
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Feb 28 22:39:03 2022 +0100

    [hotfix] Fix bug in pipeline preview
---
 .../manager/data/PipelineGraphBuilder.java         |   6 +-
 .../manager/execution/http/PipelineExecutor.java   |  20 +--
 .../manager/matching/GroundingSelector.java        |   3 +-
 .../manager/matching/InvocationGraphBuilder.java   | 194 ---------------------
 .../matching/PipelineVerificationHandler.java      | 194 ---------------------
 .../matching/PipelineVerificationHandlerV2.java    |  46 +++++
 .../streampipes/manager/operations/Operations.java |  42 +----
 .../manager/preview/PipelinePreview.java           |  43 +++--
 .../manager/recommender/AllElementsProvider.java   |   4 +
 .../streampipes/rest/impl/PipelineResource.java    |   2 +-
 10 files changed, 99 insertions(+), 455 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
index a83c125..bbcadfb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
@@ -28,9 +28,9 @@ import java.util.stream.Collectors;
 
 public class PipelineGraphBuilder {
 
-    private Pipeline pipeline;
-    private List<NamedStreamPipesEntity> allPipelineElements;
-    private List<InvocableStreamPipesEntity> invocableElements;
+    private final Pipeline pipeline;
+    private final List<NamedStreamPipesEntity> allPipelineElements;
+    private final List<InvocableStreamPipesEntity> invocableElements;
 
     public PipelineGraphBuilder(Pipeline pipeline) {
         this.pipeline = pipeline;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ed6dda9..d484bce 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -55,21 +55,15 @@ import java.util.stream.Collectors;
 
 public class PipelineExecutor {
 
-  private Pipeline pipeline;
-  private boolean visualize;
-  private boolean storeStatus;
-  private boolean monitor;
-  private boolean forceStop;
+  private final Pipeline pipeline;
+  private final boolean storeStatus;
+  private final boolean forceStop;
 
   public PipelineExecutor(Pipeline pipeline,
-                          boolean visualize,
                           boolean storeStatus,
-                          boolean monitor,
                           boolean forceStop) {
     this.pipeline = pipeline;
-    this.visualize = visualize;
     this.storeStatus = storeStatus;
-    this.monitor = monitor;
     this.forceStop = forceStop;
   }
 
@@ -200,14 +194,6 @@ public class PipelineExecutor {
             .detachGraphs();
 
     if (status.isSuccess()) {
-      if (visualize) {
-        StorageDispatcher
-                .INSTANCE
-                .getNoSqlStore()
-                .getVisualizationStorageApi()
-                .deleteVisualization(pipeline.getPipelineId());
-      }
-
       PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
               new PipelineStatusMessage(pipeline.getPipelineId(),
                       System.currentTimeMillis(),
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java
index f32763b..8895699 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java
@@ -30,7 +30,8 @@ public abstract class GroundingSelector {
     protected NamedStreamPipesEntity source;
     protected Set<InvocableStreamPipesEntity> targets;
 
-    public GroundingSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
+    public GroundingSelector(NamedStreamPipesEntity source,
+                             Set<InvocableStreamPipesEntity> targets) {
         this.source = source;
         this.targets = targets;
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
deleted file mode 100644
index 51b3d43..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.matching;
-
-import org.apache.streampipes.config.backend.BackendConfig;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
-import org.apache.streampipes.manager.matching.output.OutputSchemaGenerator;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
-import org.apache.streampipes.model.output.OutputStrategy;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class InvocationGraphBuilder {
-
-  private PipelineGraph pipelineGraph;
-  private String pipelineId;
-  private Integer uniquePeIndex = 0;
-
-  private List<InvocableStreamPipesEntity> graphs;
-
-  public InvocationGraphBuilder(PipelineGraph pipelineGraph, String pipelineId) {
-    this.graphs = new ArrayList<>();
-    this.pipelineGraph = pipelineGraph;
-    this.pipelineId = pipelineId;
-
-  }
-
-  public List<InvocableStreamPipesEntity> buildGraphs() {
-
-    List<SpDataStream> streams = PipelineGraphHelpers.findStreams(pipelineGraph);
-
-    for (SpDataStream stream : streams) {
-      Set<InvocableStreamPipesEntity> connectedElements = getConnections(stream);
-      configure(stream, connectedElements);
-    }
-
-    return graphs;
-  }
-
-  private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
-
-    EventGrounding inputGrounding = new GroundingBuilder(source, targets)
-            .getEventGrounding();
-
-    if (source instanceof InvocableStreamPipesEntity) {
-      if (source instanceof DataProcessorInvocation && ((DataProcessorInvocation) source).isConfigured()) {
-
-        DataProcessorInvocation dataProcessorInvocation = (DataProcessorInvocation) source;
-        Tuple2<EventSchema, ? extends OutputStrategy> outputSettings;
-        OutputSchemaGenerator<?> schemaGenerator = new OutputSchemaFactory(dataProcessorInvocation)
-                .getOuputSchemaGenerator();
-
-        if (((DataProcessorInvocation) source).getInputStreams().size() == 1) {
-          outputSettings = schemaGenerator.buildFromOneStream(dataProcessorInvocation
-                  .getInputStreams()
-                  .get(0));
-        } else if (graphExists(dataProcessorInvocation.getDOM())) {
-          DataProcessorInvocation existingInvocation = (DataProcessorInvocation) find(dataProcessorInvocation.getDOM());
-
-          outputSettings = schemaGenerator.buildFromTwoStreams(existingInvocation
-                  .getInputStreams().get(0), dataProcessorInvocation.getInputStreams().get(1));
-          graphs.remove(existingInvocation);
-        } else {
-          outputSettings = new Tuple2<>(new EventSchema(), dataProcessorInvocation
-                  .getOutputStrategies().get(0));
-        }
-
-        SpDataStream outputStream = new SpDataStream();
-        outputStream.setEventGrounding(inputGrounding);
-        dataProcessorInvocation.setOutputStrategies(Collections.singletonList(outputSettings.b));
-        outputStream.setEventSchema(outputSettings.a);
-        ((DataProcessorInvocation) source).setOutputStream(outputStream);
-      }
-
-      if (!graphExists(source.getDOM())) {
-        graphs.add((InvocableStreamPipesEntity) source);
-      }
-    }
-
-    targets.forEach(t -> {
-      t.getInputStreams()
-              .get(getIndex(source.getDOM(), t))
-              .setEventGrounding(inputGrounding);
-
-      t.getInputStreams()
-              .get(getIndex(source.getDOM(), t))
-              .setEventSchema(getInputSchema(source));
-
-      String elementIdentifier = makeElementIdentifier(pipelineId, inputGrounding
-              .getTransportProtocol().getTopicDefinition().getActualTopicName(), t.getName());
-
-      t.setElementId(t.getBelongsTo() + ":" + elementIdentifier);
-      t.setCorrespondingPipeline(pipelineId);
-      t.setStatusInfoSettings(makeStatusInfoSettings(elementIdentifier));
-
-      uniquePeIndex++;
-
-      configure(t, getConnections(t));
-
-    });
-
-  }
-
-  private ElementStatusInfoSettings makeStatusInfoSettings(String elementIdentifier) {
-    ElementStatusInfoSettings statusSettings = new ElementStatusInfoSettings();
-    statusSettings.setKafkaHost(BackendConfig.INSTANCE.getKafkaHost());
-    statusSettings.setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
-    statusSettings.setErrorTopic(elementIdentifier + ".error");
-    statusSettings.setStatsTopic(elementIdentifier + ".stats");
-    statusSettings.setElementIdentifier(elementIdentifier);
-
-    return statusSettings;
-  }
-
-  private String makeElementIdentifier(String pipelineId, String topic, String elementName) {
-    return pipelineId
-            + "-"
-            + topic
-            + "-"
-            + elementName.replaceAll(" ", "").toLowerCase()
-            + "-"
-            + uniquePeIndex;
-  }
-
-  private EventSchema getInputSchema(NamedStreamPipesEntity source) {
-    if (source instanceof SpDataStream) {
-      return ((SpDataStream) source).getEventSchema();
-    } else if (source instanceof DataProcessorInvocation) {
-      return ((DataProcessorInvocation) source)
-              .getOutputStream()
-              .getEventSchema();
-    } else {
-      throw new IllegalArgumentException();
-    }
-  }
-
-  private Set<InvocableStreamPipesEntity> getConnections(NamedStreamPipesEntity source) {
-    Set<String> outgoingEdges = pipelineGraph.outgoingEdgesOf(source);
-    return outgoingEdges
-            .stream()
-            .map(o -> pipelineGraph.getEdgeTarget(o))
-            .map(g -> (InvocableStreamPipesEntity) g)
-            .collect(Collectors.toSet());
-
-  }
-
-  private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
-    return targetElement.getConnectedTo().indexOf(sourceDomId);
-  }
-
-  private boolean graphExists(String domId) {
-    return graphs
-            .stream()
-            .anyMatch(g -> g.getDOM().equals(domId));
-  }
-
-  private InvocableStreamPipesEntity find(String domId) {
-    return graphs
-            .stream()
-            .filter(g -> g.getDOM().equals(domId))
-            .findFirst()
-            .get();
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
deleted file mode 100644
index 6b63cf6..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.matching;
-
-import org.apache.streampipes.commons.exceptions.NoSepaInPipelineException;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.matching.mapping.AbstractRequirementsSelectorGenerator;
-import org.apache.streampipes.manager.matching.mapping.RequirementsSelectorGeneratorFactory;
-import org.apache.streampipes.manager.selector.PropertySelectorGenerator;
-import org.apache.streampipes.manager.util.PipelineVerificationUtils;
-import org.apache.streampipes.manager.util.TreeUtils;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.output.CustomOutputStrategy;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineModification;
-import org.apache.streampipes.model.staticproperty.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class PipelineVerificationHandler {
-
-  private final Pipeline pipeline;
-  private final PipelineModificationMessage pipelineModificationMessage;
-  private List<InvocableStreamPipesEntity> invocationGraphs;
-  private final InvocableStreamPipesEntity rootPipelineElement;
-
-  public PipelineVerificationHandler(Pipeline pipeline) throws NoSepaInPipelineException {
-    this(pipeline, PipelineVerificationUtils.getRootNode(pipeline));
-  }
-
-  public PipelineVerificationHandler(Pipeline pipeline,
-                                     InvocableStreamPipesEntity rootNode) {
-    this.pipeline = pipeline;
-    this.rootPipelineElement = rootNode;
-    this.invocationGraphs = makeInvocationGraphs();
-    this.pipelineModificationMessage = new PipelineModificationMessage();
-  }
-
-  /**
-   * Determines whether the current pipeline is valid
-   *
-   * @return PipelineValidationHandler
-   * @throws InvalidConnectionException if the connection is not considered valid
-   */
-  public PipelineVerificationHandler validateConnection() throws InvalidConnectionException {
-    invocationGraphs = new ConnectionValidator(pipeline, invocationGraphs, rootPipelineElement)
-            .validateConnection();
-    return this;
-  }
-
-  /**
-   * computes mapping properties (based on input/output matching)
-   *
-   * @return PipelineValidationHandler
-   */
-  public PipelineVerificationHandler computeMappingProperties() {
-    List<String> connectedTo = rootPipelineElement.getConnectedTo();
-    String domId = rootPipelineElement.getDOM();
-
-    List<SpDataStream> tempStreams = new ArrayList<>();
-
-    for (int i = 0; i < connectedTo.size(); i++) {
-      NamedStreamPipesEntity element = TreeUtils.findSEPAElement(rootPipelineElement
-              .getConnectedTo().get(i), pipeline.getSepas(), pipeline
-              .getStreams());
-
-      SpDataStream incomingStream;
-
-      if (element instanceof DataProcessorInvocation || element instanceof SpDataStream) {
-        if (element instanceof DataProcessorInvocation) {
-          DataProcessorInvocation ancestor = (DataProcessorInvocation) TreeUtils.findByDomId(
-                  connectedTo.get(i), invocationGraphs);
-
-          incomingStream = ancestor.getOutputStream();
-        } else {
-          incomingStream = (SpDataStream) element;
-        }
-
-        tempStreams.add(incomingStream);
-        if (rootPipelineElement.getStreamRequirements().size() - 1 == i) {
-          updateStaticProperties(tempStreams, rootPipelineElement.getStaticProperties());
-          PipelineModification modification = new PipelineModification(
-                  domId,
-                  rootPipelineElement.getElementId(),
-                  rootPipelineElement.getStaticProperties());
-          modification.setInputStreams(tempStreams);
-          updateOutputStrategy(tempStreams);
-          if (rootPipelineElement instanceof DataProcessorInvocation) {
-            modification.setOutputStrategies(((DataProcessorInvocation) rootPipelineElement).getOutputStrategies());
-          }
-          pipelineModificationMessage.addPipelineModification(modification);
-        }
-      }
-    }
-    return this;
-  }
-
-  private void updateStaticProperties(List<SpDataStream> inputStreams,
-                                      List<StaticProperty> staticProperties) {
-    staticProperties
-            .stream()
-            .filter(sp -> (sp instanceof CollectionStaticProperty
-                    || sp instanceof MappingProperty
-                    || sp instanceof StaticPropertyGroup
-                    || sp instanceof StaticPropertyAlternatives))
-            .forEach(property -> updateStaticProperty(inputStreams, property));
-  }
-
-  private void updateStaticProperty(List<SpDataStream> inputStreams, StaticProperty property) {
-    if (property instanceof MappingProperty) {
-      MappingProperty mappingProperty = property.as(MappingProperty.class);
-      AbstractRequirementsSelectorGenerator generator = RequirementsSelectorGeneratorFactory.getRequirementsSelector(mappingProperty, inputStreams, rootPipelineElement);
-      mappingProperty.setMapsFromOptions(generator.generateSelectors());
-    } else if (property instanceof StaticPropertyGroup) {
-      updateStaticProperties(inputStreams, property.as(StaticPropertyGroup.class).getStaticProperties());
-    } else if (property instanceof StaticPropertyAlternatives) {
-      ((StaticPropertyAlternatives) property)
-              .getAlternatives()
-              .forEach(al -> updateStaticProperty(inputStreams, al.getStaticProperty()));
-      // TODO
-    } else if (property instanceof CollectionStaticProperty) {
-      CollectionStaticProperty collection = property.as(CollectionStaticProperty.class);
-      if (hasMappingEntry(collection)) {
-        updateStaticProperty(inputStreams, collection.getStaticPropertyTemplate());
-      } else if (hasGroupEntry(collection)) {
-        updateStaticProperties(inputStreams, collection.getStaticPropertyTemplate().as(StaticPropertyGroup.class).getStaticProperties());
-      }
-    }
-  }
-
-  private Boolean hasMappingEntry(CollectionStaticProperty property) {
-    return property.getStaticPropertyTemplate() instanceof MappingProperty;
-  }
-
-  private Boolean hasGroupEntry(CollectionStaticProperty property) {
-    return property.getStaticPropertyTemplate() instanceof StaticPropertyGroup;
-  }
-
-  private void updateOutputStrategy(List<SpDataStream> inputStreams) {
-
-    if (rootPipelineElement instanceof DataProcessorInvocation) {
-      ((DataProcessorInvocation) rootPipelineElement)
-              .getOutputStrategies()
-              .stream()
-              .filter(strategy -> strategy instanceof CustomOutputStrategy)
-              .forEach(strategy -> {
-                CustomOutputStrategy outputStrategy = (CustomOutputStrategy) strategy;
-                if (inputStreams.size() == 1 || (inputStreams.size() > 1 && !(outputStrategy
-                        .isOutputRight()))) {
-                  outputStrategy.setAvailablePropertyKeys(new PropertySelectorGenerator
-                          (inputStreams.get(0).getEventSchema(), false).generateSelectors());
-                } else {
-                  outputStrategy.setAvailablePropertyKeys(new PropertySelectorGenerator
-                          (inputStreams.get(0).getEventSchema(), inputStreams.get(1)
-                                  .getEventSchema(), false)
-                          .generateSelectors());
-                }
-              });
-    }
-  }
-
-  public PipelineModificationMessage getPipelineModificationMessage() {
-    return pipelineModificationMessage;
-  }
-
-  public List<InvocableStreamPipesEntity> makeInvocationGraphs() {
-    PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-    return new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index e7d5025..e896dfe 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -19,8 +19,16 @@ package org.apache.streampipes.manager.matching;
 
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.recommender.AllElementsProvider;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.message.PipelineModificationMessage;
 import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModification;
+
+import java.util.*;
 
 public class PipelineVerificationHandlerV2 {
 
@@ -34,4 +42,42 @@ public class PipelineVerificationHandlerV2 {
     PipelineGraph graph = new PipelineGraphBuilder(pipeline).buildGraph();
     return new PipelineModificationGenerator(graph).buildPipelineModificationMessage();
   }
+
+  public List<NamedStreamPipesEntity> verifyAndBuildGraphs(boolean ignoreUnconfigured) {
+    List<PipelineModification> pipelineModifications = verifyPipeline().getPipelineModifications();
+    List<NamedStreamPipesEntity> allElements = new AllElementsProvider(pipeline).getAllElements();
+    List<NamedStreamPipesEntity> result = new ArrayList<>();
+    allElements.forEach(pipelineElement -> {
+      Optional<PipelineModification> modificationOpt = getModification(pipelineElement.getDOM(), pipelineModifications);
+      if (modificationOpt.isPresent()) {
+        PipelineModification modification = modificationOpt.get();
+        if (pipelineElement instanceof InvocableStreamPipesEntity) {
+          ((InvocableStreamPipesEntity) pipelineElement).setInputStreams(modification.getInputStreams());
+          ((InvocableStreamPipesEntity) pipelineElement).setStaticProperties(modification.getStaticProperties());
+          if (pipelineElement instanceof DataProcessorInvocation) {
+            ((DataProcessorInvocation) pipelineElement).setOutputStream(modification.getOutputStream());
+            if (((DataProcessorInvocation) pipelineElement).getOutputStream().getEventGrounding() == null) {
+              EventGrounding grounding = new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding();
+              ((DataProcessorInvocation) pipelineElement).getOutputStream().setEventGrounding(grounding);
+            }
+            if (modification.getOutputStrategies() != null) {
+              ((DataProcessorInvocation) pipelineElement).setOutputStrategies(modification.getOutputStrategies());
+            }
+          }
+        }
+        if (!ignoreUnconfigured || modification.isPipelineElementValid()) {
+          result.add(pipelineElement);
+        }
+      } else {
+        result.add(pipelineElement);
+      }
+    });
+
+    return result;
+  }
+
+  private Optional<PipelineModification> getModification(String id,
+                                                         List<PipelineModification> modifications) {
+    return modifications.stream().filter(m -> m.getDomId().equals(id)).findFirst();
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 704aa85..4b2c65c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -56,36 +56,17 @@ import java.util.List;
 
 /**
  * class that provides several (partial) pipeline verification methods
- *
- * @author riemer
  */
 
 public class Operations {
 
-
-  public static PipelineModificationMessage validatePipeline(Pipeline pipeline, boolean isPartial) throws Exception {
-    return validatePipeline(pipeline, isPartial, "");
-  }
-
   /**
-   * This method is a fix for the streamsets integration. Remove the username from the signature when you don't need it anymore
    *
-   * @param pipeline
-   * @param isPartial
-   * @param username
-   * @return
-   * @throws Exception
+   * @param pipeline the pipeline to validate
+   * @return PipelineModificationMessage a message containing desired pipeline modifications
    */
-  public static PipelineModificationMessage validatePipeline(Pipeline pipeline, boolean isPartial, String username)
-          throws Exception {
+  public static PipelineModificationMessage validatePipeline(Pipeline pipeline) throws Exception {
     return new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
-//    PipelineVerificationHandler validator = new PipelineVerificationHandler(
-//            pipeline);
-//    return validator
-//            .validateConnection()
-//            .computeMappingProperties()
-//            .storeConnection()
-//            .getPipelineModificationMessage();
   }
 
   public static DataSetModificationMessage updateDataSet(SpDataSet dataSet) {
@@ -117,18 +98,17 @@ public class Operations {
 
   public static PipelineOperationStatus startPipeline(
           Pipeline pipeline) {
-    return startPipeline(pipeline,true, true, false);
+    return startPipeline(pipeline, true);
   }
 
   public static PipelineOperationStatus startPipeline(
-          Pipeline pipeline, boolean visualize, boolean storeStatus,
-          boolean monitor) {
-    return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, false).startPipeline();
+          Pipeline pipeline, boolean storeStatus) {
+    return new PipelineExecutor(pipeline, storeStatus, false).startPipeline();
   }
 
   public static PipelineOperationStatus stopPipeline(
           Pipeline pipeline, boolean forceStop) {
-    return stopPipeline(pipeline, true, true, false, forceStop);
+    return stopPipeline(pipeline, true, forceStop);
   }
 
   public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) {
@@ -145,11 +125,9 @@ public class Operations {
   }
 
   public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
-                                                     boolean visualize,
                                                      boolean storeStatus,
-                                                     boolean monitor,
                                                      boolean forceStop) {
-    return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, forceStop).stopPipeline();
+    return new PipelineExecutor(pipeline, storeStatus, forceStop).stopPipeline();
   }
 
   public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
@@ -176,10 +154,6 @@ public class Operations {
     return new PipelineTemplateInvocationHandler(userSid, pipelineTemplateInvocation).handlePipelineInvocation();
   }
 
-  public static PipelineOperationStatus handlePipelineTemplateInvocation(String username, PipelineTemplateInvocation pipelineTemplateInvocation, PipelineTemplateDescription pipelineTemplateDescription) {
-    return new PipelineTemplateInvocationHandler(username, pipelineTemplateInvocation, pipelineTemplateDescription).handlePipelineInvocation();
-  }
-
   public static PipelineTemplateInvocation getPipelineInvocationTemplate(SpDataStream dataStream, PipelineTemplateDescription pipelineTemplateDescription) {
     return new PipelineTemplateInvocationGenerator(dataStream, pipelineTemplateDescription).generateInvocation();
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 45540a2..1cb792d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -17,10 +17,12 @@
  */
 package org.apache.streampipes.manager.preview;
 
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
 import org.apache.streampipes.manager.execution.http.HttpRequestBuilder;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
@@ -41,14 +43,16 @@ public class PipelinePreview {
   public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
     String previewId = generatePreviewId();
     pipeline.setActions(new ArrayList<>());
-    PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-    List<NamedStreamPipesEntity> graphs = new ArrayList<>(new InvocationGraphBuilder(pipelineGraph, previewId).buildGraphs());
-    graphs.addAll(pipeline.getStreams().stream().filter(stream -> !(stream instanceof SpDataSet)).collect(Collectors.toList()));
+    List<NamedStreamPipesEntity> pipelineElements = new PipelineVerificationHandlerV2(pipeline)
+            .verifyAndBuildGraphs(true)
+            .stream()
+            .filter(pe -> !(pe instanceof SpDataSet))
+            .collect(Collectors.toList());
 
-    invokeGraphs(filter(graphs));
-    storeGraphs(previewId, graphs);
+    invokeGraphs(filter(pipelineElements));
+    storeGraphs(previewId, pipelineElements);
 
-    return makePreviewModel(previewId, graphs);
+    return makePreviewModel(previewId, pipelineElements);
   }
 
   public void deletePreview(String previewId) {
@@ -77,12 +81,29 @@ public class PipelinePreview {
     }
   }
 
+  private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
+    return new ExtensionsServiceEndpointGenerator(
+            g.getAppId(),
+            ExtensionsServiceEndpointUtils.getPipelineElementType(g))
+            .getEndpointResourceUrl();
+  }
+
   private void invokeGraphs(List<InvocableStreamPipesEntity> graphs) {
-    graphs.forEach(g -> new HttpRequestBuilder(g, g.getBelongsTo(), null).invoke());
+    graphs.forEach(g -> {
+      try {
+        g.setSelectedEndpointUrl(findSelectedEndpoint(g));
+        new HttpRequestBuilder(g, g.getSelectedEndpointUrl(), null).invoke();
+      } catch (NoServiceEndpointsAvailableException e) {
+        e.printStackTrace();
+      }
+    });
   }
 
   private void detachGraphs(List<InvocableStreamPipesEntity> graphs) {
-    graphs.forEach(g -> new HttpRequestBuilder(g, g.getUri(), null).detach());
+    graphs.forEach(g -> {
+      String endpointUrl = g.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(g.getElementId());
+      new HttpRequestBuilder(g, endpointUrl, null).detach();
+    });
   }
 
   private void deleteGraphs(String previewId) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
index f8a04bd..b2c3130 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
@@ -40,6 +40,10 @@ public class AllElementsProvider {
             .collect(Collectors.toList());
   }
 
+  public List<NamedStreamPipesEntity> getAllElements() {
+    return allElements;
+  }
+
   public NamedStreamPipesEntity findElement(String domId) {
     return this.allElements
             .stream()
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index c23fa00..6781649 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -213,7 +213,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   public Response update(Pipeline pipeline) {
     try {
-      return ok(Operations.validatePipeline(pipeline, true));
+      return ok(Operations.validatePipeline(pipeline));
     } catch (JsonSyntaxException e) {
       return badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
               e.getMessage()));