You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/31 16:07:18 UTC

[incubator-streampipes] 03/03: [WIP] improve pipeline executor

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 5aff17f9c09c3c194444e0e661d932daf47cdce7
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Dec 31 17:06:41 2021 +0100

    [WIP] improve pipeline executor
---
 .../model/pipeline/PipelineElementStatus.java      |  10 +
 .../manager/data/PipelineGraphBuilder.java         |   6 +-
 .../manager/execution/http/HttpRequestBuilder.java |   2 +
 .../pipeline/AbstractPipelineExecutor.java         | 628 ---------------------
 .../pipeline/PipelineMigrationHelpers.java         |  21 -
 .../pipeline/executor/PipelineExecutor.java        |  50 +-
 .../pipeline/executor/PipelineExecutorBuilder.java |  66 +--
 .../pipeline/executor/PipelineExecutorFactory.java |  24 +-
 .../operations/types/MigrationOperation.java       |  21 -
 .../operations/types/ReconfigurationOperation.java |  21 -
 .../EntitiesLifecycleObject.java}                  |   6 +-
 .../GetStateStep.java}                             |   7 +-
 .../PipelineExecutionStep.java}                    |   7 +-
 .../PrepareMigrationStep.java}                     |   7 +-
 .../PrepareStartPipelineStep.java}                 |   6 +-
 .../ReconfigureElementStep.java}                   |   7 +-
 .../StartGraphsAndAssociatedRelaysStep.java}       |   7 +-
 .../StartPipelineStep.java}                        |   6 +-
 .../StartRelaysStep.java}                          |   7 +-
 .../StopGraphsAndAssociatedRelaysStep.java}        |   7 +-
 .../StopPipelineStep.java}                         |  25 +-
 .../StopRelaysStep.java}                           |   7 +-
 .../StoreMigratedPipelineStep.java}                |   6 +-
 .../StorePipelineStep.java}                        |   6 +-
 .../pipeline/executor/utils/RelayUtils.java        |   2 +-
 .../pipeline/executor/utils/StatusUtils.java       |   2 +-
 .../migrate-pipeline-processors.component.ts       |  23 +-
 27 files changed, 140 insertions(+), 847 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java
