You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/15 21:07:50 UTC

[streampipes] 01/01: Improve structure of pipeline execution management (#1096)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch SP-1096
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit ad423278cc53b9c8b9a1a35f01191c709ad47eb4
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Jan 15 22:07:21 2023 +0100

    Improve structure of pipeline execution management (#1096)
---
 .../org/apache/streampipes/model/SpDataSet.java    |  15 +-
 .../streampipes/model/api/EndpointSelectable.java  |  17 +-
 .../model/base/InvocableStreamPipesEntity.java     |  18 +-
 .../model/message/PipelineStatusMessage.java       |   9 +-
 .../model/pipeline/PipelineOperationStatus.java    |  10 +-
 .../manager/execution/PipelineExecutionInfo.java   | 106 +++++++++
 .../execution/PipelineExecutionTaskFactory.java    |  60 +++++
 .../manager/execution/PipelineExecutor.java        |  57 +++++
 .../ExtensionsServiceEndpointProvider.java         |  62 ++++++
 .../http/DetachHttpRequest.java}                   |  26 ++-
 .../http/DetachPipelineElementSubmitter.java       |  53 +++++
 .../manager/execution/http/GraphSubmitter.java     | 132 -----------
 .../manager/execution/http/HttpRequestBuilder.java |  98 ---------
 .../manager/execution/http/InvokeHttpRequest.java  |  54 +++++
 .../http/InvokePipelineElementSubmitter.java       |  81 +++++++
 .../execution/http/PipelineElementHttpRequest.java |  73 +++++++
 .../execution/http/PipelineElementSubmitter.java   |  82 +++++++
 .../manager/execution/http/PipelineExecutor.java   | 243 ---------------------
 .../provider/CurrentPipelineElementProvider.java}  |  19 +-
 .../provider/PipelineElementProvider.java}         |  12 +-
 .../provider/StoredPipelineElementProvider.java}   |  20 +-
 .../execution/task/AfterInvocationTask.java        |  65 ++++++
 .../execution/task/DiscoverEndpointsTask.java      |  88 ++++++++
 .../task/PipelineExecutionTask.java}               |  20 +-
 .../task/SecretEncryptionTask.java}                |  24 +-
 .../execution/task/StorePipelineStatusTask.java    |  76 +++++++
 .../manager/execution/task/SubmitRequestTask.java  |  51 +++++
 .../manager/execution/task/UpdateGroupIdTask.java  |  46 ++++
 .../manager/health/PipelineHealthCheck.java        |  10 +-
 .../streampipes/manager/operations/Operations.java |  22 +-
 .../manager/preview/PipelinePreview.java           |  10 +-
 .../http => storage}/PipelineStorageService.java   |   2 +-
 .../RunningPipelineElementStorage.java}            |   8 +-
 .../streampipes/manager/storage/UserService.java   | 101 ---------
 .../rest/core/base/impl/AbstractRestResource.java  |   5 -
 35 files changed, 1093 insertions(+), 682 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java
index b0a9dff08..cfcbf77e3 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java
@@ -17,10 +17,13 @@
  */
 package org.apache.streampipes.model;
 
+import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.schema.EventSchema;
 
