You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/31 16:07:17 UTC

[incubator-streampipes] 02/03: [WIP] refactor pipeline executor

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 47c6a8006f23a1a9b3091dacad743dd832e5b205
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Dec 13 19:42:45 2021 +0100

    [WIP] refactor pipeline executor
---
 .../PipelineElementReconfigurationEntity.java      |  10 +
 .../PipelineElementReconfigurationExecutor.java    |  41 --
 .../execution/pipeline/PipelineExecutor.java       | 110 -----
 .../pipeline/PipelineMigrationExecutor.java        | 461 ---------------------
 .../pipeline/executor/PipelineExecutor.java        | 139 ++-----
 .../pipeline/executor/PipelineExecutorBuilder.java |  21 +-
 .../pipeline/executor/PipelineExecutorFactory.java |  13 +-
 .../executor/operations/GetStateOperation.java     |  18 +-
 .../executor/operations/LifecycleEntity.java       |  17 +
 .../operations/PipelineExecutionOperation.java     |   6 +-
 .../operations/PrepareMigrationOperation.java      |  73 +++-
 ...ion.java => PrepareStartPipelineOperation.java} |  27 +-
 .../operations/ReconfigureElementOperation.java    |  46 +-
 ...> StartGraphsAndAssociatedRelaysOperation.java} |  52 +--
 .../operations/StartPipelineOperation.java         |  88 ++--
 ...orsOperation.java => StartRelaysOperation.java} |  36 +-
 ...=> StopGraphsAndAssociatedRelaysOperation.java} |  38 +-
 .../executor/operations/StopPipelineOperation.java |  36 +-
 ...ssorOperation.java => StopRelaysOperation.java} |  32 +-
 .../operations/StoreMigratedPipelineOperation.java |  21 +-
 .../operations/StorePipelineOperation.java         |  16 +-
 .../pipeline/executor/utils/DataSetUtils.java      |  20 +
 .../executor/utils/PipelineElementUtils.java       |   7 +
 .../pipeline/executor/utils/RelayUtils.java        |   7 +
 .../pipeline/executor/utils/StatusUtils.java       |   2 +-
 .../migration/PipelineElementMigrationHandler.java |   3 -
 .../PipelineElementReconfigurationHandler.java     |  10 +-
 27 files changed, 384 insertions(+), 966 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java
index e6efba8..ba621a3 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.model.pipeline;
 
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 
 import java.util.ArrayList;
@@ -47,6 +48,15 @@ public class PipelineElementReconfigurationEntity {
         this.reconfiguredStaticProperties = reconfiguredStaticProperties;
     }
 