index 7e7274b..54f34e8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java
@@ -25,6 +25,8 @@ public class PipelineElementStatus {
 	private String optionalMessage;
 	private String operation;
 	private String elementNode;
+	//TODO: Assess if runningInstanceId is needed separately or if it can be combined with the elementId
+	private String runningInstanceId;
 	
 	private boolean success;
 
@@ -85,4 +87,12 @@ public class PipelineElementStatus {
 	public void setElementNode(String elementNode) {
 		this.elementNode = elementNode;
 	}
+
+	public String getRunningInstanceId() {
+		return runningInstanceId;
+	}
+
+	public void setRunningInstanceId(String runningInstanceId) {
+		this.runningInstanceId = runningInstanceId;
+	}
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
index a83c125..bbcadfb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
@@ -28,9 +28,9 @@ import java.util.stream.Collectors;
 
 public class PipelineGraphBuilder {
 
-    private Pipeline pipeline;
-    private List<NamedStreamPipesEntity> allPipelineElements;
-    private List<InvocableStreamPipesEntity> invocableElements;
+    private final Pipeline pipeline;
+    private final List<NamedStreamPipesEntity> allPipelineElements;
+    private final List<InvocableStreamPipesEntity> invocableElements;
 
     public PipelineGraphBuilder(Pipeline pipeline) {
         this.pipeline = pipeline;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index dc0fdd9..82272c8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -99,10 +99,12 @@ public class HttpRequestBuilder {
     PipelineElementStatus status = new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(),
             response.getOptionalMessage());
     if(payload instanceof InvocableStreamPipesEntity){
+      status.setRunningInstanceId(((InvocableStreamPipesEntity) payload).getDeploymentRunningInstanceId());
       status.setElementNode(((InvocableStreamPipesEntity)payload).getDeploymentTargetNodeId());
       status.setOperation(action);
     }
     else if(payload instanceof SpDataStreamRelayContainer){
+      status.setRunningInstanceId(((SpDataStreamRelayContainer) payload).getRunningStreamRelayInstanceId() + " relay");
       status.setElementNode(((SpDataStreamRelayContainer)payload).getDeploymentTargetNodeId());
       status.setOperation(action + " relay");
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
deleted file mode 100644
index 01c4f2c..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-
-import org.apache.streampipes.manager.execution.http.StateSubmitter;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
-import org.apache.streampipes.storage.api.INodeDataStreamRelay;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
-
-import java.security.GeneralSecurityException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-public abstract class AbstractPipelineExecutor {
-
-    protected Pipeline pipeline;
-    protected boolean visualize;
-    protected boolean storeStatus;
-    protected boolean monitor;
-
-    public AbstractPipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
-        this.pipeline = pipeline;
-        this.visualize = visualize;
-        this.storeStatus = storeStatus;
-        this.monitor = monitor;
-    }
-
-    // standard methods
-    protected void setPipelineStarted(Pipeline pipeline) {
-        pipeline.setRunning(true);
-        pipeline.setStartedAt(new Date().getTime());
-        getPipelineStorageApi().updatePipeline(pipeline);
-    }
-
-    protected void setPipelineStopped(Pipeline pipeline) {
-        pipeline.setRunning(false);
-        getPipelineStorageApi().updatePipeline(pipeline);
-    }
-
-    protected void deleteVisualization(String pipelineId) {
-        StorageDispatcher.INSTANCE
-                .getNoSqlStore()
-                .getVisualizationStorageApi()
-                .deleteVisualization(pipelineId);
-    }
-
-    protected void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
-                                       List<SpDataSet> dataSets) {
-        TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
-        TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
-    }
-
-    protected void storeDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
-        //relays.forEach(StreamPipesClusterManager::persistDataStreamRelay);
-        relays.forEach(relay -> getDataStreamRelayApi().addRelayContainer(relay));
-    }
-
-    protected void deleteDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
-        //relays.forEach(StreamPipesClusterManager::deleteDataStreamRelay);
-        relays.forEach(relay -> getDataStreamRelayApi().deleteRelayContainer(relay));
-    }
-
-    protected void updateDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
-        //relays.forEach(StreamPipesClusterManager::updateDataStreamRelay);
-        relays.forEach(relay -> getDataStreamRelayApi().updateRelayContainer(relay));
-    }
-
-
-    protected PipelineOperationStatus startPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
-                                                                     List<SpDataStreamRelayContainer> relays){
-        if (graphs.isEmpty()) {
-            return initPipelineOperationStatus();
-        }
-        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
-                graphs, new ArrayList<>(), relays).invokePipelineElementsAndRelays();
-    }
-
-    protected PipelineOperationStatus stopPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
-                                                                    List<SpDataStreamRelayContainer> relays){
-        if (graphs.isEmpty()) {
-            return initPipelineOperationStatus();
-        }
-        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
-                graphs, new ArrayList<>(),relays).detachPipelineElementsAndRelays();
-    }
-
-    protected PipelineOperationStatus startRelays(List<SpDataStreamRelayContainer> relays){
-        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), new ArrayList<>(), new ArrayList<>(),
-                relays).invokeRelaysOnMigration();
-    }
-
-    protected PipelineOperationStatus stopRelays(List<SpDataStreamRelayContainer> relays){
-        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),new ArrayList<>(), new ArrayList<>(),
-                relays).detachRelaysOnMigration();
-    }
-
-    protected PipelineElementStatus getState(InvocableStreamPipesEntity graph){
-        return new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, null).getElementState();
-    }
-
-    protected PipelineElementStatus setState(InvocableStreamPipesEntity graph, String state){
-        return new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, state).setElementState();
-    }
-
-    protected List<SpDataStreamRelayContainer> findRelaysWhenStopping(List<NamedStreamPipesEntity> predecessors,
-                                                          InvocableStreamPipesEntity target){
-
-        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
-
-        predecessors.forEach(pred -> {
-            List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-            SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
-
-            if (pred instanceof DataProcessorInvocation){
-                //Data Processor
-                DataProcessorInvocation graph = (DataProcessorInvocation) pred;
-                if (differentDeploymentTargets(pred, target)) {
-
-                    // TODO only add if no other processor or sink depends on relay
-                    String predDOMId = pred.getDOM();
-                    String targetRunningInstanceId = target.getDeploymentRunningInstanceId();
-                    Optional<DataProcessorInvocation> foundProcessor = pipeline.getSepas().stream()
-                            .filter(processor -> processor.getConnectedTo().contains(predDOMId))
-                            .filter(processor -> !processor.getDeploymentRunningInstanceId().equals(targetRunningInstanceId))
-                            .findAny();
-
-                    Optional<DataSinkInvocation> foundSink = pipeline.getActions().stream()
-                            .filter(action -> action.getConnectedTo().contains(predDOMId))
-                            .findAny();
-
-                    boolean foundDependencyOnDifferentTarget = false;
-                    if (foundProcessor.isPresent()) {
-                        foundDependencyOnDifferentTarget =  differentDeploymentTargets(foundProcessor.get(), target);
-                    }
-
-                    if (foundSink.isPresent()) {
-                        foundDependencyOnDifferentTarget =  differentDeploymentTargets(foundSink.get(), target);
-                    }
-
-                    if (foundDependencyOnDifferentTarget) {
-                        dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
-
-                        relayContainer = new SpDataStreamRelayContainer(graph);
-                        relayContainer.setOutputStreamRelays(dataStreamRelays);
-
-                        relays.add(relayContainer);
-                    }
-
-                }
-            } else if (pred instanceof SpDataStream){
-                //DataStream
-                SpDataStream stream = (SpDataStream) pred;
-                if (differentDeploymentTargets(stream, target)){
-
-                    String id = extractUniqueAdpaterId(stream.getElementId());
-                    //There is a relay that needs to be removed
-                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
-                            .get(getIndex(pred.getDOM(), target))
-                            .getEventGrounding())));
-                    String relayStrategy = pipeline.getEventRelayStrategy();
-                    relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
-                }
-            }
-        });
-        return relays;
-    }
-
-
-    protected List<SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
-                                                          InvocableStreamPipesEntity target){
-
-        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
-
-        predecessors.forEach(pred -> {
-            List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-            SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
-
-            if (pred instanceof DataProcessorInvocation){
-                //Data Processor
-                DataProcessorInvocation graph = (DataProcessorInvocation) pred;
-                if (differentDeploymentTargets(pred, target)) {
-
-                    String runningRelayId = ((DataProcessorInvocation) pred).getDeploymentRunningInstanceId();
-                    Optional<SpDataStreamRelayContainer> existingRelay = getRelayContainerById(runningRelayId);
-
-                    // only add relay if not existing - prevent from duplicate relays with same topic to same target
-                    Collection<? extends SpDataStreamRelay> foundRelays = findRelaysWithMatchingTopic(graph, target);
-
-                    if (!existingRelay.isPresent() || missingRelayToTarget(existingRelay.get(), foundRelays)) {
-                        dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
-
-                        //dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-                        relayContainer = new SpDataStreamRelayContainer(graph);
-                        relayContainer.setOutputStreamRelays(dataStreamRelays);
-
-                        relays.add(relayContainer);
-                    }
-
-                }
-            } else if (pred instanceof SpDataStream){
-                //DataStream
-                SpDataStream stream = (SpDataStream) pred;
-                if (differentDeploymentTargets(stream, target)){
-
-                    String id = extractUniqueAdpaterId(stream.getElementId());
-                    Optional<SpDataStreamRelayContainer> existingRelay = getRelayContainerById(id);
-
-                    // only add relay if not existing - prevent from duplicate relays with same topic
-                    if(!existingRelay.isPresent()) {
-                        //There is a relay that needs to be removed
-                        dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
-                                .get(getIndex(pred.getDOM(), target))
-                                .getEventGrounding())));
-                        String relayStrategy = pipeline.getEventRelayStrategy();
-                        relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
-                    } else {
-                        // generate relays for adapter streams to remote processors
-                        List<SpDataStreamRelayContainer> generatedRelays =
-                                generateDataStreamRelays(Collections.singletonList(target));
-
-                        relays.addAll(generatedRelays);
-                    }
-                }
-            }
-        });
-        return relays;
-    }
-
-    private boolean missingRelayToTarget(SpDataStreamRelayContainer existingRelayContainer,
-                                         Collection<? extends SpDataStreamRelay> foundRelays) {
-
-        List<TransportProtocol> set = foundRelays.stream()
-                .map(SpDataStreamRelay::getEventGrounding)
-                .map(EventGrounding::getTransportProtocol)
-                .collect(Collectors.toList());
-
-        List<TransportProtocol> relay = existingRelayContainer.getOutputStreamRelays().stream()
-                .map(SpDataStreamRelay::getEventGrounding)
-                .map(EventGrounding::getTransportProtocol)
-                .collect(Collectors.toList());
-
-        boolean missing = true;
-        for (TransportProtocol tp: set) {
-            for (TransportProtocol r: relay) {
-                String targetTopic = tp.getTopicDefinition().getActualTopicName();
-                String rTopic = r.getTopicDefinition().getActualTopicName();
-                String targetHost = tp.getBrokerHostname();
-                String rHost = r.getBrokerHostname();
-
-                if (targetHost.equals(rHost) && targetTopic.equals(rTopic)) {
-                    missing = false;
-                }
-            }
-        }
-
-        return missing;
-    }
-
-    protected List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
-                                                           InvocableStreamPipesEntity target,
-                                                           PipelineGraph pipelineGraph,
-                                                           List<NamedStreamPipesEntity> foundPredecessors){
-
-        Set<InvocableStreamPipesEntity> targets = getTargetsAsSet(source, pipelineGraph,
-                InvocableStreamPipesEntity.class);
-
-        //TODO: Check if this works for all graph topologies
-        if (targets.contains(target)){
-            foundPredecessors.add(source);
-        } else {
-            List<NamedStreamPipesEntity> successors = getTargetsAsList(source, pipelineGraph,
-                    NamedStreamPipesEntity.class);
-
-            if (successors.isEmpty()) return foundPredecessors;
-            successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
-        }
-        return foundPredecessors;
-    }
-
-    protected NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
-        AtomicReference<NamedStreamPipesEntity> match = new AtomicReference<>();
-        List<SpDataStream> dataStreams = PipelineGraphHelpers.findStreams(pipelineGraph);
-
-        for (SpDataStream stream : dataStreams) {
-            NamedStreamPipesEntity foundEntity = compareGraphs(stream, entity, pipelineGraph, new ArrayList<>());
-            if (foundEntity != null) {
-                match.set(foundEntity);
-            }
-        }
-        return match.get();
-    }
-
-    private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source,
-                                                   NamedStreamPipesEntity searchedEntity,
-                                                   PipelineGraph pipelineGraph,
-                                                   List<NamedStreamPipesEntity> successors){
-        if(matchingDOM(source, searchedEntity)) {
-            return source;
-        } else if (successors.isEmpty()) {
-            successors = getTargetsAsList(source, pipelineGraph, NamedStreamPipesEntity.class);
-            Optional<NamedStreamPipesEntity> successor = successors.stream().findFirst();
-            if (successor.isPresent()) {
-                successors.remove(successor.get());
-                return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
-            }
-        }
-        return null;
-    }
-
-    protected List<SpDataStreamRelayContainer> generateRelays(List<InvocableStreamPipesEntity> graphs) {
-        return generateDataStreamRelays(graphs).stream()
-                .filter(r -> r.getOutputStreamRelays().size() > 0)
-                .collect(Collectors.toList());
-    }
-
-    // TODO: when using kafka as edge protocol it generates duplicate event relays -> check with mqtt as edge
-    //  protocol and fix
-    private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
-        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
-
-        for (InvocableStreamPipesEntity graph : graphs) {
-            for (SpDataStream stream: pipeline.getStreams()) {
-                if (differentDeploymentTargets(stream, graph) && connected(stream, graph)) {
-
-                    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(graph.getInputStreams()
-                            .get(getIndex(stream.getDOM(), graph))
-                            .getEventGrounding())));
-
-                    String id = extractUniqueAdpaterId(stream.getElementId());
-                    String relayStrategy = pipeline.getEventRelayStrategy();
-
-                    if (!relayExists(relays, id)) {
-                        relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
-                    }
-                }
-            }
-            for (DataProcessorInvocation processor : pipeline.getSepas()) {
-                if (differentDeploymentTargets(processor, graph) && connected(processor, graph)) {
-                    if (!relayExists(relays, processor.getDeploymentRunningInstanceId())) {
-//                        String previousId = processor.getDeploymentRunningInstanceId();
-//                        String modifiedId = previousId + "-" + processor.getDeploymentTargetNodeId();
-//                        processor.setDeploymentRunningInstanceId(modifiedId);
-                        SpDataStreamRelayContainer processorRelay = new SpDataStreamRelayContainer(processor);
-                        relays.add(processorRelay);
-                    }
-                }
-            }
-        }
-        return relays;
-    }
-
-
-    // Helpers
-
-    /**
-     *
-     * @param id
-     * @return
-     */
-    private Optional<SpDataStreamRelayContainer> getRelayContainerById(String id) {
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage().getRelayContainerById(id);
-    }
-
-    /**
-     * Check if relay with deploymentRunningInstanceId of predecessor already exists
-     *
-     * @param relays                        List of existing relays
-     * @param deploymentRunningInstanceId   Id to check
-     * @return boolean
-     */
-    private boolean relayExists(List<SpDataStreamRelayContainer> relays,
-                                String deploymentRunningInstanceId) {
-        return relays.stream().anyMatch(r -> r.getRunningStreamRelayInstanceId().equals(deploymentRunningInstanceId));
-    }
-
-    /**
-     * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
-     *
-     * @param entity    data processor/sink
-     */
-    protected void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
-        entity.getInputStreams()
-                .stream()
-                .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
-                .map(is -> is.getEventGrounding().getTransportProtocol())
-                .map(KafkaTransportProtocol.class::cast)
-                .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
-    }
-
-    /**
-     * Decrypt potential secrets contained in static properties, e.g., passwords
-     *
-     * @param graphs    List of graphs
-     * @return  List of decrypted graphs
-     */
-    protected List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-        List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
-        graphs.stream().map(g -> {
-            if (g instanceof DataProcessorInvocation) {
-                return new DataProcessorInvocation((DataProcessorInvocation) g);
-            } else {
-                return new DataSinkInvocation((DataSinkInvocation) g);
-            }
-        }).forEach(g -> {
-            g.getStaticProperties()
-                    .stream()
-                    .filter(SecretStaticProperty.class::isInstance)
-                    .forEach(sp -> {
-                        try {
-                            String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
-                                    ((SecretStaticProperty) sp).getValue());
-                            ((SecretStaticProperty) sp).setValue(decrypted);
-                            ((SecretStaticProperty) sp).setEncrypted(false);
-                        } catch (GeneralSecurityException e) {
-                            e.printStackTrace();
-                        }
-                    });
-            decryptedGraphs.add(g);
-        });
-        return decryptedGraphs;
-    }
-
-    /**
-     * Get pipeline storage dispatcher API
-     *
-     * @return IPipelineStorage NoSQL storage interface for pipelines
-     */
-    private IPipelineStorage getPipelineStorageApi() {
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
-    }
-
-    /**
-     * Get data stream relay storage dispatcher API
-     *
-     * @return INodeDataStreamRelay NoSQL storage interface for data stream relays
-     */
-    private INodeDataStreamRelay getDataStreamRelayApi() {
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage();
-    }
-
-    /**
-     * Extract topic name
-     *
-     * @param entity
-     * @return
-     */
-    private String extractActualTopic(NamedStreamPipesEntity entity) {
-        if (entity instanceof SpDataStream) {
-            return ((SpDataStream) entity)
-                    .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
-        } else if (entity instanceof SpDataStreamRelay) {
-            return ((SpDataStreamRelay) entity)
-                    .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
-        }
-        throw new SpRuntimeException("Could not extract actual topic name from entity");
-    }
-
-    // Edge / Migration Helpers
-
-    /**
-     * Compare deployment targets of two pipeline elements, namely data stream/processor (source) and data
-     * processor/sink (target)
-     *
-     * @param e1
-     * @param e2
-     * @return boolean value that returns true if source and target share the same deployment target, else false
-     */
-    private boolean differentDeploymentTargets(NamedStreamPipesEntity e1, InvocableStreamPipesEntity e2) {
-        if (e1 instanceof SpDataStream) {
-            return !((SpDataStream) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
-        } else if (e1 instanceof DataProcessorInvocation) {
-            return !((DataProcessorInvocation) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
-        } else if (e1 instanceof DataSinkInvocation) {
-            return !((DataSinkInvocation) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
-        }
-        throw new SpRuntimeException("Matching deployment targets check failed");
-    }
-
-    /**
-     * Find relays with matching topics
-     *
-     * @param graph     data processor
-     * @param target    data processor/sink
-     * @return collection of data stream relays
-     */
-    private Collection<? extends SpDataStreamRelay> findRelaysWithMatchingTopic(DataProcessorInvocation graph,
-                                                                                InvocableStreamPipesEntity target) {
-        return graph.getOutputStreamRelays().stream().
-                filter(relay ->
-                        target.getInputStreams().stream()
-                                .map(this::extractActualTopic)
-                                .collect(Collectors.toSet())
-                                .contains(extractActualTopic(relay)))
-                .collect(Collectors.toList());
-    }
-
-
-    private <T> Set<T> getTargetsAsSet(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
-                                       Class<T> clazz){
-        return pipelineGraph.outgoingEdgesOf(source)
-                .stream()
-                .map(pipelineGraph::getEdgeTarget)
-                .map(clazz::cast)
-                .collect(Collectors.toSet());
-    }
-
-    private <T> List<T> getTargetsAsList(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
-                                         Class<T> clazz){
-        return new ArrayList<>(getTargetsAsSet(source, pipelineGraph, clazz));
-    }
-
-    /**
-     * Compare connection of two pipeline elements, namely data stream/processor (source) and data processor/sink
-     * (target) by DOM identifier.
-     *
-     * @param source    data stream or data processor
-     * @param target    data processor/sink
-     * @return boolean value that returns true if source and target are connected, else false
-     */
-    private boolean connected(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
-        int index = getIndex(source.getDOM(), target);
-        if (index != -1) {
-            return target.getConnectedTo().get(index).equals(source.getDOM());
-        }
-        return false;
-    }
-
-    /**
-     * Get index of data processor/sink connection based on source DOM identifier
-     *
-     * @param sourceDomId   source DOM identifier
-     * @param target        data processor/sink
-     * @return Integer with index of connection, if invalid returns -1.
-     */
-    private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity target) {
-        return target.getConnectedTo().indexOf(sourceDomId);
-    }
-
-    /**
-     * Checks if DOM are equal
-     *
-     * @param source pipeline element
-     * @param target pipeline element
-     * @return true if DOM is the same, else false
-     */
-    private boolean matchingDOM(NamedStreamPipesEntity source, NamedStreamPipesEntity target) {
-        return source.getDOM().equals(target.getDOM());
-    }
-
-    /**
-     * Get List of InvocableStreamPipes entities, i.e., data processors/sinks from list of NameStreamPipesEntity
-     *
-     * @param graphs    List<NamedStreamPipesEntity> graphs
-     * @return
-     */
-    private List<InvocableStreamPipesEntity> getListOfInvocableStreamPipesEntity(List<NamedStreamPipesEntity> graphs) {
-        List<InvocableStreamPipesEntity> invocableEntities = new ArrayList<>();
-        graphs.stream()
-                .filter(i -> i instanceof InvocableStreamPipesEntity)
-                .forEach(i -> invocableEntities.add((InvocableStreamPipesEntity) i));
-        return invocableEntities;
-    }
-
-    /**
-     * Create pipeline operation status with pipeline id and name and set success to true
-     *
-     * @return PipelineOperationStatus
-     */
-    protected PipelineOperationStatus initPipelineOperationStatus() {
-        PipelineOperationStatus status = new PipelineOperationStatus();
-        status.setPipelineId(pipeline.getPipelineId());
-        status.setPipelineName(pipeline.getName());
-        status.setSuccess(true);
-        return status;
-    }
-
-    private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
-        return graphs
-                .stream()
-                .filter(clazz::isInstance)
-                .map(clazz::cast)
-                .collect(Collectors.toList());
-    }
-
-    private String extractUniqueAdpaterId(String s) {
-        return s.substring(s.lastIndexOf("/") + 1);
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
deleted file mode 100644
index 2258cb9..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-public class PipelineMigrationHelpers {
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
index 3e50d91..948ae4b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
@@ -17,10 +17,8 @@
  */
 package org.apache.streampipes.manager.execution.pipeline.executor;
 
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.LifecycleEntity;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.PipelineExecutionOperation;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.ReconfigurationOperation;
+import org.apache.streampipes.manager.execution.pipeline.executor.steps.EntitiesLifecycleObject;
+import org.apache.streampipes.manager.execution.pipeline.executor.steps.PipelineExecutionStep;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -48,16 +46,16 @@ public class PipelineExecutor {
 
     private PipelineElementReconfigurationEntity reconfigurationEntity;
 
-    private final LifecycleEntity<InvocableStreamPipesEntity> graphs;
+    private final EntitiesLifecycleObject<InvocableStreamPipesEntity> graphs;
 
-    private final LifecycleEntity<SpDataStreamRelayContainer> relays;
+    private final EntitiesLifecycleObject<SpDataStreamRelayContainer> relays;
 
-    private final LifecycleEntity<SpDataSet> dataSets;
+    private final EntitiesLifecycleObject<SpDataSet> dataSets;
 
 
     private PipelineOperationStatus status;
 
-    private final LinkedList<PipelineExecutionOperation> operations = new LinkedList<>();
+    private final LinkedList<PipelineExecutionStep> operations = new LinkedList<>();
 
     public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor){
         this.pipeline = pipeline;
@@ -66,19 +64,19 @@ public class PipelineExecutor {
         this.monitor = monitor;
         this.status = StatusUtils.initPipelineOperationStatus(pipeline);
 
-        this.graphs = new LifecycleEntity<>();
-        this.relays = new LifecycleEntity<>();
-        this.dataSets = new LifecycleEntity<>();
+        this.graphs = new EntitiesLifecycleObject<>();
+        this.relays = new EntitiesLifecycleObject<>();
+        this.dataSets = new EntitiesLifecycleObject<>();
     }
 
     public PipelineOperationStatus execute(){
-        for(PipelineExecutionOperation pipelineExecutionOperation: this.operations){
-            PipelineOperationStatus operationStatus = pipelineExecutionOperation.executeOperation();
+        for(PipelineExecutionStep pipelineExecutionStep : this.operations){
+            PipelineOperationStatus operationStatus = pipelineExecutionStep.executeOperation();
             StatusUtils.checkSuccess(operationStatus);
-            pipelineExecutionOperation.setStatus(operationStatus);
+            pipelineExecutionStep.setStatus(operationStatus);
             StatusUtils.updateStatus(operationStatus, this.status);
             if(!operationStatus.isSuccess()){
-                rollback(pipelineExecutionOperation);
+                rollback(pipelineExecutionStep);
                 break;
             }
         }
@@ -86,11 +84,11 @@ public class PipelineExecutor {
         return this.status;
     }
 
-    private void rollback(PipelineExecutionOperation failedOperation){
+    private void rollback(PipelineExecutionStep failedOperation){
         PipelineOperationStatus rollbackStatus = StatusUtils.initPipelineOperationStatus(pipeline);
         for(int currentOperationIndex = this.operations.indexOf(failedOperation);
-            currentOperationIndex<=0; currentOperationIndex--){
-            PipelineExecutionOperation currentOperation = this.operations.get(currentOperationIndex);
+            currentOperationIndex>=0; currentOperationIndex--){
+            PipelineExecutionStep currentOperation = this.operations.get(currentOperationIndex);
             PipelineOperationStatus rollbackOperationStatus = currentOperation.rollbackOperation();
             StatusUtils.checkSuccess(rollbackOperationStatus);
             StatusUtils.updateStatus(rollbackOperationStatus, rollbackStatus);
@@ -99,18 +97,10 @@ public class PipelineExecutor {
         StatusUtils.updateStatus(rollbackStatus, this.status);
     }
 
-    public void addOperation(PipelineExecutionOperation operation){
+    public void addStep(PipelineExecutionStep operation){
         this.operations.add(operation);
     }
 
-    public boolean containsReconfigurationOperation(){
-        return this.operations.stream().anyMatch(operation -> operation instanceof ReconfigurationOperation);
-    }
-
-    public boolean containsMigrationOperation(){
-        return this.operations.stream().anyMatch(operation -> operation instanceof MigrationOperation);
-    }
-
 
     //Getter and Setter
 
@@ -178,15 +168,15 @@ public class PipelineExecutor {
         this.reconfigurationEntity = reconfigurationEntity;
     }
 
-    public LifecycleEntity<InvocableStreamPipesEntity> getGraphs() {
+    public EntitiesLifecycleObject<InvocableStreamPipesEntity> getGraphs() {
         return graphs;
     }
 
-    public LifecycleEntity<SpDataStreamRelayContainer> getRelays() {
+    public EntitiesLifecycleObject<SpDataStreamRelayContainer> getRelays() {
         return relays;
     }
 
-    public LifecycleEntity<SpDataSet> getDataSets() {
+    public EntitiesLifecycleObject<SpDataSet> getDataSets() {
         return dataSets;
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
index d423cf1..250cde7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
@@ -17,8 +17,7 @@
  */
 package org.apache.streampipes.manager.execution.pipeline.executor;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.*;
+import org.apache.streampipes.manager.execution.pipeline.executor.steps.*;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
 import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
@@ -27,10 +26,6 @@ public class PipelineExecutorBuilder {
 
     private PipelineExecutor pipelineExecutor;
 
-    private boolean reconfigurationParametersSet = false;
-
-    private boolean migrationParametersSet = false;
-
     public static PipelineExecutorBuilder getBuilder(){
         return new PipelineExecutorBuilder();
     }
@@ -46,7 +41,6 @@ public class PipelineExecutorBuilder {
                                                           PipelineElementMigrationEntity migrationEntity){
         pipelineExecutor.setSecondaryPipeline(pipelineBeforeMigration);
         pipelineExecutor.setMigrationEntity(migrationEntity);
-        this.migrationParametersSet = true;
         return this;
     }
 
@@ -54,76 +48,70 @@ public class PipelineExecutorBuilder {
                                                                 PipelineElementReconfigurationEntity reconfigurationEntity){
         pipelineExecutor.setSecondaryPipeline(reconfiguredPipeline);
         pipelineExecutor.setReconfigurationEntity(reconfigurationEntity);
-        this.reconfigurationParametersSet = true;
         return this;
     }
 
-    public PipelineExecutorBuilder addPrepareMigrationOperation(){
-        pipelineExecutor.addOperation(new PrepareMigrationOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addPrepareMigrationStep(){
+        pipelineExecutor.addStep(new PrepareMigrationStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addGetStateOperation(){
-        pipelineExecutor.addOperation(new GetStateOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addGetStateStep(){
+        pipelineExecutor.addStep(new GetStateStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStartRelaysOperation(){
-        pipelineExecutor.addOperation(new StartRelaysOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStartRelaysStep(){
+        pipelineExecutor.addStep(new StartRelaysStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStartGraphsAndAssociatedRelaysOperation(){
-        pipelineExecutor.addOperation(new StartGraphsAndAssociatedRelaysOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStartGraphsAndAssociatedRelaysStep(){
+        pipelineExecutor.addStep(new StartGraphsAndAssociatedRelaysStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStopGraphsAndAssociatedRelaysOperation(){
-        pipelineExecutor.addOperation(new StopGraphsAndAssociatedRelaysOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStopGraphsAndAssociatedRelaysStep(){
+        pipelineExecutor.addStep(new StopGraphsAndAssociatedRelaysStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStopRelaysOperation(){
-        pipelineExecutor.addOperation(new StopRelaysOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStopRelaysStep(){
+        pipelineExecutor.addStep(new StopRelaysStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStoreMigratedPipelineOperation(){
-        pipelineExecutor.addOperation(new StoreMigratedPipelineOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStoreMigratedPipelineStep(){
+        pipelineExecutor.addStep(new StoreMigratedPipelineStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addReconfigureElementOperation(){
-        pipelineExecutor.addOperation(new ReconfigureElementOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addReconfigureElementStep(){
+        pipelineExecutor.addStep(new ReconfigureElementStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addPreparePipelineStartOperation(){
-        pipelineExecutor.addOperation(new PrepareStartPipelineOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addPreparePipelineStartStep(){
+        pipelineExecutor.addStep(new PrepareStartPipelineStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStartPipelineOperation(){
-        pipelineExecutor.addOperation(new StartPipelineOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStartPipelineStep(){
+        pipelineExecutor.addStep(new StartPipelineStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStorePipelineOperation(){
-        pipelineExecutor.addOperation(new StorePipelineOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStorePipelineStep(){
+        pipelineExecutor.addStep(new StorePipelineStep(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStopPipelineOperation(){
-        pipelineExecutor.addOperation(new StopPipelineOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStopPipelineStep(){
+        pipelineExecutor.addStep(new StopPipelineStep(pipelineExecutor));
         return this;
     }
 
     public PipelineExecutor buildPipelineExecutor(){
-        //Is this check needed? Only relevant for core development not for users but gives a little more clarity at the
-        //cost of introducing some new boolean flags and marker interfaces
-        if((this.pipelineExecutor.containsMigrationOperation() && !this.migrationParametersSet)
-                || (this.pipelineExecutor.containsReconfigurationOperation() && !this.reconfigurationParametersSet))
-            throw new SpRuntimeException("PipelineExecutor can't be build since the required parameters have not been set");
-        return this.pipelineExecutor;    }
-
+        return this.pipelineExecutor;
+    }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
index 617ff01..e4b042b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
@@ -29,14 +29,14 @@ public class PipelineExecutorFactory {
         PipelineExecutorBuilder builder = PipelineExecutorBuilder.getBuilder()
                 .initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
                 .setMigrationParameters(pipelineBeforeMigration, migrationEntity)
-                .addPrepareMigrationOperation();
+                .addPrepareMigrationStep();
         if(migrationEntity.getTargetElement().isStateful())
-            builder.addGetStateOperation();
-        builder.addStartGraphsAndAssociatedRelaysOperation()
-                .addStopRelaysOperation()
-                .addStartRelaysOperation()
-                .addStopGraphsAndAssociatedRelaysOperation()
-                .addStoreMigratedPipelineOperation();
+            builder.addGetStateStep();
+        builder.addStartGraphsAndAssociatedRelaysStep()
+                .addStopRelaysStep()
+                .addStartRelaysStep()
+                .addStopGraphsAndAssociatedRelaysStep()
+                .addStoreMigratedPipelineStep();
         return builder.buildPipelineExecutor();
     }
 
@@ -44,23 +44,23 @@ public class PipelineExecutorFactory {
                                                                  boolean monitor, Pipeline reconfiguredPipeline,
                                                                  PipelineElementReconfigurationEntity reconfigurationEntity){
         return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
-                .setReconfigurationParameters(reconfiguredPipeline, reconfigurationEntity).addReconfigureElementOperation()
+                .setReconfigurationParameters(reconfiguredPipeline, reconfigurationEntity).addReconfigureElementStep()
                 .buildPipelineExecutor();
     }
 
     public static PipelineExecutor createInvocationExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
                                                             boolean monitor){
         return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
-                .addPreparePipelineStartOperation()
-                .addStartPipelineOperation()
-                .addStorePipelineOperation()
+                .addPreparePipelineStartStep()
+                .addStartPipelineStep()
+                .addStorePipelineStep()
                 .buildPipelineExecutor();
     }
 
     public static PipelineExecutor createDetachExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
                                                         boolean monitor){
         return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
-                .addStopPipelineOperation().buildPipelineExecutor();
+                .addStopPipelineStep().buildPipelineExecutor();
     }
 
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/MigrationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/MigrationOperation.java
deleted file mode 100644
index 22c21d1..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/MigrationOperation.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations.types;
-
-public interface MigrationOperation {
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/ReconfigurationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/ReconfigurationOperation.java
deleted file mode 100644
index 13c67a0..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/ReconfigurationOperation.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations.types;
-
-public interface ReconfigurationOperation {
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/EntitiesLifecycleObject.java
similarity index 94%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/EntitiesLifecycleObject.java
index d2c6d8c..5a2ca91 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/EntitiesLifecycleObject.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class LifecycleEntity<T> {
+public class EntitiesLifecycleObject<T> {
 
     private final List<T> entitiesToStart;
 
@@ -30,7 +30,7 @@ public class LifecycleEntity<T> {
 
     private final List<T> entitiesToDelete;
 
-    public LifecycleEntity(){
+    public EntitiesLifecycleObject(){
         entitiesToStart = new ArrayList<>();
         entitiesToStop = new ArrayList<>();
         entitiesToStore = new ArrayList<>();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/GetStateStep.java
similarity index 93%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/GetStateStep.java
index e11609b..1fc19f8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/GetStateStep.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -29,9 +28,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 
-public class GetStateOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class GetStateStep extends PipelineExecutionStep {
 
-    public GetStateOperation(PipelineExecutor pipelineExecutor) {
+    public GetStateStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PipelineExecutionStep.java
similarity index 90%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PipelineExecutionStep.java
index e822ec4..098de93 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PipelineExecutionStep.java
@@ -15,25 +15,26 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-public abstract class PipelineExecutionOperation {
+public abstract class PipelineExecutionStep {
 
     protected final PipelineExecutor pipelineExecutor;
 
     private PipelineOperationStatus status;
 
-    public PipelineExecutionOperation(PipelineExecutor pipelineExecutor){
+    public PipelineExecutionStep(PipelineExecutor pipelineExecutor){
         this.pipelineExecutor = pipelineExecutor;
         this.status = StatusUtils.initPipelineOperationStatus(this.pipelineExecutor.getPipeline());
     }
 
     public abstract PipelineOperationStatus executeOperation();
 
+    //TODO: Check if partial and full rollback can be unified to single method
     public abstract PipelineOperationStatus rollbackOperationPartially();
 
     public abstract PipelineOperationStatus rollbackOperationFully();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareMigrationStep.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareMigrationStep.java
index 4c9304d..4c45b15 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareMigrationStep.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
@@ -37,13 +36,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class PrepareMigrationOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class PrepareMigrationStep extends PipelineExecutionStep {
 
     private List<NamedStreamPipesEntity> predecessorsAfterMigration;
     private final List<NamedStreamPipesEntity> predecessorsBeforeMigration = new ArrayList<>();
     private Pipeline pipeline;
 
-    public PrepareMigrationOperation(PipelineExecutor pipelineExecutor) {
+    public PrepareMigrationStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareStartPipelineStep.java
similarity index 96%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareStartPipelineStep.java
index 264ac19..ca9dc66 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareStartPipelineStep.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
@@ -35,10 +35,10 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-public class PrepareStartPipelineOperation extends PipelineExecutionOperation {
+public class PrepareStartPipelineStep extends PipelineExecutionStep {
 
 
-    public PrepareStartPipelineOperation(PipelineExecutor pipelineExecutor) {
+    public PrepareStartPipelineStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/ReconfigureElementStep.java
similarity index 92%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/ReconfigureElementStep.java
index 2e66272..2dddf83 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/ReconfigureElementStep.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.http.ReconfigurationSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.ReconfigurationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.pipeline.Pipeline;
@@ -31,9 +30,9 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-public class ReconfigureElementOperation extends PipelineExecutionOperation implements ReconfigurationOperation {
+public class ReconfigureElementStep extends PipelineExecutionStep {
 
-    public ReconfigureElementOperation(PipelineExecutor pipelineExecutor) {
+    public ReconfigureElementStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartGraphsAndAssociatedRelaysStep.java
similarity index 91%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartGraphsAndAssociatedRelaysStep.java
index 72d3802..ae976e3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartGraphsAndAssociatedRelaysStep.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
 import org.apache.streampipes.manager.execution.pipeline.executor.*;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.*;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
@@ -28,9 +27,9 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import java.util.List;
 import java.util.Set;
 
-public class StartGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartGraphsAndAssociatedRelaysStep extends PipelineExecutionStep {
 
-    public StartGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor){
+    public StartGraphsAndAssociatedRelaysStep(PipelineExecutor pipelineExecutor){
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartPipelineStep.java
similarity index 96%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartPipelineStep.java
index 720d8c5..19d465d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartPipelineStep.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
@@ -33,9 +33,9 @@ import java.util.List;
 import java.util.Set;
 
 
-public class StartPipelineOperation extends PipelineExecutionOperation{
+public class StartPipelineStep extends PipelineExecutionStep {
 
-    public StartPipelineOperation(PipelineExecutor pipelineExecutor) {
+    public StartPipelineStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartRelaysStep.java
similarity index 90%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartRelaysStep.java
index 07c6480..07dc88f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartRelaysStep.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
@@ -29,10 +28,10 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import java.util.List;
 import java.util.Set;
 
-public class StartRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartRelaysStep extends PipelineExecutionStep {
 
 
-    public StartRelaysOperation(PipelineExecutor pipelineExecutor) {
+    public StartRelaysStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopGraphsAndAssociatedRelaysStep.java
similarity index 92%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopGraphsAndAssociatedRelaysStep.java
index 2e1a0d0..810f374 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopGraphsAndAssociatedRelaysStep.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
@@ -31,9 +30,9 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import java.util.List;
 import java.util.Set;
 
-public class StopGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopGraphsAndAssociatedRelaysStep extends PipelineExecutionStep {
 
-    public StopGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor) {
+    public StopGraphsAndAssociatedRelaysStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopPipelineStep.java
similarity index 84%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopPipelineStep.java
index 1f28247..473b039 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopPipelineStep.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
@@ -33,9 +33,9 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import java.util.List;
 import java.util.Set;
 
-public class StopPipelineOperation extends PipelineExecutionOperation {
+public class StopPipelineStep extends PipelineExecutionStep {
 
-    public StopPipelineOperation(PipelineExecutor pipelineExecutor) {
+    public StopPipelineStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
@@ -70,18 +70,16 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
 
         Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
 
+        List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+        List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
+        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(graphs, pipeline);
+
         List<InvocableStreamPipesEntity> graphsToRollBack =
-                PipelineElementUtils.filterPipelineElementsById(
-                        pipelineExecutor.getGraphs().getEntitiesToStop(),
-                        idsToRollback);
+                PipelineElementUtils.filterPipelineElementsById(graphs, idsToRollback);
         List<SpDataSet> dataSetsToRollBack =
-                DataSetUtils.filterDataSetsById(
-                        pipelineExecutor.getDataSets().getEntitiesToStart(),
-                        idsToRollback);
+                DataSetUtils.filterDataSetsById(dataSets, idsToRollback);
         List<SpDataStreamRelayContainer> relaysToRollBack =
-                RelayUtils.filterRelaysById(
-                        pipelineExecutor.getRelays().getEntitiesToStart(),
-                        idsToRollback);
+                RelayUtils.filterRelaysById(relays, idsToRollback);
 
         return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
                 graphsToRollBack,
@@ -91,7 +89,6 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        //TODO: Implement sth?
-        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
+        return rollbackOperationPartially();
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopRelaysStep.java
similarity index 90%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopRelaysStep.java
index dee1933..cc34c52 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopRelaysStep.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
 import org.apache.streampipes.manager.execution.pipeline.executor.*;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -29,9 +28,9 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import java.util.List;
 import java.util.Set;
 
-public class StopRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopRelaysStep extends PipelineExecutionStep {
 
-    public StopRelaysOperation(PipelineExecutor pipelineExecutor){
+    public StopRelaysStep(PipelineExecutor pipelineExecutor){
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StoreMigratedPipelineStep.java
similarity index 93%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StoreMigratedPipelineStep.java
index 895e38d..a300683 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StoreMigratedPipelineStep.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineUtils;
@@ -26,9 +26,9 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
 
-public class StoreMigratedPipelineOperation extends PipelineExecutionOperation{
+public class StoreMigratedPipelineStep extends PipelineExecutionStep {
 
-    public StoreMigratedPipelineOperation(PipelineExecutor pipelineExecutor) {
+    public StoreMigratedPipelineStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StorePipelineStep.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StorePipelineStep.java
index 960aec0..98def31 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StorePipelineStep.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
 
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -26,9 +26,9 @@ import org.apache.streampipes.model.message.PipelineStatusMessageType;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-public class StorePipelineOperation extends PipelineExecutionOperation{
+public class StorePipelineStep extends PipelineExecutionStep {
 
-    public StorePipelineOperation(PipelineExecutor pipelineExecutor) {
+    public StorePipelineStep(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
index ac0b210..7d8fae8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
@@ -237,7 +237,7 @@ public class RelayUtils {
     public static List<SpDataStreamRelayContainer> filterRelaysById(List<SpDataStreamRelayContainer> relays,
                                                                     Set<String> relayIds) {
         return relays.stream().
-                filter(relay -> relayIds.contains(relay.getRunningStreamRelayInstanceId()))
+                filter(relay -> relayIds.contains(relay.getRunningStreamRelayInstanceId() + " relay"))
                 .collect(Collectors.toList());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
index c4d8a4f..381bebf 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
@@ -50,7 +50,7 @@ public class StatusUtils {
     public static Set<String> extractUniqueSuccessfulIds(PipelineOperationStatus status) {
         return status.getElementStatus().stream()
                 .filter(PipelineElementStatus::isSuccess)
-                .map(PipelineElementStatus::getElementId)
+                .map(PipelineElementStatus::getRunningInstanceId)
                 .collect(Collectors.toSet());
     }
 
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
index eb34d39..46af116 100644
--- a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
@@ -170,26 +170,29 @@ export class MigratePipelineProcessorsComponent implements OnInit {
 
   modifyPipelineElementsDeployments(pipelineElements) {
     pipelineElements.forEach(p => {
-      let selectedTargetNodeId = p.deploymentTargetNodeId
+      const selectedTargetNodeId = p.deploymentTargetNodeId
 
       // Currently relay only for data processors
       if (p instanceof DataProcessorInvocation) {
         p.eventRelayStrategy = this.selectedRelayStrategyVal;
       }
 
-      if(selectedTargetNodeId != "default") {
-        let selectedNode = this.edgeNodes
-            .filter(node => node.nodeControllerId === selectedTargetNodeId)
+      if (selectedTargetNodeId !== 'default') {
+        const selectedNode = this.edgeNodes
+            .filter(node => node.nodeControllerId === selectedTargetNodeId);
 
         p.deploymentTargetNodeHostname = selectedNode
-            .map(node => node.hostname)[0]
+            .map(node => node.hostname)[0];
 
         p.deploymentTargetNodePort = selectedNode
-            .map(node => node.port)[0]
-      }
-      else {
-        p.deploymentTargetNodeHostname = null
-        p.deploymentTargetNodePort = null
+            .map(node => node.port)[0];
+
+        p.elementEndpointHostname = selectedNode
+            .map(node => node.hostname)[0];
+      } else {
+        p.deploymentTargetNodeHostname = null;
+        p.deploymentTargetNodePort = null;
+        p.elementEndpointHostname = null;
       }
     })
   }