-public class SpDataSet extends SpDataStream {
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+public class SpDataSet extends SpDataStream implements EndpointSelectable {
 
   private EventGrounding supportedGrounding;
 
@@ -90,18 +93,28 @@ public class SpDataSet extends SpDataStream {
     this.datasetInvocationId = datasetInvocationId;
   }
 
+  @Override
   public String getCorrespondingPipeline() {
     return correspondingPipeline;
   }
 
+  @Override
   public void setCorrespondingPipeline(String correspondingPipeline) {
     this.correspondingPipeline = correspondingPipeline;
   }
 
+  @Override
+  @JsonIgnore
+  public String getDetachPath() {
+    return "/" + getCorrespondingAdapterId() + "/" + getDatasetInvocationId();
+  }
+
+  @Override
   public String getSelectedEndpointUrl() {
     return selectedEndpointUrl;
   }
 
+  @Override
   public void setSelectedEndpointUrl(String selectedEndpointUrl) {
     this.selectedEndpointUrl = selectedEndpointUrl;
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java
similarity index 64%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java
index 085d17e44..585400010 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java
@@ -16,19 +16,18 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.model.api;
 
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+public interface EndpointSelectable {
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+  String getName();
 
-public class TemporaryGraphStorage {
+  String getSelectedEndpointUrl();
+  void setSelectedEndpointUrl(String selectedEndpointUrl);
 
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
+  String getCorrespondingPipeline();
 
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+  void setCorrespondingPipeline(String pipelineId);
 
+  String getDetachPath();
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 3a87cc5ba..0d64297fc 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -18,19 +18,21 @@
 
 package org.apache.streampipes.model.base;
 
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
 import org.apache.streampipes.logging.LoggerFactory;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.util.Cloner;
 
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 
-public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
+import java.util.List;
 
-  private static final long serialVersionUID = 2727573914765473470L;
+public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity implements EndpointSelectable {
 
   protected List<SpDataStream> inputStreams;
 
@@ -120,10 +122,12 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
     this.supportedGrounding = supportedGrounding;
   }
 
+  @Override
   public String getCorrespondingPipeline() {
     return correspondingPipeline;
   }
 
+  @Override
   public void setCorrespondingPipeline(String correspondingPipeline) {
     this.correspondingPipeline = correspondingPipeline;
   }
@@ -168,14 +172,22 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
     this.uncompleted = uncompleted;
   }
 
+  @Override
   public String getSelectedEndpointUrl() {
     return selectedEndpointUrl;
   }
 
+  @Override
   public void setSelectedEndpointUrl(String selectedEndpointUrl) {
     this.selectedEndpointUrl = selectedEndpointUrl;
   }
 
+  @Override
+  @JsonIgnore
+  public String getDetachPath() {
+    return "/" + InstanceIdExtractor.extractId(getElementId());
+  }
+
   //public Logger getLogger(Class clazz, PeConfig peConfig) {
   public Logger getLogger(Class clazz) {
     //return LoggerFactory.getPeLogger(clazz, getCorrespondingPipeline(), getUri(), peConfig);
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java
index a688a101c..6fa8f7816 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java
@@ -28,13 +28,14 @@ public class PipelineStatusMessage {
   private String messageType;
   private String message;
 
-  public PipelineStatusMessage(String pipelineId, long timestamp,
-                               String messageType, String message) {
+  public PipelineStatusMessage(String pipelineId,
+                               long timestamp,
+                               PipelineStatusMessageType message) {
     super();
     this.pipelineId = pipelineId;
     this.timestamp = timestamp;
-    this.messageType = messageType;
-    this.message = message;
+    this.messageType = message.title();
+    this.message = message.description();
   }
 
   public String getPipelineId() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java
index 25b3c5b27..8e42372e2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java
@@ -33,7 +33,9 @@ public class PipelineOperationStatus {
 
   private List<PipelineElementStatus> elementStatus;
 
-  public PipelineOperationStatus(String pipelineId, String pipelineName, String title,
+  public PipelineOperationStatus(String pipelineId,
+                                 String pipelineName,
+                                 String title,
                                  List<PipelineElementStatus> elementStatus) {
     super();
     this.title = title;
@@ -46,6 +48,12 @@ public class PipelineOperationStatus {
     this.elementStatus = new ArrayList<>();
   }
 
+  public PipelineOperationStatus(String pipelineId, String pipelineName) {
+    this();
+    this.pipelineId = pipelineId;
+    this.pipelineName = pipelineName;
+  }
+
   public String getPipelineId() {
     return pipelineId;
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
new file mode 100644
index 000000000..ba13ab7b0
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class PipelineExecutionInfo {
+
+  private final List<NamedStreamPipesEntity> failedServices;
+  private final List<InvocableStreamPipesEntity> processorsAndSinks;
+  private final List<SpDataSet> dataSets;
+
+  private PipelineOperationStatus pipelineOperationStatus;
+
+  private final String pipelineId;
+
+  public static PipelineExecutionInfo create(Pipeline pipeline) {
+    return new PipelineExecutionInfo(pipeline);
+  }
+
+  private PipelineExecutionInfo(Pipeline pipeline) {
+    this.failedServices = new ArrayList<>();
+    this.processorsAndSinks = findProcessorsAndSinks(pipeline);
+    this.dataSets = findDataSets(pipeline);
+    this.pipelineOperationStatus = new PipelineOperationStatus();
+    this.pipelineId = pipeline.getPipelineId();
+  }
+
+  private List<InvocableStreamPipesEntity> findProcessorsAndSinks(Pipeline pipeline) {
+    return Stream
+        .concat(
+            pipeline.getSepas().stream(),
+            pipeline.getActions().stream()
+        ).collect(Collectors.toList());
+  }
+
+  private List<SpDataSet> findDataSets(Pipeline pipeline) {
+    return pipeline
+        .getStreams()
+        .stream()
+        .filter(s -> s instanceof SpDataSet)
+        .map(s -> new SpDataSet((SpDataSet) s))
+        .collect(Collectors.toCollection(ArrayList::new)); // must stay mutable: addDataSets() appends to it
+  }
+
+  public void addFailedPipelineElement(NamedStreamPipesEntity failedElement) {
+    this.failedServices.add(failedElement);
+  }
+
+  public void addDataSets(List<SpDataSet> dataSets) {
+    this.dataSets.addAll(dataSets);
+  }
+
+  public List<SpDataSet> getDataSets() {
+    return dataSets;
+  }
+
+  public List<NamedStreamPipesEntity> getFailedServices() {
+    return failedServices;
+  }
+
+  public List<InvocableStreamPipesEntity> getProcessorsAndSinks() {
+    return processorsAndSinks;
+  }
+
+  public void applyPipelineOperationStatus(PipelineOperationStatus status) {
+    this.pipelineOperationStatus = status;
+  }
+
+  public PipelineOperationStatus getPipelineOperationStatus() {
+    return this.pipelineOperationStatus;
+  }
+
+  public String getPipelineId() {
+    return pipelineId;
+  }
+
+  public boolean isOperationSuccessful() {
+    return failedServices.size() == 0 && pipelineOperationStatus.isSuccess();
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
new file mode 100644
index 000000000..e290c485d
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.manager.execution.http.DetachPipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.http.InvokePipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.provider.CurrentPipelineElementProvider;
+import org.apache.streampipes.manager.execution.provider.StoredPipelineElementProvider;
+import org.apache.streampipes.manager.execution.task.AfterInvocationTask;
+import org.apache.streampipes.manager.execution.task.DiscoverEndpointsTask;
+import org.apache.streampipes.manager.execution.task.SubmitRequestTask;
+import org.apache.streampipes.manager.execution.task.PipelineExecutionTask;
+import org.apache.streampipes.manager.execution.task.SecretEncryptionTask;
+import org.apache.streampipes.manager.execution.task.StorePipelineStatusTask;
+import org.apache.streampipes.manager.execution.task.UpdateGroupIdTask;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.resource.management.secret.SecretProvider;
+
+import java.util.List;
+
+public class PipelineExecutionTaskFactory {
+
+  public static List<PipelineExecutionTask> makeStartPipelineTasks(Pipeline pipeline) {
+    return List.of(
+        new UpdateGroupIdTask(),
+        new SecretEncryptionTask(SecretProvider.getDecryptionService()),
+        new DiscoverEndpointsTask(),
+        new SubmitRequestTask(new InvokePipelineElementSubmitter(pipeline), new CurrentPipelineElementProvider()),
+        new SecretEncryptionTask(SecretProvider.getEncryptionService()),
+        new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STARTED),
+        new StorePipelineStatusTask(true, false)
+    );
+  }
+
+  public static List<PipelineExecutionTask> makeStopPipelineTasks(Pipeline pipeline,
+                                                                  boolean forceStop) {
+    return List.of(
+        new SubmitRequestTask(new DetachPipelineElementSubmitter(pipeline), new StoredPipelineElementProvider()),
+        new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STOPPED),
+        new StorePipelineStatusTask(false, forceStop)
+    );
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
new file mode 100644
index 000000000..bc14e46f5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.manager.execution.task.PipelineExecutionTask;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.List;
+
+public class PipelineExecutor {
+
+  private final Pipeline pipeline;
+  private final boolean forceStop;
+
+  public PipelineExecutor(Pipeline pipeline,
+                          boolean forceStop) {
+    this.pipeline = pipeline;
+    this.forceStop = forceStop;
+  }
+
+  public PipelineOperationStatus startPipeline() {
+    return executeOperation(PipelineExecutionTaskFactory.makeStartPipelineTasks(pipeline));
+  }
+
+  public PipelineOperationStatus stopPipeline() {
+    return executeOperation(PipelineExecutionTaskFactory.makeStopPipelineTasks(pipeline, forceStop));
+  }
+
+  private PipelineOperationStatus executeOperation(List<PipelineExecutionTask> executionTasks) {
+    var executionInfo = PipelineExecutionInfo.create(pipeline);
+    executionTasks
+        .forEach(task -> {
+          if (task.shouldExecute(executionInfo)) {
+            task.executeTask(pipeline, executionInfo);
+          }
+        });
+    return executionInfo.getPipelineOperationStatus();
+  }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java
new file mode 100644
index 000000000..51760b252
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.endpoint;
+
+import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ExtensionsServiceEndpointProvider {
+
+  public String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
+    return new ExtensionsServiceEndpointGenerator(
+        g.getAppId(),
+        ExtensionsServiceEndpointUtils.getPipelineElementType(g))
+        .getEndpointResourceUrl();
+  }
+
+  public String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException {
+    String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId();
+    if (ds.isInternallyManaged()) {
+      return getConnectMasterSourcesUrl();
+    } else {
+      return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET)
+          .getEndpointResourceUrl();
+    }
+  }
+
+  private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException {
+    List<String> connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery()
+        .getServiceEndpoints(DefaultSpServiceGroups.CORE, true,
+            Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString()));
+    if (connectMasterEndpoints.size() > 0) {
+      return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT;
+    } else {
+      throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint");
+    }
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
similarity index 50%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
index 085d17e44..ac9b73705 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
@@ -16,19 +16,27 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.api.EndpointSelectable;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.http.client.fluent.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class TemporaryGraphStorage {
 
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
+public class DetachHttpRequest extends PipelineElementHttpRequest {
 
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+  private static final Logger LOG = LoggerFactory.getLogger(DetachHttpRequest.class);
 
+  @Override
+  protected Request initRequest(EndpointSelectable pipelineElement, String endpointUrl) {
+    LOG.info("Detaching element: {}", endpointUrl);
+    return Request.Delete(endpointUrl);
+  }
+
+  @Override
+  protected void logError(String endpointUrl, String pipelineElementName, String exceptionMessage) {
+    LOG.error("Could not stop pipeline element {} at {}: {}", pipelineElementName, endpointUrl, exceptionMessage);
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
new file mode 100644
index 000000000..47651cda3
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+
+import java.util.List;
+
+public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
+
+  public DetachPipelineElementSubmitter(Pipeline pipeline) {
+    super(pipeline);
+  }
+
+  @Override
+  protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) {
+    return performDetach(pipelineElement);
+  }
+
+  @Override
+  protected boolean shouldSubmitDataSets() {
+    return true;
+  }
+
+  @Override
+  protected void onSuccess() {
+    status.setTitle("Pipeline " + pipelineName + " successfully stopped");
+  }
+
+  @Override
+  protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) {
+    status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + ".");
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
deleted file mode 100644
index 59598fa14..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class GraphSubmitter {
-
-  private List<InvocableStreamPipesEntity> graphs;
-  private List<SpDataSet> dataSets;
-
-  private String pipelineId;
-  private String pipelineName;
-
-  private static final Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class);
-
-  public GraphSubmitter(String pipelineId,
-                        String pipelineName,
-                        List<InvocableStreamPipesEntity> graphs,
-                        List<SpDataSet> dataSets) {
-    this.graphs = graphs != null ? graphs : new ArrayList<>();
-    this.pipelineId = pipelineId;
-    this.pipelineName = pipelineName;
-    this.dataSets = dataSets != null ? dataSets : new ArrayList<>();
-  }
-
-  public PipelineOperationStatus invokeGraphs() {
-    PipelineOperationStatus status = new PipelineOperationStatus();
-    status.setPipelineId(pipelineId);
-    status.setPipelineName(pipelineName);
-
-
-    graphs.forEach(g -> status.addPipelineElementStatus(performInvocation(g)));
-    if (status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)) {
-      dataSets.forEach(dataSet ->
-          status.addPipelineElementStatus
-              (performInvocation(dataSet)));
-    }
-    status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
-    if (status.isSuccess()) {
-      status.setTitle("Pipeline " + pipelineName + " successfully started");
-    } else {
-      LOG.info("Could not start pipeline, initializing rollback...");
-      rollbackInvokedPipelineElements(status);
-      status.setTitle("Could not start pipeline " + pipelineName + ".");
-    }
-    return status;
-  }
-
-  private void rollbackInvokedPipelineElements(PipelineOperationStatus status) {
-    for (PipelineElementStatus s : status.getElementStatus()) {
-      if (s.isSuccess()) {
-        Optional<InvocableStreamPipesEntity> graph = findGraph(s.getElementId());
-        graph.ifPresent(g -> {
-          LOG.info("Rolling back element " + g.getElementId());
-          performDetach(g);
-        });
-      }
-    }
-  }
-
-  private Optional<InvocableStreamPipesEntity> findGraph(String elementId) {
-    return graphs.stream().filter(g -> g.getBelongsTo().equals(elementId)).findFirst();
-  }
-
-  public PipelineOperationStatus detachGraphs() {
-    PipelineOperationStatus status = new PipelineOperationStatus();
-    status.setPipelineId(pipelineId);
-    status.setPipelineName(pipelineName);
-
-    graphs.forEach(g -> status.addPipelineElementStatus(performDetach(g)));
-    dataSets.forEach(dataSet -> status.addPipelineElementStatus(performDetach(dataSet)));
-    status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
-    if (status.isSuccess()) {
-      status.setTitle("Pipeline " + pipelineName + " successfully stopped");
-    } else {
-      status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + ".");
-    }
-
-    return status;
-  }
-
-  private PipelineElementStatus performInvocation(InvocableStreamPipesEntity entity) {
-    String endpointUrl = entity.getSelectedEndpointUrl();
-    return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).invoke();
-  }
-
-  private PipelineElementStatus performInvocation(SpDataSet dataset) {
-    String endpointUrl = dataset.getSelectedEndpointUrl();
-    return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).invoke();
-  }
-
-  private PipelineElementStatus performDetach(InvocableStreamPipesEntity entity) {
-    String endpointUrl = entity.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(entity.getElementId());
-    return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).detach();
-  }
-
-  private PipelineElementStatus performDetach(SpDataSet dataset) {
-    String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/"
-        + dataset.getDatasetInvocationId();
-    return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).detach();
-  }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
deleted file mode 100644
index d9dd5a313..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.manager.util.AuthTokenUtils;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import com.google.gson.JsonSyntaxException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.ContentType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class HttpRequestBuilder {
-
-  private final NamedStreamPipesEntity payload;
-  private final String endpointUrl;
-  private String pipelineId;
-
-  private static final Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class);
-
-  public HttpRequestBuilder(NamedStreamPipesEntity payload,
-                            String endpointUrl,
-                            String pipelineId) {
-    this.payload = payload;
-    this.endpointUrl = endpointUrl;
-    this.pipelineId = pipelineId;
-  }
-
-  public PipelineElementStatus invoke() {
-    LOG.info("Invoking element: " + endpointUrl);
-    try {
-      String jsonDocument = toJson();
-      Response httpResp =
-          Request.Post(endpointUrl)
-              .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId))
-              .bodyString(jsonDocument, ContentType.APPLICATION_JSON)
-              .connectTimeout(10000)
-              .execute();
-      return handleResponse(httpResp);
-    } catch (Exception e) {
-      LOG.error("Could not perform invocation request", e);
-      return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
-    }
-  }
-
-  public PipelineElementStatus detach() {
-    try {
-      Response httpResp = Request.Delete(endpointUrl)
-          .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId))
-          .connectTimeout(10000).execute();
-      return handleResponse(httpResp);
-    } catch (Exception e) {
-      LOG.error("Could not stop pipeline {}", endpointUrl, e);
-      return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
-    }
-  }
-
-  private PipelineElementStatus handleResponse(Response httpResp) throws JsonSyntaxException, IOException {
-    String resp = httpResp.returnContent().asString();
-    org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
-        .getObjectMapper()
-        .readValue(resp, org.apache.streampipes.model.Response.class);
-    return convert(streamPipesResp);
-  }
-
-  private String toJson() throws Exception {
-    return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
-  }
-
-  private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
-    return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(),
-        response.getOptionalMessage());
-  }
-
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
new file mode 100644
index 000000000..1656e677b
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InvokeHttpRequest extends PipelineElementHttpRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InvokeHttpRequest.class);
+
+  /** Builds the POST request that carries the JSON-serialized pipeline element to the endpoint. */
+  @Override
+  protected Request initRequest(EndpointSelectable pipelineElement,
+                                String endpointUrl) throws JsonProcessingException {
+    // placeholder instead of string concatenation, consistent with logError below
+    LOG.info("Invoking element: {}", endpointUrl);
+    return Request.Post(endpointUrl).bodyString(toJson(pipelineElement), ContentType.APPLICATION_JSON);
+  }
+
+  /** Reports a failed invocation request on error level. */
+  @Override
+  protected void logError(String endpointUrl, String pipelineElementName, String exceptionMessage) {
+    LOG.error("Could not perform invocation request at {} for pipeline element {}: {}",
+        endpointUrl, pipelineElementName, exceptionMessage);
+  }
+
+  /** Serializes the pipeline element using the object mapper provided by JacksonSerializer. */
+  private String toJson(EndpointSelectable pipelineElement) throws JsonProcessingException {
+    return JacksonSerializer.getObjectMapper().writeValueAsString(pipelineElement);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
new file mode 100644
index 000000000..d746d2319
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public class InvokePipelineElementSubmitter extends PipelineElementSubmitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InvokePipelineElementSubmitter.class);
+
+  public InvokePipelineElementSubmitter(Pipeline pipeline) {
+    super(pipeline);
+  }
+
+  @Override
+  protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) {
+    String endpointUrl = pipelineElement.getSelectedEndpointUrl();
+    return new InvokeHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId);
+  }
+
+  @Override
+  protected boolean shouldSubmitDataSets() {
+    return isSuccess(); // data sets are only invoked once every processor and sink succeeded
+  }
+
+  @Override
+  protected void onSuccess() {
+    status.setTitle("Pipeline " + pipelineName + " successfully started");
+  }
+
+  @Override
+  protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) {
+    LOG.info("Could not start pipeline, initializing rollback...");
+    rollbackInvokedPipelineElements(status, processorsAndSinks);
+    status.setTitle("Could not start pipeline " + pipelineName + ".");
+  }
+
+  /** Detaches each processor or sink that matches a successful status, logging every rollback. */
+  private void rollbackInvokedPipelineElements(PipelineOperationStatus status, List<InvocableStreamPipesEntity> pe) {
+    for (PipelineElementStatus s : status.getElementStatus()) {
+      if (s.isSuccess()) {
+        findPipelineElements(s.getElementId(), pe).ifPresent(g -> {
+          LOG.info("Rolling back element {}", g.getElementId());
+          performDetach(g);
+        });
+      }
+    }
+  }
+
+  // NOTE(review): the status id appears to be the endpoint URL of the request - confirm it equals belongsTo
+  private Optional<InvocableStreamPipesEntity> findPipelineElements(String elementId,
+                                                                    List<InvocableStreamPipesEntity> pe) {
+    return pe.stream().filter(g -> g.getBelongsTo().equals(elementId)).findFirst();
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
new file mode 100644
index 000000000..400626e54
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.manager.util.AuthTokenUtils;
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.gson.JsonSyntaxException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+
+import java.io.IOException;
+
+public abstract class PipelineElementHttpRequest {
+
+  private static final int CONNECT_TIMEOUT_MS = 10000;
+
+  /** Sends the request built by {@link #initRequest} and maps the reply (or any failure) to a status. */
+  public PipelineElementStatus execute(EndpointSelectable pipelineElement,
+                                       String endpointUrl,
+                                       String pipelineId) {
+    try {
+      Response httpResp = initRequest(pipelineElement, endpointUrl)
+          .addHeader("Authorization", AuthTokenUtils.getAuthToken(pipelineId))
+          .connectTimeout(CONNECT_TIMEOUT_MS)
+          .execute();
+      return handleResponse(httpResp, pipelineElement, endpointUrl);
+    } catch (Exception e) {
+      // not every exception carries a message; fall back to toString() instead of reporting "null"
+      String message = e.getMessage() != null ? e.getMessage() : e.toString();
+      logError(endpointUrl, pipelineElement.getName(), message);
+      return new PipelineElementStatus(endpointUrl, pipelineElement.getName(), false, message);
+    }
+  }
+
+  /** Creates the HTTP request to send for the given pipeline element and endpoint URL. */
+  protected abstract Request initRequest(EndpointSelectable pipelineElement,
+                                         String endpointUrl) throws JsonProcessingException;
+
+  /** Logs a failed request; invoked with the failure description when execute hits an exception. */
+  protected abstract void logError(String endpointUrl, String pipelineElementName, String exceptionMessage);
+
+  /** Parses the response body as a StreamPipes {@code Response} and turns it into an element status. */
+  protected PipelineElementStatus handleResponse(Response httpResp,
+                                                 EndpointSelectable pipelineElement,
+                                                 String endpointUrl) throws JsonSyntaxException, IOException {
+    String resp = httpResp.returnContent().asString();
+    org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
+        .getObjectMapper()
+        .readValue(resp, org.apache.streampipes.model.Response.class);
+    return new PipelineElementStatus(endpointUrl, pipelineElement.getName(), streamPipesResp.isSuccess(),
+        streamPipesResp.getOptionalMessage());
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
new file mode 100644
index 000000000..8be496b13
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.List;
+
+/** Template for submitting the elements of a pipeline and collecting one status per element. */
+public abstract class PipelineElementSubmitter {
+
+  protected final String pipelineId;
+  protected final String pipelineName;
+  protected final PipelineOperationStatus status;
+
+  public PipelineElementSubmitter(Pipeline pipeline) {
+    this.pipelineId = pipeline.getPipelineId();
+    this.pipelineName = pipeline.getName();
+    this.status = new PipelineOperationStatus(pipelineId, pipelineName);
+  }
+
+  /** Submits processors and sinks first, then the data sets if the subclass asks for it. */
+  public PipelineOperationStatus submit(List<InvocableStreamPipesEntity> processorsAndSinks,
+                                        List<SpDataSet> dataSets) {
+    processorsAndSinks.stream().map(this::submitElement).forEach(status::addPipelineElementStatus);
+    // Data sets are always submitted on detach, otherwise only if everything so far succeeded
+    if (shouldSubmitDataSets()) {
+      dataSets.stream().map(this::submitElement).forEach(status::addPipelineElementStatus);
+    }
+
+    applySuccess(processorsAndSinks);
+    return status;
+  }
+
+  /** Returns true if every element status recorded so far reports success. */
+  protected boolean isSuccess() {
+    return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
+  }
+
+  /** Stores the overall result in the status and dispatches to the matching callback. */
+  protected void applySuccess(List<InvocableStreamPipesEntity> processorsAndSinks) {
+    status.setSuccess(isSuccess());
+    if (!status.isSuccess()) {
+      this.onFailure(processorsAndSinks);
+      return;
+    }
+    this.onSuccess();
+  }
+
+  /** Executes a DetachHttpRequest against the element's endpoint URL extended by its detach path. */
+  protected PipelineElementStatus performDetach(EndpointSelectable pipelineElement) {
+    String detachUrl = pipelineElement.getSelectedEndpointUrl() + pipelineElement.getDetachPath();
+    return new DetachHttpRequest().execute(pipelineElement, detachUrl, this.pipelineId);
+  }
+
+  // Hooks supplied by the concrete submitters
+  protected abstract PipelineElementStatus submitElement(EndpointSelectable pipelineElement);
+  protected abstract boolean shouldSubmitDataSets();
+  protected abstract void onSuccess();
+  protected abstract void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks);
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
deleted file mode 100644
index 9ee3b8467..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.commons.MD5;
-import org.apache.streampipes.commons.Utils;
-import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.resource.management.secret.SecretProvider;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import org.lightcouch.DocumentConflictException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor {
-
-  private final Pipeline pipeline;
-  private final boolean storeStatus;
-  private final boolean forceStop;
-
-  public PipelineExecutor(Pipeline pipeline,
-                          boolean storeStatus,
-                          boolean forceStop) {
-    this.pipeline = pipeline;
-    this.storeStatus = storeStatus;
-    this.forceStop = forceStop;
-  }
-
-  public PipelineOperationStatus startPipeline() {
-
-    pipeline.getSepas().forEach(this::updateGroupIds);
-    pipeline.getActions().forEach(this::updateGroupIds);
-
-    List<DataProcessorInvocation> sepas = pipeline.getSepas();
-    List<DataSinkInvocation> secs = pipeline.getActions();
-
-    List<SpDataSet> dataSets = pipeline
-        .getStreams()
-        .stream()
-        .filter(s -> s instanceof SpDataSet)
-        .map(s -> new SpDataSet((SpDataSet) s))
-        .collect(Collectors.toList());
-
-    List<NamedStreamPipesEntity> failedServices = new ArrayList<>();
-
-    dataSets.forEach(ds -> {
-      ds.setCorrespondingPipeline(pipeline.getPipelineId());
-      try {
-        ds.setSelectedEndpointUrl(findSelectedEndpoint(ds));
-      } catch (NoServiceEndpointsAvailableException e) {
-        failedServices.add(ds);
-      }
-    });
-
-    List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-    graphs.addAll(sepas);
-    graphs.addAll(secs);
-
-    decryptSecrets(graphs);
-
-    graphs.forEach(g -> {
-      try {
-        g.setSelectedEndpointUrl(findSelectedEndpoint(g));
-        g.setCorrespondingPipeline(pipeline.getPipelineId());
-      } catch (NoServiceEndpointsAvailableException e) {
-        failedServices.add(g);
-      }
-    });
-
-    PipelineOperationStatus status;
-    if (failedServices.size() == 0) {
-
-      status = new GraphSubmitter(pipeline.getPipelineId(),
-          pipeline.getName(), graphs, dataSets)
-          .invokeGraphs();
-
-      encryptSecrets(graphs);
-
-      if (status.isSuccess()) {
-        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
-        PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-            new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(),
-                PipelineStatusMessageType.PIPELINE_STARTED.title(),
-                PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
-        if (storeStatus) {
-          pipeline.setHealthStatus(PipelineHealthStatus.OK);
-          setPipelineStarted(pipeline);
-        }
-      }
-    } else {
-      List<PipelineElementStatus> pe = failedServices.stream().map(fs ->
-          new PipelineElementStatus(fs.getElementId(),
-              fs.getName(),
-              false,
-              "No active supporting service found")).collect(Collectors.toList());
-      status = new PipelineOperationStatus(pipeline.getPipelineId(),
-          pipeline.getName(),
-          "Could not start pipeline " + pipeline.getName() + ".",
-          pe);
-    }
-    return status;
-  }
-
-  private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
-    return new ExtensionsServiceEndpointGenerator(
-        g.getAppId(),
-        ExtensionsServiceEndpointUtils.getPipelineElementType(g))
-        .getEndpointResourceUrl();
-  }
-
-  private String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException {
-    String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId();
-    if (ds.isInternallyManaged()) {
-      return getConnectMasterSourcesUrl();
-    } else {
-      return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET)
-          .getEndpointResourceUrl();
-    }
-  }
-
-  private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException {
-    List<String> connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery()
-        .getServiceEndpoints(DefaultSpServiceGroups.CORE, true,
-            Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString()));
-    if (connectMasterEndpoints.size() > 0) {
-      return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT;
-    } else {
-      throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint");
-    }
-  }
-
-  private void updateGroupIds(InvocableStreamPipesEntity entity) {
-    entity.getInputStreams()
-        .stream()
-        .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
-        .map(is -> is.getEventGrounding().getTransportProtocol())
-        .map(KafkaTransportProtocol.class::cast)
-        .forEach(tp -> tp.setGroupId(Utils.filterSpecialChar(pipeline.getName()) + MD5.crypt(tp.getElementId())));
-  }
-
-  private void decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-    SecretProvider.getDecryptionService().apply(graphs);
-  }
-
-  private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-    SecretProvider.getEncryptionService().apply(graphs);
-  }
-
-  public PipelineOperationStatus stopPipeline() {
-    List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
-    List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
-
-    PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
-        pipeline.getName(), graphs, dataSets)
-        .detachGraphs();
-
-    if (status.isSuccess()) {
-      PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-          new PipelineStatusMessage(pipeline.getPipelineId(),
-              System.currentTimeMillis(),
-              PipelineStatusMessageType.PIPELINE_STOPPED.title(),
-              PipelineStatusMessageType.PIPELINE_STOPPED.description()));
-
-    }
-
-    if (status.isSuccess() || forceStop) {
-      if (storeStatus) {
-        setPipelineStopped(pipeline);
-      }
-    }
-    return status;
-  }
-
-  private void setPipelineStarted(Pipeline pipeline) {
-    pipeline.setRunning(true);
-    pipeline.setStartedAt(new Date().getTime());
-    try {
-      getPipelineStorageApi().updatePipeline(pipeline);
-    } catch (DocumentConflictException dce) {
-      //dce.printStackTrace();
-    }
-  }
-
-  private void setPipelineStopped(Pipeline pipeline) {
-    pipeline.setRunning(false);
-    getPipelineStorageApi().updatePipeline(pipeline);
-  }
-
-  private void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
-                                     List<SpDataSet> dataSets) {
-    TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
-    TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
-  }
-
-  private IPipelineStorage getPipelineStorageApi() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
similarity index 64%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
index 085d17e44..282f08537 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
@@ -16,19 +16,22 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.provider;
 
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-public class TemporaryGraphStorage {
-
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+public class CurrentPipelineElementProvider implements PipelineElementProvider {
+  @Override
+  public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
+    return executionInfo.getProcessorsAndSinks();
+  }
 
+  @Override
+  public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) {
+    return executionInfo.getDataSets();
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
similarity index 73%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
index 085d17e44..d8f724801 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
@@ -16,19 +16,17 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.provider;
 
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-public class TemporaryGraphStorage {
+public interface PipelineElementProvider {
 
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+  List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo);
 
+  List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo);
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
similarity index 57%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
index 085d17e44..a41adbde7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
@@ -16,19 +16,23 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.provider;
 
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-public class TemporaryGraphStorage {
-
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+public class StoredPipelineElementProvider implements PipelineElementProvider {
+  @Override
+  public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
+    return RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
+  }
 
+  @Override
+  public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) {
+    return RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId());
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
new file mode 100644
index 000000000..a7d967d2a
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import java.util.List;
+
+public class AfterInvocationTask implements PipelineExecutionTask {
+
+  private final PipelineStatusMessageType statusMessageType;
+
+  public AfterInvocationTask(PipelineStatusMessageType statusMessageType) {
+    this.statusMessageType = statusMessageType;
+  }
+
+  @Override
+  public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return executionInfo.isOperationSuccessful();
+  }
+
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    var graphs = executionInfo.getProcessorsAndSinks();
+    var dataSets = executionInfo.getDataSets();
+    storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+    addPipelineStatus(pipeline);
+  }
+
+  private void storeInvocationGraphs(String pipelineId,
+                                     List<InvocableStreamPipesEntity> graphs,
+                                     List<SpDataSet> dataSets) {
+    RunningPipelineElementStorage.runningProcessorsAndSinks.put(pipelineId, graphs);
+    RunningPipelineElementStorage.runningDataSets.put(pipelineId, dataSets);
+  }
+
+  private void addPipelineStatus(Pipeline pipeline) {
+    PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
+        new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), statusMessageType));
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
new file mode 100644
index 000000000..713644650
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointProvider;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DiscoverEndpointsTask implements PipelineExecutionTask {
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    var processorsAndSinks = executionInfo.getProcessorsAndSinks();
+    var dataSets = executionInfo.getDataSets();
+
+    processorsAndSinks.forEach(el -> {
+      try {
+        var endpointUrl = findSelectedEndpoint(el);
+        applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointUrl);
+      } catch (NoServiceEndpointsAvailableException e) {
+        executionInfo.addFailedPipelineElement(el);
+      }
+    });
+    dataSets.forEach(ds -> {
+      try {
+        var endpointUrl = findSelectedDsEndpoint(ds);
+        applyEndpointAndPipeline(pipeline.getPipelineId(), ds, endpointUrl);
+      } catch (NoServiceEndpointsAvailableException e) {
+        executionInfo.addFailedPipelineElement(ds);
+      }
+    });
+
+    var failedServices = executionInfo.getFailedServices();
+    if (executionInfo.getFailedServices().size() > 0) {
+      List<PipelineElementStatus> pe = failedServices
+          .stream()
+          .map(fs -> new PipelineElementStatus(fs.getElementId(), fs.getName(), false,
+              "No active extensions service found which provides this pipeline element"))
+          .collect(Collectors.toList());
+      var status = new PipelineOperationStatus(pipeline.getPipelineId(),
+          pipeline.getName(),
+          "Could not start pipeline " + pipeline.getName() + ".",
+          pe);
+      executionInfo.applyPipelineOperationStatus(status);
+    }
+  }
+
+  private void applyEndpointAndPipeline(String pipelineId,
+                                        EndpointSelectable pipelineElement,
+                                        String endpointUrl) {
+    pipelineElement.setSelectedEndpointUrl(endpointUrl);
+    pipelineElement.setCorrespondingPipeline(pipelineId);
+  }
+
+  private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement)
+      throws NoServiceEndpointsAvailableException {
+    return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(pipelineElement);
+  }
+
+  private String findSelectedDsEndpoint(SpDataSet dataSet) throws NoServiceEndpointsAvailableException {
+    return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(dataSet);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java
similarity index 64%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java
index 085d17e44..096456061 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java
@@ -16,19 +16,17 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.task;
 
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public interface PipelineExecutionTask {
 
-public class TemporaryGraphStorage {
-
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+  default boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return true;
+  }
 
+  void executeTask(Pipeline pipeline,
+                   PipelineExecutionInfo executionInfo);
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java
similarity index 54%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java
index 085d17e44..b530f93c8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java
@@ -16,19 +16,23 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.task;
 
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.resource.management.secret.SecretService;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public class SecretEncryptionTask implements PipelineExecutionTask {
 
-public class TemporaryGraphStorage {
+  private final SecretService secretService;
 
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+  public SecretEncryptionTask(SecretService secretService) {
+    this.secretService = secretService;
+  }
 
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    this.secretService.apply(executionInfo.getProcessorsAndSinks());
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
new file mode 100644
index 000000000..f01f99005
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.lightcouch.DocumentConflictException;
+
+import java.util.Date;
+
+public class StorePipelineStatusTask implements PipelineExecutionTask {
+
+  private final boolean start;
+  private final boolean forceStop;
+
+  public StorePipelineStatusTask(boolean start,
+                                 boolean forceStop) {
+    this.start = start;
+    this.forceStop = forceStop;
+  }
+
+  @Override
+  public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return executionInfo.isOperationSuccessful() || forceStop;
+  }
+
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    if (this.start) {
+      pipeline.setHealthStatus(PipelineHealthStatus.OK);
+      setPipelineStarted(pipeline);
+    } else {
+      setPipelineStopped(pipeline);
+    }
+  }
+
+  private void setPipelineStarted(Pipeline pipeline) {
+    pipeline.setRunning(true);
+    pipeline.setStartedAt(new Date().getTime());
+    try {
+      getPipelineStorageApi().updatePipeline(pipeline);
+    } catch (DocumentConflictException dce) {
+      //dce.printStackTrace();
+    }
+  }
+
+  private void setPipelineStopped(Pipeline pipeline) {
+    pipeline.setRunning(false);
+    getPipelineStorageApi().updatePipeline(pipeline);
+  }
+
+  private IPipelineStorage getPipelineStorageApi() {
+    return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
new file mode 100644
index 000000000..c29d74909
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.provider.PipelineElementProvider;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+public class SubmitRequestTask implements PipelineExecutionTask {
+
+  private final PipelineElementProvider elementProvider;
+  private final PipelineElementSubmitter submitter;
+
+  public SubmitRequestTask(PipelineElementSubmitter submitter,
+                           PipelineElementProvider elementProvider) {
+    this.elementProvider = elementProvider;
+    this.submitter = submitter;
+  }
+
+  @Override
+  public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return executionInfo.getFailedServices().size() == 0;
+  }
+
+  @Override
+  public void executeTask(Pipeline pipeline, PipelineExecutionInfo executionInfo) {
+    var processorsAndSinks = elementProvider.getProcessorsAndSinks(executionInfo);
+    var dataSets = elementProvider.getDataSets(executionInfo);
+
+    var status = submitter.submit(processorsAndSinks, dataSets);
+
+    executionInfo.applyPipelineOperationStatus(status);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java
new file mode 100644
index 000000000..19fd87910
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.commons.MD5;
+import org.apache.streampipes.commons.Utils;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+public class UpdateGroupIdTask implements PipelineExecutionTask {
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    var sanitizedPipelineName = Utils.filterSpecialChar(pipeline.getName());
+    pipeline.getSepas().forEach(processor -> updateGroupIds(processor, sanitizedPipelineName));
+    pipeline.getActions().forEach(sink -> updateGroupIds(sink, sanitizedPipelineName));
+  }
+
+  private void updateGroupIds(InvocableStreamPipesEntity entity,
+                              String sanitizedPipelineName) {
+    entity.getInputStreams()
+        .stream()
+        .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+        .map(is -> is.getEventGrounding().getTransportProtocol())
+        .map(KafkaTransportProtocol.class::cast)
+        .forEach(tp -> tp.setGroupId(sanitizedPipelineName + MD5.crypt(tp.getElementId())));
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 335d76c46..c0e159df0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -22,8 +22,8 @@ import org.apache.streampipes.commons.constants.InstanceIdExtractor;
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.HttpRequestBuilder;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
@@ -67,7 +67,7 @@ public class PipelineHealthCheck implements Runnable {
         List<String> failedInstances = new ArrayList<>();
         List<String> recoveredInstances = new ArrayList<>();
         List<String> pipelineNotifications = new ArrayList<>();
-        List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+        List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId());
         graphs.forEach(graph -> {
           String instanceId = extractInstanceId(graph);
           if (allRunningInstances.stream().noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) {
@@ -77,7 +77,7 @@ public class PipelineHealthCheck implements Runnable {
               boolean success;
               try {
                 endpointUrl = findEndpointUrl(graph);
-                success = new HttpRequestBuilder(graph, endpointUrl, pipeline.getPipelineId()).invoke().isSuccess();
+                success = new InvokeHttpRequest().execute(graph, endpointUrl, pipeline.getPipelineId()).isSuccess();
               } catch (NoServiceEndpointsAvailableException e) {
                 success = false;
               }
@@ -182,7 +182,7 @@ public class PipelineHealthCheck implements Runnable {
 
   private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() {
     Map<String, List<InvocableStreamPipesEntity>> endpointMap = new HashMap<>();
-    TemporaryGraphStorage.graphStorage.forEach((pipelineId, graphs) ->
+    RunningPipelineElementStorage.runningProcessorsAndSinks.forEach((pipelineId, graphs) ->
         graphs.forEach(graph -> addEndpoint(endpointMap, graph)));
 
     return endpointMap;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 6e45801ca..3457d63ea 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -22,8 +22,8 @@ import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableExcepti
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.manager.endpoint.EndpointItemFetcher;
-import org.apache.streampipes.manager.execution.http.PipelineExecutor;
-import org.apache.streampipes.manager.execution.http.PipelineStorageService;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
 import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.recommender.ElementRecommender;
@@ -96,19 +96,8 @@ public class Operations {
     new PipelineStorageService(pipeline).updatePipeline();
   }
 
-  public static PipelineOperationStatus startPipeline(
-      Pipeline pipeline) {
-    return startPipeline(pipeline, true);
-  }
-
-  public static PipelineOperationStatus startPipeline(
-      Pipeline pipeline, boolean storeStatus) {
-    return new PipelineExecutor(pipeline, storeStatus, false).startPipeline();
-  }
-
-  public static PipelineOperationStatus stopPipeline(
-      Pipeline pipeline, boolean forceStop) {
-    return stopPipeline(pipeline, true, forceStop);
+  public static PipelineOperationStatus startPipeline(Pipeline pipeline) {
+    return new PipelineExecutor(pipeline, false).startPipeline();
   }
 
   public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) {
@@ -125,9 +114,8 @@ public class Operations {
   }
 
   public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
-                                                     boolean storeStatus,
                                                      boolean forceStop) {
-    return new PipelineExecutor(pipeline, storeStatus, forceStop).stopPipeline();
+    return new PipelineExecutor(pipeline, forceStop).stopPipeline();
   }
 
   public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index bb37f1e73..7b5e8e67d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -17,11 +17,11 @@
  */
 package org.apache.streampipes.manager.preview;
 
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.HttpRequestBuilder;
+import org.apache.streampipes.manager.execution.http.DetachHttpRequest;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.SpDataSet;
@@ -92,7 +92,7 @@ public class PipelinePreview {
     graphs.forEach(g -> {
       try {
         g.setSelectedEndpointUrl(findSelectedEndpoint(g));
-        new HttpRequestBuilder(g, g.getSelectedEndpointUrl(), null).invoke();
+        new InvokeHttpRequest().execute(g, g.getSelectedEndpointUrl(), null);
       } catch (NoServiceEndpointsAvailableException e) {
         e.printStackTrace();
       }
@@ -101,8 +101,8 @@ public class PipelinePreview {
 
   private void detachGraphs(List<InvocableStreamPipesEntity> graphs) {
     graphs.forEach(g -> {
-      String endpointUrl = g.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(g.getElementId());
-      new HttpRequestBuilder(g, endpointUrl, null).detach();
+      String endpointUrl = g.getSelectedEndpointUrl() + g.getDetachPath();
+      new DetachHttpRequest().execute(g, endpointUrl, null);
     });
   }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
similarity index 98%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
index 5c42b5eb0..9d0f2b582 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.manager.storage;
 
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
similarity index 78%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
index 085d17e44..360e6db90 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.storage;
 
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -25,10 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class TemporaryGraphStorage {
+public class RunningPipelineElementStorage {
 
-  public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
+  public static Map<String, List<InvocableStreamPipesEntity>> runningProcessorsAndSinks = new HashMap<>();
 
-  public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+  public static Map<String, List<SpDataSet>> runningDataSets = new HashMap<>();
 
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java
deleted file mode 100644
index 0523158ff..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.storage;
-
-import org.apache.streampipes.model.client.user.Principal;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.storage.api.INoSqlStorage;
-import org.apache.streampipes.storage.api.IUserStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class UserService {
-
-  private IUserStorage userStorage;
-
-  public UserService(IUserStorage userStorage) {
-    this.userStorage = userStorage;
-  }
-
-  public List<Pipeline> getOwnPipelines(String email) {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines().stream().filter(p -> p
-            .getCreatedByUser()
-            .equals(email))
-        .collect(Collectors.toList());
-  }
-
-  public void deleteOwnSource(String username, String sourceId) {
-    if (checkUser(username)) {
-      Principal user = getPrincipal(username);
-      //user.getOwnSources().removeIf(a -> a.getElementId().equals(sourceId));
-      userStorage.updateUser(user);
-    }
-  }
-
-  /**
-   * Get actions/sepas/sources
-   */
-
-  public List<String> getOwnActionUris(String username) {
-    // TODO permissions
-    return new ArrayList<>();
-    //return userStorage.getUser(username)
-    // .getOwnActions().stream().map(r -> r.getElementId()).collect(Collectors.toList());
-  }
-
-  public List<String> getOwnSepaUris(String username) {
-    // TODO Permissions
-    return new ArrayList<>();
-    //return userStorage.getUser(username)
-    // .getOwnSepas().stream().map(r -> r.getElementId()).collect(Collectors.toList());
-  }
-
-
-  public List<String> getOwnSourceUris(String email) {
-    // TODO permissions
-    return new ArrayList<>();
-//    return userStorage
-//            .getUser(email)
-//            .getOwnSources()
-//            .stream()
-//            .map(r -> r.getElementId())
-//            .collect(Collectors.toList());
-  }
-
-  private Principal getPrincipal(String username) {
-    return userStorage.getUser(username);
-  }
-
-
-  /**
-   * @param username
-   * @return True if user exists exactly once, false otherwise
-   */
-  public boolean checkUser(String username) {
-    return userStorage.checkUser(username);
-  }
-
-  private INoSqlStorage getStorageManager() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore();
-  }
-
-}
diff --git a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
index 33b2607b9..10c74f619 100644
--- a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
+++ b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.rest.core.base.impl;
 
 import org.apache.streampipes.manager.endpoint.HttpJsonParser;
-import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.model.message.ErrorMessage;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.Notification;
@@ -68,10 +67,6 @@ public abstract class AbstractRestResource extends AbstractSharedRestInterface {
     return getNoSqlStorage().getUserStorageAPI();
   }
 
-  protected UserService getUserService() {
-    return new UserService(getUserStorage());
-  }
-
   protected IVisualizationStorage getVisualizationStorage() {
     return getNoSqlStorage().getVisualizationStorageApi();
   }