+    public PipelineElementReconfigurationEntity(InvocableStreamPipesEntity entity){
+        this.deploymentRunningInstanceId = entity.getDeploymentRunningInstanceId();
+        this.pipelineElementName = entity.getName();
+        this.deploymentTargetNodeId = entity.getDeploymentTargetNodeId();
+        this.deploymentTargetNodeHostname = entity.getDeploymentTargetNodeHostname();
+        this.deploymentTargetNodePort = entity.getDeploymentTargetNodePort();
+        this.reconfiguredStaticProperties = new ArrayList<>();
+    }
+
     public String getDeploymentRunningInstanceId() {
         return deploymentRunningInstanceId;
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineElementReconfigurationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineElementReconfigurationExecutor.java
deleted file mode 100644
index dc61206..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineElementReconfigurationExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.manager.execution.http.ReconfigurationSubmitter;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-public class PipelineElementReconfigurationExecutor {
-
-    private final Pipeline reconfiguredPipeline;
-    private final PipelineElementReconfigurationEntity reconfigurationEntity;
-
-    public PipelineElementReconfigurationExecutor(Pipeline reconfiguredPipeline,
-                                                  PipelineElementReconfigurationEntity reconfigurationEntity) {
-        this.reconfiguredPipeline = reconfiguredPipeline;
-        this.reconfigurationEntity = reconfigurationEntity;
-    }
-
-    public PipelineOperationStatus reconfigurePipelineElement() {
-        return new ReconfigurationSubmitter(reconfiguredPipeline.getPipelineId(), reconfiguredPipeline.getName(),
-                reconfigurationEntity).reconfigure();
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
deleted file mode 100644
index 17aa1ce..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor extends AbstractPipelineExecutor {
-
-    public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
-        super(pipeline, visualize, storeStatus, monitor);
-    }
-
-    public PipelineOperationStatus startPipeline() {
-
-        pipeline.getSepas().forEach(this::updateKafkaGroupIds);
-        pipeline.getActions().forEach(this::updateKafkaGroupIds);
-
-        List<DataProcessorInvocation> sepas = pipeline.getSepas();
-        List<DataSinkInvocation> secs = pipeline.getActions();
-
-        List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
-                SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
-        for (SpDataSet ds : dataSets) {
-          ds.setCorrespondingPipeline(pipeline.getPipelineId());
-        }
-
-        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-        graphs.addAll(sepas);
-        graphs.addAll(secs);
-
-        List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
-
-        graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
-        List<SpDataStreamRelayContainer> relays = generateRelays(decryptedGraphs);
-
-        PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
-                decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
-
-        if (status.isSuccess()) {
-            storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-            storeDataStreamRelayContainer(relays);
-
-            PipelineStatusManager.addPipelineStatus(
-                  pipeline.getPipelineId(),
-                  new PipelineStatusMessage(pipeline.getPipelineId(),
-                          System.currentTimeMillis(),
-                          PipelineStatusMessageType.PIPELINE_STARTED.title(),
-                          PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
-          if (storeStatus) setPipelineStarted(pipeline);
-        }
-        return status;
-    }
-
-    public PipelineOperationStatus stopPipeline() {
-        List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
-        List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
-        List<SpDataStreamRelayContainer> relays = generateRelays(graphs);
-
-        PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), graphs,
-                dataSets, relays).detachPipelineElementsAndRelays();
-
-        if (status.isSuccess()) {
-            if (visualize) deleteVisualization(pipeline.getPipelineId());
-            if (storeStatus) setPipelineStopped(pipeline);
-
-            deleteDataStreamRelayContainer(relays);
-
-            PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-                  new PipelineStatusMessage(pipeline.getPipelineId(),
-                          System.currentTimeMillis(),
-                          PipelineStatusMessageType.PIPELINE_STOPPED.title(),
-                          PipelineStatusMessageType.PIPELINE_STOPPED.description()));
-        }
-        return status;
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
deleted file mode 100644
index 7115d40..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
-
-    /**
-     * Old pipeline before migration
-     */
-    private final Pipeline pipelineBeforeMigration;
-
-    /**
-     * Pipeline element to be migrated. Contains pair of source and target element description
-     */
-    private final PipelineElementMigrationEntity migrationEntity;
-
-    /**
-     * Predecessors of the pipeline element to be migrated in the migration pipeline
-     */
-    private final List<NamedStreamPipesEntity> predecessorsAfterMigration;
-
-    /**
-     * Predecessors of the pipeline element to be migrated in the old pipeline before migration
-     */
-    private final List<NamedStreamPipesEntity> predecessorsBeforeMigration;
-
-    /**
-     * Collection of relays that were created in the migration process needing to be stored
-     */
-    private final List<SpDataStreamRelayContainer> relaysToBePersisted;
-
-    /**
-     * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
-     */
-    private final List<SpDataStreamRelayContainer> relaysToBeDeleted;
-
-
-    public PipelineMigrationExecutor(Pipeline pipeline, Pipeline pipelineBeforeMigration,
-                                     PipelineElementMigrationEntity migrationEntity,
-                                     boolean visualize, boolean storeStatus, boolean monitor) {
-        super(pipeline, visualize, storeStatus, monitor);
-        this.pipelineBeforeMigration = pipelineBeforeMigration;
-        this.migrationEntity = migrationEntity;
-        this.predecessorsAfterMigration = new ArrayList<>();
-        this.predecessorsBeforeMigration = new ArrayList<>();
-        this.relaysToBePersisted = new ArrayList<>();
-        this.relaysToBeDeleted = new ArrayList<>();
-    }
-
-    public PipelineOperationStatus migratePipelineElement() {
-        if(this.migrationEntity.getTargetElement().isStateful())
-            return migrateStatefulPipelineElement();
-        return migrateStatelessPipelineElement();
-    }
-
-    public PipelineOperationStatus migrateStatelessPipelineElement() {
-
-        PipelineOperationStatus status = initPipelineOperationStatus();
-
-        // 1. start new element
-        // 2. stop relay to origin element
-        // 3. start relay to new element
-        // 4. stop origin element
-        // 5. stop origin relay
-
-        prepareMigration();
-        //TODO: Remove logging after evaluation
-        EvaluationLogger logger = EvaluationLogger.getInstance();
-
-        // Start target pipeline elements and relays on new target node
-        long before_start_target = System.nanoTime();
-        PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
-        if(!statusStartTarget.isSuccess()){
-            //Target PE could not be started; nothing to roll back
-            return status;
-        }
-        long start_target_duration = System.nanoTime() - before_start_target;
-        logger.logMQTT("Migration", "start target element","",start_target_duration,start_target_duration/1000000000.0);
-
-        // Stop relays from origin predecessor
-        long downtime_beginning = System.nanoTime();
-        PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
-        if(!statusStopRelays.isSuccess()){
-            rollbackToPreMigrationStepOne(statusStopRelays, status);
-            return status;
-        }
-        long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
-        logger.logMQTT("Migration", "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0);
-
-        // Start relays to target after migration
-        long before_start_relay_target = System.nanoTime();
-        PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
-        if(!statusStartRelays.isSuccess()){
-            rollbackToPreMigrationStepTwo(statusStartRelays, status);
-            return status;
-        }
-        long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
-        logger.logMQTT("Migration", "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0);
-
-        long downtime = System.nanoTime() - downtime_beginning;
-        logger.logMQTT("Migration", "downtime", "", downtime, downtime/1000000000.0);
-
-        //Stop origin and associated relay
-        long before_stop_origin = System.nanoTime();
-        PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
-        if(!statusStopOrigin.isSuccess()){
-            rollbackToPreMigrationStepThree(status);
-            return status;
-        }
-        long stop_origin_duration = System.nanoTime() - before_stop_origin;
-        logger.logMQTT("Migration", "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0);
-
-        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-        graphs.addAll(pipeline.getActions());
-        graphs.addAll(pipeline.getSepas());
-
-        List<SpDataSet> dataSets = findDataSets();
-
-        // store new pipeline and relays
-        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-        deleteDataStreamRelayContainer(relaysToBeDeleted);
-        storeDataStreamRelayContainer(relaysToBePersisted);
-
-        // set global status
-        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
-        return status;
-    }
-
-    public PipelineOperationStatus migrateStatefulPipelineElement() {
-        //TODO: Assess how this 'dirty' implementation/procedure can be improved
-
-        PipelineOperationStatus status = initPipelineOperationStatus();
-
-        // 1. start new element
-        // 2. stop relay to origin element
-        // 3. start relay to new element
-        // 4. stop origin element
-        // 5. stop origin relay
-
-        prepareMigration();
-
-        //Try to get state of origin element (success not checked)
-        PipelineElementStatus statusGettingState = getState(this.migrationEntity.getSourceElement());
-        if(!statusGettingState.isSuccess()){
-            //status.addPipelineElementStatus(statusGettingState);
-            status.setSuccess(false);
-            return status;
-        }
-        String currentState = statusGettingState.getOptionalMessage();
-        // Start target pipeline elements and relays on new target node
-        PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
-        if(!statusStartTarget.isSuccess()){
-            //Target PE could not be started; nothing to roll back
-            return status;
-        }
-
-        //Set state of the newly invoked Pipeline Element (success not checked)
-         PipelineElementStatus statusSettingState = setState(this.migrationEntity.getTargetElement(), currentState);
-
-        if(!statusSettingState.isSuccess()){
-            //status.addPipelineElementStatus(statusSettingState);
-            status.setSuccess(false);
-            rollbackToPreMigrationStepOne(new PipelineOperationStatus(), status);
-            return status;
-        }
-
-        // Stop relays from origin predecessor
-        PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
-        if(!statusStopRelays.isSuccess()){
-            rollbackToPreMigrationStepOne(statusStopRelays, status);
-            return status;
-        }
-
-        // Start relays to target after migration
-        PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
-        if(!statusStartRelays.isSuccess()){
-            rollbackToPreMigrationStepTwo(statusStartRelays, status);
-            return status;
-        }
-
-        //Stop origin and associated relay
-        PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
-        if(!statusStopOrigin.isSuccess()){
-            rollbackToPreMigrationStepThree(status);
-            return status;
-        }
-
-        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-        graphs.addAll(pipeline.getActions());
-        graphs.addAll(pipeline.getSepas());
-
-        List<SpDataSet> dataSets = findDataSets();
-
-        // store new pipeline and relays
-        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-        deleteDataStreamRelayContainer(relaysToBeDeleted);
-        storeDataStreamRelayContainer(relaysToBePersisted);
-
-        // set global status
-        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
-        return status;
-    }
-
-    private void prepareMigration() {
-        //Purge existing relays
-        purgeExistingRelays();
-
-        //Generate new relays
-        PipelineGraph pipelineGraphAfterMigration = new PipelineGraphBuilder(pipeline).buildGraph();
-        buildGraphWithRelays(pipelineGraphAfterMigration);
-
-        //Get predecessors
-        PipelineGraph pipelineGraphBeforeMigration = new PipelineGraphBuilder(pipelineBeforeMigration).buildGraph();
-        findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
-
-        //Find counterpart for predecessors in currentPipeline
-        findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
-    }
-
-    private void rollbackToPreMigrationStepOne(PipelineOperationStatus statusStopRelays,
-                                               PipelineOperationStatus status) {
-        // Stop target pipeline element and relays on new target node
-        PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
-
-        // Restart relays before migration attempt
-        // extract unique running instance ids of original relays
-        Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStopRelays);
-        PipelineOperationStatus statusRelaysRollback = rollbackToOriginRelaysByInvoke(relayIdsToRollback);
-
-        // Add status to global migration status
-        updateMigrationStatus(statusTargetRollback, status);
-        updateMigrationStatus(statusRelaysRollback, status);
-    }
-
-    private void rollbackToPreMigrationStepTwo(PipelineOperationStatus statusStartRelays,
-                                               PipelineOperationStatus status) {
-        //Rollback target PE, stopped relays and all successfully started relays
-        PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
-
-        // Restart relays to origin from predecessors before migration
-        PipelineOperationStatus statusRelaysInvokeRollback = startRelays(findRelays(predecessorsBeforeMigration,
-                migrationEntity.getSourceElement()));
-
-        // Stop relays that were started due to migration attempt
-        // extract unique running instance ids of original relays
-        Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStartRelays);
-        PipelineOperationStatus statusRelaysDetachRollback = rollbackTargetRelaysByDetach(relayIdsToRollback);
-
-        // Add status to global migration status
-        updateMigrationStatus(statusTargetRollback, status);
-        updateMigrationStatus(statusRelaysInvokeRollback, status);
-        updateMigrationStatus(statusRelaysDetachRollback, status);
-    }
-
-    private void rollbackToPreMigrationStepThree(PipelineOperationStatus status) {
-        //Rollback everything
-        PipelineOperationStatus statusStopRelays = stopRelays(findRelays(predecessorsAfterMigration,
-                migrationEntity.getTargetElement()));
-
-        PipelineOperationStatus statusStartRelays = startRelays(findRelays(predecessorsBeforeMigration,
-                migrationEntity.getSourceElement()));
-
-        List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
-        List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
-
-        PipelineOperationStatus statusStopTargetAndRelays = stopPipelineElementsAndRelays(graphs, relays);
-
-        // Add status to global migration status
-        updateMigrationStatus(statusStopRelays, status);
-        updateMigrationStatus(statusStartRelays, status);
-        updateMigrationStatus(statusStopTargetAndRelays, status);
-    }
-
-
-    private PipelineOperationStatus rollbackToOriginRelaysByInvoke(Set<String> relayIdsToRollback) {
-        List<SpDataStreamRelayContainer> rollbackRelays = findRelaysAndFilterById(relayIdsToRollback,
-                predecessorsBeforeMigration, migrationEntity.getSourceElement());
-
-        return startRelays(rollbackRelays);
-    }
-
-    private PipelineOperationStatus rollbackTargetRelaysByDetach(Set<String> relayIdsToRollback) {
-        List<SpDataStreamRelayContainer> rollbackRelays = findRelaysAndFilterById(relayIdsToRollback,
-                predecessorsAfterMigration, migrationEntity.getTargetElement());
-
-        return stopRelays(rollbackRelays);
-    }
-
-    private PipelineOperationStatus rollbackTargetPipelineElementsAndRelays() {
-        List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
-        List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
-
-        return new GraphSubmitter(pipeline.getPipelineId(),
-                pipeline.getName(), graphs, new ArrayList<>(), relays).detachPipelineElementsAndRelays();
-    }
-
-    private PipelineOperationStatus startRelaysFromPredecessorsAfterMigration(PipelineOperationStatus status) {
-        List<SpDataStreamRelayContainer> relays = findRelays(predecessorsAfterMigration,
-                migrationEntity.getTargetElement());
-
-        updateRelaysToBePersisted(relays);
-
-        PipelineOperationStatus statusStartRelays = startRelays(relays);
-        updateMigrationStatus(statusStartRelays, status);
-
-        return statusStartRelays;
-    }
-
-    private PipelineOperationStatus stopRelaysFromPredecessorsBeforeMigration(PipelineOperationStatus status) {
-        List<SpDataStreamRelayContainer> relays = findRelaysWhenStopping(predecessorsBeforeMigration,
-                migrationEntity.getSourceElement());
-
-        updateRelaysToBeDeleted(relays);
-
-        PipelineOperationStatus statusStopRelays = stopRelays(relays);
-        updateMigrationStatus(statusStopRelays, status);
-
-        return statusStopRelays;
-    }
-
-    private PipelineOperationStatus startTargetPipelineElementsAndRelays(PipelineOperationStatus status) {
-        List<InvocableStreamPipesEntity> decryptedGraphs =
-                decryptSecrets(Collections.singletonList(migrationEntity.getTargetElement()));
-        List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(decryptedGraphs);
-
-        updateRelaysToBePersisted(relays);
-
-        PipelineOperationStatus statusStartTarget = startPipelineElementsAndRelays(decryptedGraphs, relays);
-        updateMigrationStatus(statusStartTarget, status);
-
-        return statusStartTarget;
-    }
-
-    private PipelineOperationStatus stopOriginPipelineElementsAndRelays(PipelineOperationStatus status) {
-        List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getSourceElement());
-        List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
-
-        updateRelaysToBeDeleted(relays);
-
-        PipelineOperationStatus statusStopOrigin = stopPipelineElementsAndRelays(graphs, relays);
-        updateMigrationStatus(statusStopOrigin, status);
-
-        return statusStopOrigin;
-    }
-
-    // Helpers
-
-    private void updateRelaysToBePersisted(List<SpDataStreamRelayContainer> relays) {
-        relays.stream()
-                .filter(r -> r.getOutputStreamRelays().size() > 0)
-                .forEach(relaysToBePersisted::add);
-    }
-
-    private void updateRelaysToBeDeleted(List<SpDataStreamRelayContainer> relays) {
-        relays.stream()
-                .filter(r -> r.getOutputStreamRelays().size() > 0)
-                .forEach(relaysToBeDeleted::add);
-    }
-
-    private List<SpDataStreamRelayContainer> findRelaysAndFilterById(Set<String> relayIdsToRollback,
-                                                                     List<NamedStreamPipesEntity> predecessor,
-                                                                     InvocableStreamPipesEntity target) {
-        return findRelays(predecessor, target).stream()
-                .filter(relay -> relayIdsToRollback.contains(relay.getRunningStreamRelayInstanceId()))
-                .collect(Collectors.toList());
-    }
-
-    private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
-        predecessorsAfterMigration.forEach(migrationPredecessor ->
-                predecessorsBeforeMigration.add(findMatching(migrationPredecessor, pipelineGraphBeforeMigration)));
-    }
-
-    private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
-        // get unique list of predecessors
-        List<NamedStreamPipesEntity> predecessors = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
-                .map(stream -> getPredecessors(stream, migrationEntity.getTargetElement(),
-                        pipelineGraphAfterMigration, new ArrayList<>()))
-                .flatMap(List::stream)
-                .collect(Collectors.toList())
-                .stream()
-                .distinct()
-                .collect(Collectors.toList());
-
-        predecessorsAfterMigration.addAll(predecessors);
-    }
-
-    private List<SpDataSet> findDataSets() {
-        return pipeline.getStreams().stream()
-                .filter(s -> s instanceof SpDataSet)
-                .map(s -> new SpDataSet((SpDataSet) s))
-                .collect(Collectors.toList());
-    }
-
-    private void buildGraphWithRelays(PipelineGraph pipelineGraphAfterMigration) {
-        new InvocationGraphBuilder(pipelineGraphAfterMigration, pipeline.getPipelineId()).buildGraphs();
-    }
-
-    private void purgeExistingRelays() {
-        pipeline.getSepas().forEach(s -> s.setOutputStreamRelays(new ArrayList<>()));
-    }
-
-    private void updateMigrationStatus(PipelineOperationStatus partialStatus, PipelineOperationStatus status) {
-        // Add status to global migration status
-        partialStatus.getElementStatus().forEach(status::addPipelineElementStatus);
-    }
-
-    private List<SpDataStreamRelayContainer> extractRelaysFromDataProcessor(List<InvocableStreamPipesEntity> graphs) {
-        return graphs.stream()
-                .map(DataProcessorInvocation.class::cast)
-//                .map(p -> {
-//                    String modifiedId = p.getDeploymentRunningInstanceId() + "-" + p.getDeploymentTargetNodeId();
-//                    p.setDeploymentRunningInstanceId(modifiedId);
-//                    return p;
-//                })
-                .map(SpDataStreamRelayContainer::new)
-                .collect(Collectors.toList());
-    }
-
-    private Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
-        return status.getElementStatus().stream()
-                .filter(PipelineElementStatus::isSuccess)
-                .map(PipelineElementStatus::getElementId)
-                .collect(Collectors.toSet());
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
index 0bd26ec..3e50d91 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
@@ -24,16 +24,13 @@ import org.apache.streampipes.manager.execution.pipeline.executor.operations.typ
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
 import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.List;
 
 public class PipelineExecutor {
 
@@ -42,9 +39,6 @@ public class PipelineExecutor {
     private boolean storeStatus;
     private boolean monitor;
 
-    /**
-     * Old pipeline before migration
-     */
     private Pipeline secondaryPipeline;
 
     /**
@@ -52,58 +46,29 @@ public class PipelineExecutor {
      */
     private PipelineElementMigrationEntity migrationEntity;
 
-    /**
-     * Predecessors of the pipeline element to be migrated in the migration pipeline
-     */
-    private List<NamedStreamPipesEntity> predecessorsAfterMigration;
-
-    /**
-     * Predecessors of the pipeline element to be migrated in the old pipeline before migration
-     */
-    private List<NamedStreamPipesEntity> predecessorsBeforeMigration;
-
-    /**
-     * Collection of relays that were created in the migration process needing to be stored
-     */
-    private List<SpDataStreamRelayContainer> relaysToBePersisted;
-
-    /**
-     * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
-     */
-    private List<SpDataStreamRelayContainer> relaysToBeDeleted;
-
-    private PipelineOperationStatus status;
-
     private PipelineElementReconfigurationEntity reconfigurationEntity;
 
-    private final LinkedList<PipelineExecutionOperation> operations = new LinkedList<>();
-
-    private List<SpDataSet> dataSets;
+    private final LifecycleEntity<InvocableStreamPipesEntity> graphs;
 
-    private List<SpDataStreamRelayContainer> relays;
+    private final LifecycleEntity<SpDataStreamRelayContainer> relays;
 
-    private List<InvocableStreamPipesEntity> graphs;
+    private final LifecycleEntity<SpDataSet> dataSets;
 
-    private final LifecycleEntity<InvocableStreamPipesEntity> grafs;
 
-    private final LifecycleEntity<SpDataStreamRelayContainer> relais;
+    private PipelineOperationStatus status;
 
-    private final LifecycleEntity<SpDataSet> dataZeds;
+    private final LinkedList<PipelineExecutionOperation> operations = new LinkedList<>();
 
     public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor){
         this.pipeline = pipeline;
         this.visualize = visualize;
         this.storeStatus = storeStatus;
         this.monitor = monitor;
-        this.predecessorsAfterMigration = new ArrayList<>();
-        this.predecessorsBeforeMigration = new ArrayList<>();
-        this.relaysToBePersisted = new ArrayList<>();
-        this.relaysToBeDeleted = new ArrayList<>();
         this.status = StatusUtils.initPipelineOperationStatus(pipeline);
 
-        this.grafs = new LifecycleEntity<>();
-        this.relais = new LifecycleEntity<>();
-        this.dataZeds = new LifecycleEntity<>();
+        this.graphs = new LifecycleEntity<>();
+        this.relays = new LifecycleEntity<>();
+        this.dataSets = new LifecycleEntity<>();
     }
 
     public PipelineOperationStatus execute(){
@@ -134,12 +99,21 @@ public class PipelineExecutor {
         StatusUtils.updateStatus(rollbackStatus, this.status);
     }
 
-    //Getter and Setter
-
     public void addOperation(PipelineExecutionOperation operation){
         this.operations.add(operation);
     }
 
+    public boolean containsReconfigurationOperation(){
+        return this.operations.stream().anyMatch(operation -> operation instanceof ReconfigurationOperation);
+    }
+
+    public boolean containsMigrationOperation(){
+        return this.operations.stream().anyMatch(operation -> operation instanceof MigrationOperation);
+    }
+
+
+    //Getter and Setter
+
     public Pipeline getPipeline() {
         return pipeline;
     }
@@ -188,38 +162,6 @@ public class PipelineExecutor {
         this.migrationEntity = migrationEntity;
     }
 
-    public List<NamedStreamPipesEntity> getPredecessorsAfterMigration() {
-        return predecessorsAfterMigration;
-    }
-
-    public void setPredecessorsAfterMigration(List<NamedStreamPipesEntity> predecessorsAfterMigration) {
-        this.predecessorsAfterMigration = predecessorsAfterMigration;
-    }
-
-    public List<NamedStreamPipesEntity> getPredecessorsBeforeMigration() {
-        return predecessorsBeforeMigration;
-    }
-
-    public void setPredecessorsBeforeMigration(List<NamedStreamPipesEntity> predecessorsBeforeMigration) {
-        this.predecessorsBeforeMigration = predecessorsBeforeMigration;
-    }
-
-    public List<SpDataStreamRelayContainer> getRelaysToBePersisted() {
-        return relaysToBePersisted;
-    }
-
-    public void setRelaysToBePersisted(List<SpDataStreamRelayContainer> relaysToBePersisted) {
-        this.relaysToBePersisted = relaysToBePersisted;
-    }
-
-    public List<SpDataStreamRelayContainer> getRelaysToBeDeleted() {
-        return relaysToBeDeleted;
-    }
-
-    public void setRelaysToBeDeleted(List<SpDataStreamRelayContainer> relaysToBeDeleted) {
-        this.relaysToBeDeleted = relaysToBeDeleted;
-    }
-
     public PipelineOperationStatus getStatus() {
         return status;
     }
@@ -236,48 +178,15 @@ public class PipelineExecutor {
         this.reconfigurationEntity = reconfigurationEntity;
     }
 
-    public List<SpDataSet> getDataSets() {
-        return dataSets;
-    }
-
-    public void setDataSets(List<SpDataSet> dataSets) {
-        this.dataSets = dataSets;
-    }
-
-    public List<SpDataStreamRelayContainer> getRelays() {
-        return relays;
-    }
-
-    public void setRelays(List<SpDataStreamRelayContainer> relays) {
-        this.relays = relays;
-    }
-
-    public List<InvocableStreamPipesEntity> getGraphs() {
+    public LifecycleEntity<InvocableStreamPipesEntity> getGraphs() {
         return graphs;
     }
 
-    public void setGraphs(List<InvocableStreamPipesEntity> graphs) {
-        this.graphs = graphs;
-    }
-
-
-    public LifecycleEntity<InvocableStreamPipesEntity> getGrafs() {
-        return grafs;
-    }
-
-    public LifecycleEntity<SpDataStreamRelayContainer> getRelais() {
-        return relais;
-    }
-
-    public LifecycleEntity<SpDataSet> getDataZeds() {
-        return dataZeds;
-    }
-
-    public boolean containsReconfigurationOperation(){
-        return this.operations.stream().anyMatch(operation -> operation instanceof ReconfigurationOperation);
+    public LifecycleEntity<SpDataStreamRelayContainer> getRelays() {
+        return relays;
     }
 
-    public boolean containsMigrationOperation(){
-        return this.operations.stream().anyMatch(operation -> operation instanceof MigrationOperation);
+    public LifecycleEntity<SpDataSet> getDataSets() {
+        return dataSets;
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
index 52fc870..d423cf1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
@@ -68,23 +68,23 @@ public class PipelineExecutorBuilder {
         return this;
     }
 
-    public PipelineExecutorBuilder addStartRelaysFromPredecessorsOperation(){
-        pipelineExecutor.addOperation(new StartRelaysFromPredecessorsOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStartRelaysOperation(){
+        pipelineExecutor.addOperation(new StartRelaysOperation(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStartTargetPipelineElementsOperation(){
-        pipelineExecutor.addOperation(new StartTargetPipelineElementsOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStartGraphsAndAssociatedRelaysOperation(){
+        pipelineExecutor.addOperation(new StartGraphsAndAssociatedRelaysOperation(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStopOriginPipelineElementAndRelaysOperation(){
-        pipelineExecutor.addOperation(new StopOriginPipelineElementAndRelaysOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStopGraphsAndAssociatedRelaysOperation(){
+        pipelineExecutor.addOperation(new StopGraphsAndAssociatedRelaysOperation(pipelineExecutor));
         return this;
     }
 
-    public PipelineExecutorBuilder addStopRelaysFromPredecessorOperation(){
-        pipelineExecutor.addOperation(new StopRelaysFromPredecessorOperation(pipelineExecutor));
+    public PipelineExecutorBuilder addStopRelaysOperation(){
+        pipelineExecutor.addOperation(new StopRelaysOperation(pipelineExecutor));
         return this;
     }
 
@@ -98,6 +98,11 @@ public class PipelineExecutorBuilder {
         return this;
     }
 
+    public PipelineExecutorBuilder addPreparePipelineStartOperation(){
+        pipelineExecutor.addOperation(new PrepareStartPipelineOperation(pipelineExecutor));
+        return this;
+    }
+
     public PipelineExecutorBuilder addStartPipelineOperation(){
         pipelineExecutor.addOperation(new StartPipelineOperation(pipelineExecutor));
         return this;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
index 413edd3..617ff01 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
@@ -32,10 +32,10 @@ public class PipelineExecutorFactory {
                 .addPrepareMigrationOperation();
         if(migrationEntity.getTargetElement().isStateful())
             builder.addGetStateOperation();
-        builder.addStartTargetPipelineElementsOperation()
-                .addStopRelaysFromPredecessorOperation()
-                .addStartRelaysFromPredecessorsOperation()
-                .addStopOriginPipelineElementAndRelaysOperation()
+        builder.addStartGraphsAndAssociatedRelaysOperation()
+                .addStopRelaysOperation()
+                .addStartRelaysOperation()
+                .addStopGraphsAndAssociatedRelaysOperation()
                 .addStoreMigratedPipelineOperation();
         return builder.buildPipelineExecutor();
     }
@@ -51,7 +51,10 @@ public class PipelineExecutorFactory {
     public static PipelineExecutor createInvocationExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
                                                             boolean monitor){
         return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
-                .addStartPipelineOperation().addStorePipelineOperation().buildPipelineExecutor();
+                .addPreparePipelineStartOperation()
+                .addStartPipelineOperation()
+                .addStorePipelineOperation()
+                .buildPipelineExecutor();
     }
 
     public static PipelineExecutor createDetachExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
index c0fe26c..e11609b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
@@ -39,20 +39,20 @@ public class GetStateOperation extends PipelineExecutionOperation implements Mig
     public PipelineOperationStatus executeOperation() {
         long nanoTimeBeforeOperation = System.nanoTime();
         PipelineElementStatus statusGettingState = CommunicationUtils.getState(
-                associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
-                associatedPipelineExecutor.getPipeline());
+                pipelineExecutor.getMigrationEntity().getSourceElement(),
+                pipelineExecutor.getPipeline());
         if(statusGettingState.isSuccess()) {
-            associatedPipelineExecutor.getMigrationEntity().getTargetElement()
+            pipelineExecutor.getMigrationEntity().getTargetElement()
                     .setState(statusGettingState.getOptionalMessage());
             statusGettingState.setOptionalMessage("Successfully retrieved state");
         }
-        PipelineOperationStatus getStateStatus = StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        PipelineOperationStatus getStateStatus = StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
         getStateStatus.addPipelineElementStatus(statusGettingState);
         StatusUtils.checkSuccess(getStateStatus);
         long duration = System.nanoTime() - nanoTimeBeforeOperation;
         EvaluationLogger.getInstance().logMQTT("Migration", "get state", "", duration, duration/1000000000.0);
         try {
-            int stateSize = getSizeInBytes(associatedPipelineExecutor.getMigrationEntity().getTargetElement().getState());
+            int stateSize = getSizeInBytes(pipelineExecutor.getMigrationEntity().getTargetElement().getState());
             EvaluationLogger.getInstance().logMQTT("Migration", "state size", stateSize/1024.0, stateSize/1048576.0);
         } catch (IOException e) {
             e.printStackTrace();
@@ -73,13 +73,13 @@ public class GetStateOperation extends PipelineExecutionOperation implements Mig
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        associatedPipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        pipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        associatedPipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        pipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
index 09777ba..d2c6d8c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 package org.apache.streampipes.manager.execution.pipeline.executor.operations;
 
 import java.util.ArrayList;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
index 1d6d76a..e822ec4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
@@ -23,13 +23,13 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 public abstract class PipelineExecutionOperation {
 
-    protected final PipelineExecutor associatedPipelineExecutor;
+    protected final PipelineExecutor pipelineExecutor;
 
     private PipelineOperationStatus status;
 
     public PipelineExecutionOperation(PipelineExecutor pipelineExecutor){
-        this.associatedPipelineExecutor = pipelineExecutor;
-        this.status = StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        this.pipelineExecutor = pipelineExecutor;
+        this.status = StatusUtils.initPipelineOperationStatus(this.pipelineExecutor.getPipeline());
     }
 
     public abstract PipelineOperationStatus executeOperation();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
index 9b668b7..4c9304d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
@@ -22,25 +22,34 @@ import org.apache.streampipes.manager.data.PipelineGraphBuilder;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.pipeline.Pipeline;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class PrepareMigrationOperation extends PipelineExecutionOperation implements MigrationOperation {
 
+    private List<NamedStreamPipesEntity> predecessorsAfterMigration;
+    private final List<NamedStreamPipesEntity> predecessorsBeforeMigration = new ArrayList<>();
+    private Pipeline pipeline;
+
     public PrepareMigrationOperation(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+        this.pipeline = pipelineExecutor.getPipeline();
         //Purge existing relays
         PipelineUtils.purgeExistingRelays(pipeline);
 
@@ -51,42 +60,78 @@ public class PrepareMigrationOperation extends PipelineExecutionOperation implem
 
         //Get predecessors
         PipelineGraph pipelineGraphBeforeMigration =
-                new PipelineGraphBuilder(associatedPipelineExecutor.getSecondaryPipeline()).buildGraph();
-        findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
+                new PipelineGraphBuilder(pipelineExecutor.getSecondaryPipeline()).buildGraph();
+        findPredecessorsAfterMigration(pipelineGraphAfterMigration);
 
         //Find counterpart for predecessors in currentPipeline
-        findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
+        findPredecessorsBeforeMigration(pipelineGraphBeforeMigration);
+
+        initializeGraphs();
+        initializeRelays();
+
         return StatusUtils.initPipelineOperationStatus(pipeline);
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        return null;
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        return null;
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
-    private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
+    private void findPredecessorsAfterMigration(PipelineGraph pipelineGraphAfterMigration) {
         // get unique list of predecessors
-        List<NamedStreamPipesEntity> predecessors = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
+        this.predecessorsAfterMigration = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
                 .map(stream -> PipelineUtils.getPredecessors(stream,
-                        associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
+                        pipelineExecutor.getMigrationEntity().getTargetElement(),
                         pipelineGraphAfterMigration, new ArrayList<>()))
                 .flatMap(List::stream)
                 .collect(Collectors.toList())
                 .stream()
                 .distinct()
                 .collect(Collectors.toList());
+    }
 
-        associatedPipelineExecutor.getPredecessorsAfterMigration().addAll(predecessors);
+    private void findPredecessorsBeforeMigration(PipelineGraph pipelineGraphBeforeMigration) {
+        this.predecessorsAfterMigration.forEach(migrationPredecessor ->
+        {
+            NamedStreamPipesEntity predecessor = PipelineUtils.findMatching(migrationPredecessor, pipelineGraphBeforeMigration);
+            this.predecessorsBeforeMigration
+                    .add(predecessor);
+        });
     }
 
-    private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
-        associatedPipelineExecutor.getPredecessorsAfterMigration().forEach(migrationPredecessor ->
-                associatedPipelineExecutor.getPredecessorsBeforeMigration()
-                        .add(PipelineUtils.findMatching(migrationPredecessor, pipelineGraphBeforeMigration)));
+    private void initializeGraphs(){
+        List<InvocableStreamPipesEntity> decryptedTargetGraphs = PipelineElementUtils.decryptSecrets(
+                Collections.singletonList(pipelineExecutor.getMigrationEntity().getTargetElement()),
+                pipelineExecutor.getPipeline());
+        pipelineExecutor.getGraphs().getEntitiesToStart().addAll(decryptedTargetGraphs);
+
+        List<InvocableStreamPipesEntity> originGraphs = Collections.singletonList(
+                pipelineExecutor.getMigrationEntity().getSourceElement());
+        pipelineExecutor.getGraphs().getEntitiesToStop().addAll(originGraphs);
+
+        pipelineExecutor.getGraphs().getEntitiesToStore().addAll(pipeline.getActions());
+        pipelineExecutor.getGraphs().getEntitiesToStore().addAll(pipeline.getSepas());
+    }
+
+    private void initializeRelays(){
+        List<SpDataStreamRelayContainer> relaysToTarget = RelayUtils.findRelays(
+                predecessorsAfterMigration,
+                pipelineExecutor.getMigrationEntity().getTargetElement(),
+                pipelineExecutor.getPipeline());
+        pipelineExecutor.getRelays().getEntitiesToStart().addAll(relaysToTarget);
+        pipelineExecutor.getRelays().getEntitiesToStore().addAll(relaysToTarget);
+
+        List<SpDataStreamRelayContainer> relaysToOrigin = RelayUtils
+                .findRelaysWhenStopping(
+                        predecessorsBeforeMigration,
+                        pipelineExecutor.getMigrationEntity().getSourceElement(),
+                        pipelineExecutor.getPipeline());
+        pipelineExecutor.getRelays().getEntitiesToStop().addAll(relaysToOrigin);
+        pipelineExecutor.getRelays().getEntitiesToDelete().addAll(relaysToOrigin);
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
similarity index 80%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
index 29bc7e6..264ac19 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
@@ -17,7 +17,6 @@
  */
 package org.apache.streampipes.manager.execution.pipeline.executor.operations;
 
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -36,15 +35,16 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-public class StartPipelineOperation extends PipelineExecutionOperation{
+public class PrepareStartPipelineOperation extends PipelineExecutionOperation {
 
-    public StartPipelineOperation(PipelineExecutor pipelineExecutor) {
+
+    public PrepareStartPipelineOperation(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+        Pipeline pipeline = pipelineExecutor.getPipeline();
         pipeline.getSepas().forEach(this::updateKafkaGroupIds);
         pipeline.getActions().forEach(this::updateKafkaGroupIds);
 
@@ -68,24 +68,23 @@ public class StartPipelineOperation extends PipelineExecutionOperation{
 
         List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(decryptedGraphs, pipeline);
 
-        associatedPipelineExecutor.setGraphs(graphs);
-        associatedPipelineExecutor.setDataSets(dataSets);
-        associatedPipelineExecutor.setRelays(relays);
-
-        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
-                decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+        pipelineExecutor.getGraphs().getEntitiesToStart().addAll(graphs);
+        pipelineExecutor.getDataSets().getEntitiesToStart().addAll(dataSets);
+        pipelineExecutor.getRelays().getEntitiesToStart().addAll(relays);
+        pipelineExecutor.getGraphs().getEntitiesToStore().addAll(graphs);
+        pipelineExecutor.getDataSets().getEntitiesToStore().addAll(dataSets);
+        pipelineExecutor.getRelays().getEntitiesToStore().addAll(relays);
+        return StatusUtils.initPipelineOperationStatus(pipeline);
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        //TODO: Check if there is a possible realization of partial rollback
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        //TODO: Implement full rollback
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     /**
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
index 8e0e8c2..2e66272 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
@@ -21,8 +21,15 @@ import org.apache.streampipes.manager.execution.http.ReconfigurationSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.ReconfigurationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class ReconfigureElementOperation extends PipelineExecutionOperation implements ReconfigurationOperation {
 
@@ -32,18 +39,62 @@ public class ReconfigureElementOperation extends PipelineExecutionOperation impl
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        Pipeline reconfiguredPipeline = associatedPipelineExecutor.getSecondaryPipeline();
+        Pipeline reconfiguredPipeline = pipelineExecutor.getSecondaryPipeline();
         return new ReconfigurationSubmitter(reconfiguredPipeline.getPipelineId(), reconfiguredPipeline.getName(),
-                associatedPipelineExecutor.getReconfigurationEntity()).reconfigure();
+                pipelineExecutor.getReconfigurationEntity()).reconfigure();
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        return rollbackOperationFully();
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        // Roll back by re-submitting the affected static properties as they are configured in the original pipeline.
+        Pipeline originalPipeline = pipelineExecutor.getPipeline();
+        PipelineElementReconfigurationEntity originalReconfigurationEntity = pipelineExecutor.getReconfigurationEntity();
+
+        Optional<DataProcessorInvocation> reconfiguredEntity =
+                findReconfiguredEntity(originalPipeline, originalReconfigurationEntity);
+
+        // Nothing to reconfigure back if no processor of the original pipeline matches.
+        if (!reconfiguredEntity.isPresent()) {
+            return StatusUtils.initPipelineOperationStatus(originalPipeline);
+        }
+
+        PipelineElementReconfigurationEntity rollbackReconfigurationEntity =
+                new PipelineElementReconfigurationEntity(reconfiguredEntity.get());
+
+        rollbackReconfigurationEntity.setReconfiguredStaticProperties(
+                findMatchingStaticProperty(reconfiguredEntity.get().getStaticProperties(),
+                        originalReconfigurationEntity.getReconfiguredStaticProperties()));
+
+        return new ReconfigurationSubmitter(originalPipeline.getPipelineId(), originalPipeline.getName(),
+                rollbackReconfigurationEntity).reconfigure();
+    }
+
+    /**
+     * Looks up the first data processor of the pipeline whose deployment running instance id equals the one
+     * of the reconfiguration entity.
+     */
+    private Optional<DataProcessorInvocation> findReconfiguredEntity(Pipeline originalPipeline,
+                                                                     PipelineElementReconfigurationEntity reconfigurationEntity){
+        return originalPipeline.getSepas().stream()
+                // Null guard: a processor without a running instance id must not abort the rollback with an NPE.
+                .filter(invocation -> invocation.getDeploymentRunningInstanceId() != null
+                        && invocation.getDeploymentRunningInstanceId()
+                                .equals(reconfigurationEntity.getDeploymentRunningInstanceId()))
+                .findFirst();
+    }
+
+    /**
+     * Returns the properties of {@code listToSearchIn} whose internal name also occurs in {@code listToCompareTo}.
+     */
+    private List<StaticProperty> findMatchingStaticProperty(List<StaticProperty> listToSearchIn,
+                                                            List<StaticProperty> listToCompareTo){
+        return listToSearchIn.stream().filter(searchProperty ->
+            listToCompareTo.stream().anyMatch(compareProperty ->
+                    compareProperty.getInternalName().equals(searchProperty.getInternalName()))).collect(Collectors.toList());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartTargetPipelineElementsOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
similarity index 54%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartTargetPipelineElementsOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
index 9e79ab8..72d3802 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartTargetPipelineElementsOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
@@ -18,43 +18,35 @@
 package org.apache.streampipes.manager.execution.pipeline.executor.operations;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.*;
 import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.*;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
-public class StartTargetPipelineElementsOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
 
-    public StartTargetPipelineElementsOperation(PipelineExecutor pipelineExecutor){
+    public StartGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor){
         super(pipelineExecutor);
     }
 
     @Override
     public PipelineOperationStatus executeOperation() {
         long nanoTimeBeforeOperation = System.nanoTime();
-        List<InvocableStreamPipesEntity> decryptedGraphs =
-                PipelineElementUtils.decryptSecrets(
-                        Collections.singletonList(associatedPipelineExecutor.getMigrationEntity().getTargetElement()),
-                        associatedPipelineExecutor.getPipeline());
-        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(decryptedGraphs);
+        List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStart();
+        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
 
-        RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBePersisted());
+        RelayUtils.updateRelays(relays, pipelineExecutor.getRelays().getEntitiesToStore());
 
-        //State needs to be deleted to not store it
-        associatedPipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
-
         PipelineOperationStatus status = CommunicationUtils.
-                startPipelineElementsAndRelays(decryptedGraphs, relays, associatedPipelineExecutor.getPipeline());
+                startPipelineElementsAndRelays(graphs, relays, pipelineExecutor.getPipeline());
+
+        //State is deleted to not store it in database; done after submission as (presumably, NOTE(review): confirm) the same instances are sent
+        pipelineExecutor.getGraphs().getEntitiesToStart().forEach(entity -> entity.setState(null));
 
         long duration = System.nanoTime() - nanoTimeBeforeOperation;
         EvaluationLogger.getInstance().logMQTT("Migration", "start target element", "", duration, duration/1000000000.0);
@@ -64,17 +56,27 @@ public class StartTargetPipelineElementsOperation extends PipelineExecutionOpera
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        List<InvocableStreamPipesEntity> graphs =
-                Collections.singletonList(associatedPipelineExecutor.getMigrationEntity().getTargetElement());
-        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
+        List<InvocableStreamPipesEntity> graphsToRollBack = pipelineExecutor.getGraphs().getEntitiesToStart();
+        List<SpDataStreamRelayContainer> relaysToRollBack = PipelineElementUtils.extractRelaysFromDataProcessor(graphsToRollBack);
 
-        return new GraphSubmitter(associatedPipelineExecutor.getPipeline().getPipelineId(),
-                associatedPipelineExecutor.getPipeline().getName(), graphs, new ArrayList<>(), relays)
-                .detachPipelineElementsAndRelays();
+        return CommunicationUtils.stopPipelineElementsAndRelays(graphsToRollBack,
+                relaysToRollBack, pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+
+        List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStart();
+        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
+
+        List<InvocableStreamPipesEntity> graphsToRollBack =
+                PipelineElementUtils.filterPipelineElementsById(graphs, idsToRollback);
+
+        List<SpDataStreamRelayContainer> relaysToRollBack =
+                RelayUtils.filterRelaysById(relays, idsToRollback);
+
+        return CommunicationUtils.stopPipelineElementsAndRelays(graphsToRollBack,
+                relaysToRollBack, pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
index 29bc7e6..720d8c5 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
@@ -19,22 +19,19 @@ package org.apache.streampipes.manager.execution.pipeline.executor.operations;
 
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.DataSetUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
+import java.util.Set;
+
 
 public class StartPipelineOperation extends PipelineExecutionOperation{
 
@@ -44,61 +41,44 @@ public class StartPipelineOperation extends PipelineExecutionOperation{
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        Pipeline pipeline = associatedPipelineExecutor.getPipeline();
-        pipeline.getSepas().forEach(this::updateKafkaGroupIds);
-        pipeline.getActions().forEach(this::updateKafkaGroupIds);
-
-        List<DataProcessorInvocation> sepas = pipeline.getSepas();
-        List<DataSinkInvocation> secs = pipeline.getActions();
-
-        List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
-                SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
-        for (SpDataSet ds : dataSets) {
-            ds.setCorrespondingPipeline(pipeline.getPipelineId());
-        }
-
-        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-        graphs.addAll(sepas);
-        graphs.addAll(secs);
-
-        List<InvocableStreamPipesEntity> decryptedGraphs = PipelineElementUtils.decryptSecrets(graphs, pipeline);
-
-        graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
-        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(decryptedGraphs, pipeline);
-
-        associatedPipelineExecutor.setGraphs(graphs);
-        associatedPipelineExecutor.setDataSets(dataSets);
-        associatedPipelineExecutor.setRelays(relays);
-
+        Pipeline pipeline = pipelineExecutor.getPipeline();
         return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
-                decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+                pipelineExecutor.getGraphs().getEntitiesToStart(),
+                pipelineExecutor.getDataSets().getEntitiesToStart(),
+                pipelineExecutor.getRelays().getEntitiesToStart()).invokePipelineElementsAndRelays();
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        //TODO: Check if there is a possible realization of partial rollback
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        Pipeline pipeline = pipelineExecutor.getPipeline();
+
+        Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+
+        List<InvocableStreamPipesEntity> graphsToRollBack =
+                PipelineElementUtils.filterPipelineElementsById(
+                        pipelineExecutor.getGraphs().getEntitiesToStart(),
+                        idsToRollback);
+        List<SpDataSet> dataSetsToRollBack =
+                DataSetUtils.filterDataSetsById(
+                        pipelineExecutor.getDataSets().getEntitiesToStart(),
+                        idsToRollback);
+        List<SpDataStreamRelayContainer> relaysToRollBack =
+                RelayUtils.filterRelaysById(
+                        pipelineExecutor.getRelays().getEntitiesToStart(),
+                        idsToRollback);
+
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                graphsToRollBack,
+                dataSetsToRollBack,
+                relaysToRollBack).detachPipelineElementsAndRelays();
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        //TODO: Implement full rollback
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
-    }
-
-    /**
-     * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
-     *
-     * @param entity    data processor/sink
-     */
-    private void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
-        entity.getInputStreams()
-                .stream()
-                .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
-                .map(is -> is.getEventGrounding().getTransportProtocol())
-                .map(KafkaTransportProtocol.class::cast)
-                .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+        Pipeline pipeline = pipelineExecutor.getPipeline();
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                pipelineExecutor.getGraphs().getEntitiesToStart(),
+                pipelineExecutor.getDataSets().getEntitiesToStart(),
+                pipelineExecutor.getRelays().getEntitiesToStart()).detachPipelineElementsAndRelays();
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysFromPredecessorsOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
similarity index 56%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysFromPredecessorsOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
index 90bbf96..07c6480 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysFromPredecessorsOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
@@ -28,26 +28,20 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-public class StartRelaysFromPredecessorsOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
 
 
-    public StartRelaysFromPredecessorsOperation(PipelineExecutor pipelineExecutor) {
+    public StartRelaysOperation(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
     @Override
     public PipelineOperationStatus executeOperation() {
         long nanoTimeBeforeOperation = System.nanoTime();
-        List<SpDataStreamRelayContainer> relays = RelayUtils.findRelays(
-                associatedPipelineExecutor.getPredecessorsAfterMigration(),
-                associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
-                associatedPipelineExecutor.getPipeline());
+        List<SpDataStreamRelayContainer> relays = pipelineExecutor.getRelays().getEntitiesToStart();
 
-        RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBePersisted());
-
-        PipelineOperationStatus status= CommunicationUtils.startRelays(relays, associatedPipelineExecutor.getPipeline());
+        PipelineOperationStatus status= CommunicationUtils.startRelays(relays, pipelineExecutor.getPipeline());
 
         long duration = System.nanoTime() - nanoTimeBeforeOperation;
         EvaluationLogger.getInstance().logMQTT("Migration", "start relay to target", "", duration, duration/1000000000.0);
@@ -57,23 +51,17 @@ public class StartRelaysFromPredecessorsOperation extends PipelineExecutionOpera
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        Set<String> relayIdsToRollback = StatusUtils.extractUniqueRelayIds(this.getStatus());
-        List<SpDataStreamRelayContainer> relaysFromPredecessors = RelayUtils.findRelays(
-                associatedPipelineExecutor.getPredecessorsAfterMigration(),
-                associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
-                associatedPipelineExecutor.getPipeline());
-        List<SpDataStreamRelayContainer> relaysToRollBack = relaysFromPredecessors.stream().
-                filter(relay -> relayIdsToRollback.contains(relay.getRunningStreamRelayInstanceId()))
-                .collect(Collectors.toList());
-        return CommunicationUtils.stopRelays(relaysToRollBack, associatedPipelineExecutor.getPipeline());
+        Set<String> relayIdsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+        List<SpDataStreamRelayContainer> relaysToRollBack =
+                RelayUtils.filterRelaysById(pipelineExecutor.getRelays().getEntitiesToStart(),
+                        relayIdsToRollback);
+
+        return CommunicationUtils.stopRelays(relaysToRollBack, pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        List<SpDataStreamRelayContainer> relaysToRollBack = RelayUtils.findRelays(
-                associatedPipelineExecutor.getPredecessorsAfterMigration(),
-                associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
-                associatedPipelineExecutor.getPipeline());
-        return CommunicationUtils.stopRelays(relaysToRollBack, associatedPipelineExecutor.getPipeline());
+        List<SpDataStreamRelayContainer> relaysToRollBack = pipelineExecutor.getRelays().getEntitiesToStart();
+        return CommunicationUtils.stopRelays(relaysToRollBack, pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopOriginPipelineElementAndRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
similarity index 62%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopOriginPipelineElementAndRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
index 42f7c3d..2e1a0d0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopOriginPipelineElementAndRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
@@ -23,29 +23,29 @@ import org.apache.streampipes.manager.execution.pipeline.executor.utils.Communic
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
-public class StopOriginPipelineElementAndRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
 
-    public StopOriginPipelineElementAndRelaysOperation(PipelineExecutor pipelineExecutor) {
+    public StopGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor) {
         super(pipelineExecutor);
     }
 
     @Override
     public PipelineOperationStatus executeOperation() {
         long nanoTimeBeforeOperation = System.nanoTime();
-        List<InvocableStreamPipesEntity> graphs = Collections.singletonList(
-                associatedPipelineExecutor.getMigrationEntity().getSourceElement());
+        List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStop();
         List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
 
-        RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBeDeleted());
+        RelayUtils.updateRelays(relays, pipelineExecutor.getRelays().getEntitiesToDelete());
 
-        PipelineOperationStatus status = CommunicationUtils.stopPipelineElementsAndRelays(graphs, relays, associatedPipelineExecutor.getPipeline());
+        PipelineOperationStatus status = CommunicationUtils.stopPipelineElementsAndRelays(graphs, relays, pipelineExecutor.getPipeline());
 
         long duration = System.nanoTime() - nanoTimeBeforeOperation;
         EvaluationLogger.getInstance().logMQTT("Migration", "stop origin element", "", duration, duration/1000000000.0);
@@ -55,17 +55,25 @@ public class StopOriginPipelineElementAndRelaysOperation extends PipelineExecuti
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        //TODO: Implement
-        return null;
+        Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+
+        List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStop();
+        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
+
+        List<InvocableStreamPipesEntity> graphsToRollBack =
+                PipelineElementUtils.filterPipelineElementsById(graphs, idsToRollback);
+
+        List<SpDataStreamRelayContainer> relaysToRollBack =
+                RelayUtils.filterRelaysById(relays, idsToRollback);
+
+        return CommunicationUtils.startPipelineElementsAndRelays(graphsToRollBack, relaysToRollBack,
+                pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        List<InvocableStreamPipesEntity> graphs = Collections.singletonList(
-                associatedPipelineExecutor.getMigrationEntity().getSourceElement());
-        List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
-
-        RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBeDeleted());
-        return CommunicationUtils.startPipelineElementsAndRelays(graphs, relays, associatedPipelineExecutor.getPipeline());
+        List<InvocableStreamPipesEntity> graphsToRollBack = pipelineExecutor.getGraphs().getEntitiesToStop();
+        List<SpDataStreamRelayContainer> relaysToRollBack = PipelineElementUtils.extractRelaysFromDataProcessor(graphsToRollBack);
+        return CommunicationUtils.startPipelineElementsAndRelays(graphsToRollBack, relaysToRollBack, pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
index 2e8f367..1f28247 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
@@ -19,9 +19,7 @@ package org.apache.streampipes.manager.execution.pipeline.executor.operations;
 
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.StorageUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.*;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
@@ -33,6 +31,7 @@ import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
+import java.util.Set;
 
 public class StopPipelineOperation extends PipelineExecutionOperation {
 
@@ -42,7 +41,7 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+        Pipeline pipeline = pipelineExecutor.getPipeline();
         List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
         List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
         List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(graphs, pipeline);
@@ -51,8 +50,8 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
                 dataSets, relays).detachPipelineElementsAndRelays();
 
         if (status.isSuccess()) {
-            if (associatedPipelineExecutor.isMonitor()) StorageUtils.deleteVisualization(pipeline.getPipelineId());
-            if (associatedPipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStopped(pipeline);
+            if (pipelineExecutor.isMonitor()) StorageUtils.deleteVisualization(pipeline.getPipelineId());
+            if (pipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStopped(pipeline);
 
             StorageUtils.deleteDataStreamRelayContainer(relays);
 
@@ -67,13 +66,36 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        //TODO: Implement sth?
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        Pipeline pipeline = pipelineExecutor.getPipeline();
+
+        // Restrict the rollback to the ids that StatusUtils extracts as successful from this operation's status.
+        Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+
+        // This operation stops entities, so graphs, data sets and relays are all taken from the entities-to-stop
+        // side; mixing in the entities-to-start lists would filter the wrong collections.
+        List<InvocableStreamPipesEntity> graphsToRollBack =
+                PipelineElementUtils.filterPipelineElementsById(
+                        pipelineExecutor.getGraphs().getEntitiesToStop(),
+                        idsToRollback);
+        List<SpDataSet> dataSetsToRollBack =
+                DataSetUtils.filterDataSetsById(
+                        pipelineExecutor.getDataSets().getEntitiesToStop(),
+                        idsToRollback);
+        List<SpDataStreamRelayContainer> relaysToRollBack =
+                RelayUtils.filterRelaysById(
+                        pipelineExecutor.getRelays().getEntitiesToStop(),
+                        idsToRollback);
+
+        // Invoke the filtered entities again to undo the partial stop.
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                graphsToRollBack,
+                dataSetsToRollBack,
+                relaysToRollBack).invokePipelineElementsAndRelays();
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
         //TODO: Implement sth?
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysFromPredecessorOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
similarity index 57%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysFromPredecessorOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
index c1ee4ad..dee1933 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysFromPredecessorOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
@@ -21,7 +21,6 @@ import org.apache.streampipes.logging.evaluation.EvaluationLogger;
 import org.apache.streampipes.manager.execution.pipeline.executor.*;
 import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
@@ -30,24 +29,19 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import java.util.List;
 import java.util.Set;
 
-public class StopRelaysFromPredecessorOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
 
-    public StopRelaysFromPredecessorOperation(PipelineExecutor pipelineExecutor){
+    public StopRelaysOperation(PipelineExecutor pipelineExecutor){
         super(pipelineExecutor);
     }
 
     @Override
     public PipelineOperationStatus executeOperation() {
         long nanoTimeBeforeOperation = System.nanoTime();
-        List<SpDataStreamRelayContainer> relays = RelayUtils
-                .findRelaysWhenStopping(associatedPipelineExecutor.getPredecessorsBeforeMigration(),
-                associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
-                        associatedPipelineExecutor.getPipeline());
-
-        RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBeDeleted());
+        List<SpDataStreamRelayContainer> relays = pipelineExecutor.getRelays().getEntitiesToStop();
 
         PipelineOperationStatus statusStopRelays =
-                CommunicationUtils.stopRelays(relays, associatedPipelineExecutor.getPipeline());
+                CommunicationUtils.stopRelays(relays, pipelineExecutor.getPipeline());
 
         long duration = System.nanoTime() - nanoTimeBeforeOperation;
         EvaluationLogger.getInstance().logMQTT("Migration", "stop relay from origin", "", duration, duration/1000000000.0);
@@ -57,21 +51,17 @@ public class StopRelaysFromPredecessorOperation extends PipelineExecutionOperati
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        Set<String> relayIdsToRollback = StatusUtils.extractUniqueRelayIds(this.getStatus());
-        List<SpDataStreamRelayContainer> rollbackRelays = PipelineElementUtils.findRelaysAndFilterById(relayIdsToRollback,
-                associatedPipelineExecutor.getPredecessorsBeforeMigration(),
-                associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
-                associatedPipelineExecutor.getPipeline());
+        Set<String> relayIdsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+        List<SpDataStreamRelayContainer> rollbackRelays =
+                RelayUtils.filterRelaysById(pipelineExecutor.getRelays().getEntitiesToStop(),
+                        relayIdsToRollback);
 
-        return CommunicationUtils.startRelays(rollbackRelays, associatedPipelineExecutor.getPipeline());
+        return CommunicationUtils.startRelays(rollbackRelays, pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        List<SpDataStreamRelayContainer> rollbackRelays = RelayUtils.findRelays(
-                associatedPipelineExecutor.getPredecessorsBeforeMigration(),
-                associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
-                associatedPipelineExecutor.getPipeline());
-        return CommunicationUtils.startRelays(rollbackRelays, associatedPipelineExecutor.getPipeline());
+        List<SpDataStreamRelayContainer> rollbackRelays = pipelineExecutor.getRelays().getEntitiesToStop();
+        return CommunicationUtils.startRelays(rollbackRelays, pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
index e691e9d..895e38d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
@@ -22,10 +22,8 @@ import org.apache.streampipes.manager.execution.pipeline.executor.utils.Pipeline
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
 import org.apache.streampipes.manager.execution.pipeline.executor.utils.StorageUtils;
 import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
-import java.util.ArrayList;
 import java.util.List;
 
 public class StoreMigratedPipelineOperation extends PipelineExecutionOperation{
@@ -36,26 +34,23 @@ public class StoreMigratedPipelineOperation extends PipelineExecutionOperation{
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-        graphs.addAll(associatedPipelineExecutor.getPipeline().getActions());
-        graphs.addAll(associatedPipelineExecutor.getPipeline().getSepas());
 
-        List<SpDataSet> dataSets = PipelineUtils.findDataSets(associatedPipelineExecutor.getPipeline());
+        List<SpDataSet> dataSets = PipelineUtils.findDataSets(pipelineExecutor.getPipeline());
 
-        // store new pipeline and relays
-        StorageUtils.storeInvocationGraphs(associatedPipelineExecutor.getPipeline().getPipelineId(), graphs, dataSets);
-        StorageUtils.deleteDataStreamRelayContainer(associatedPipelineExecutor.getRelaysToBeDeleted());
-        StorageUtils.storeDataStreamRelayContainer(associatedPipelineExecutor.getRelaysToBePersisted());
-        return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+        StorageUtils.storeInvocationGraphs(pipelineExecutor.getPipeline().getPipelineId(),
+                pipelineExecutor.getGraphs().getEntitiesToStore(), dataSets);
+        StorageUtils.deleteDataStreamRelayContainer(pipelineExecutor.getRelays().getEntitiesToDelete());
+        StorageUtils.storeDataStreamRelayContainer(pipelineExecutor.getRelays().getEntitiesToStore());
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        return null;
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        return null;
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
index b0f47bb..960aec0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
@@ -34,12 +34,12 @@ public class StorePipelineOperation extends PipelineExecutionOperation{
 
     @Override
     public PipelineOperationStatus executeOperation() {
-        PipelineOperationStatus status = StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
-        Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+        PipelineOperationStatus status = StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
+        Pipeline pipeline = pipelineExecutor.getPipeline();
         try{
-            StorageUtils.storeInvocationGraphs(pipeline.getPipelineId(), associatedPipelineExecutor.getGraphs(),
-                    associatedPipelineExecutor.getDataSets());
-            StorageUtils.storeDataStreamRelayContainer(associatedPipelineExecutor.getRelays());
+            StorageUtils.storeInvocationGraphs(pipeline.getPipelineId(), pipelineExecutor.getGraphs().getEntitiesToStore(),
+                    pipelineExecutor.getDataSets().getEntitiesToStore());
+            StorageUtils.storeDataStreamRelayContainer(pipelineExecutor.getRelays().getEntitiesToStore());
 
             PipelineStatusManager.addPipelineStatus(
                     pipeline.getPipelineId(),
@@ -48,7 +48,7 @@ public class StorePipelineOperation extends PipelineExecutionOperation{
                             PipelineStatusMessageType.PIPELINE_STARTED.title(),
                             PipelineStatusMessageType.PIPELINE_STARTED.description()));
 
-            if (associatedPipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStarted(pipeline);
+            if (pipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStarted(pipeline);
         }catch (Exception e){
             status.setSuccess(false);
             status.setTitle(e.getMessage());
@@ -59,11 +59,11 @@ public class StorePipelineOperation extends PipelineExecutionOperation{
 
     @Override
     public PipelineOperationStatus rollbackOperationPartially() {
-        return null;
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 
     @Override
     public PipelineOperationStatus rollbackOperationFully() {
-        return null;
+        return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/DataSetUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/DataSetUtils.java
new file mode 100644
index 0000000..0f8deee
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/DataSetUtils.java
@@ -0,0 +1,20 @@
+package org.apache.streampipes.manager.execution.pipeline.executor.utils;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DataSetUtils {
+
+    /** Returns the entries of {@code dataSets} whose {@code getDatasetInvocationId()} is in {@code dataSetIds}. */
+    public static List<SpDataSet> filterDataSetsById(List<SpDataSet> dataSets, Set<String> dataSetIds) {
+        //TODO: Check if DatasetInvocationId is the correct id to check for
+        return dataSets.stream()
+                .filter(candidate -> dataSetIds.contains(candidate.getDatasetInvocationId()))
+                .collect(Collectors.toList());
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java
index 58f731c..83e4712 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java
@@ -193,4 +193,11 @@ public class PipelineElementUtils {
                 .filter(r -> r.getOutputStreamRelays().size() > 0)
                 .collect(Collectors.toList());
     }
+
+    /** Returns the elements whose {@code getDeploymentRunningInstanceId()} is in {@code pipelineElementIds}. */
+    public static List<InvocableStreamPipesEntity> filterPipelineElementsById(
+            List<InvocableStreamPipesEntity> pipelineElements, Set<String> pipelineElementIds) {
+        return pipelineElements.stream().filter(pe -> pipelineElementIds.contains(pe.getDeploymentRunningInstanceId()))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
index e88707e..ac0b210 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
@@ -233,4 +233,11 @@ public class RelayUtils {
                                 String deploymentRunningInstanceId) {
         return relays.stream().anyMatch(r -> r.getRunningStreamRelayInstanceId().equals(deploymentRunningInstanceId));
     }
+
+    /** Returns the relays whose {@code getRunningStreamRelayInstanceId()} is contained in {@code relayIds}. */
+    public static List<SpDataStreamRelayContainer> filterRelaysById(
+            List<SpDataStreamRelayContainer> relays, Set<String> relayIds) {
+        return relays.stream().filter(container -> relayIds.contains(container.getRunningStreamRelayInstanceId()))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
index 713045b..c4d8a4f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
@@ -47,7 +47,7 @@ public class StatusUtils {
             status.setTitle(partialStatus.getTitle());
     }
 
-    public static Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
+    public static Set<String> extractUniqueSuccessfulIds(PipelineOperationStatus status) {
         return status.getElementStatus().stream()
                 .filter(PipelineElementStatus::isSuccess)
                 .map(PipelineElementStatus::getElementId)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
index bf12bb9..e2dd478 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
@@ -87,7 +87,6 @@ public class PipelineElementMigrationHandler {
                 swapPipelineElement(migrationPipeline, failedEntity);
             }
         });
-        //Why overwrite when changes are rolled back? TODO: Check if needed when pipeline is rolled back (else return)
         Operations.overwritePipeline(migrationPipeline);
     }
 
@@ -95,8 +94,6 @@ public class PipelineElementMigrationHandler {
         PipelineExecutor migrationExecutor = PipelineExecutorFactory.
                 createMigrationExecutor(migrationPipeline, visualize, storeStatus, monitor, currentPipeline, migrationEntity);
         return migrationExecutor.execute();
-        //return new PipelineMigrationExecutor(migrationPipeline, currentPipeline, migrationEntity, visualize,
-        //        storeStatus, monitor).migratePipelineElement();
     }
 
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
index 959530d..ba5b7cb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
@@ -60,8 +60,6 @@ public class PipelineElementReconfigurationHandler {
 
             if (entityStatus.isSuccess()) {
                 Operations.overwritePipeline(reconfiguredPipeline);
-            } else {
-                //TODO: Send old/existing configuration
             }
         });
     }
@@ -70,7 +68,6 @@ public class PipelineElementReconfigurationHandler {
         return PipelineExecutorFactory
                 .createReconfigurationExecutor(this.storedPipeline, false, false, false, this.reconfiguredPipeline, entity)
                 .execute();
-        //return new PipelineElementReconfigurationExecutor(reconfiguredPipeline, entity).reconfigurePipelineElement();
     }
 
     private List<PipelineElementReconfigurationEntity> comparePipelinesAndGetReconfiguration() {
@@ -103,12 +100,7 @@ public class PipelineElementReconfigurationHandler {
 
     private PipelineElementReconfigurationEntity reconfigurationEntity(DataProcessorInvocation graph,
                                                                        List<StaticProperty> adaptedStaticProperty) {
-        PipelineElementReconfigurationEntity entity = new PipelineElementReconfigurationEntity();
-        entity.setDeploymentRunningInstanceId(graph.getDeploymentRunningInstanceId());
-        entity.setPipelineElementName(graph.getName());
-        entity.setDeploymentTargetNodeId(graph.getDeploymentTargetNodeId());
-        entity.setDeploymentTargetNodeHostname(graph.getDeploymentTargetNodeHostname());
-        entity.setDeploymentTargetNodePort(graph.getDeploymentTargetNodePort());
+        PipelineElementReconfigurationEntity entity = new PipelineElementReconfigurationEntity(graph);
         entity.setReconfiguredStaticProperties(adaptedStaticProperty);
         return entity;
     }