You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/15 21:07:50 UTC
[streampipes] 01/01: Improve structure of pipeline execution management (#1096)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch SP-1096
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit ad423278cc53b9c8b9a1a35f01191c709ad47eb4
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Jan 15 22:07:21 2023 +0100
Improve structure of pipeline execution management (#1096)
---
.../org/apache/streampipes/model/SpDataSet.java | 15 +-
.../streampipes/model/api/EndpointSelectable.java | 17 +-
.../model/base/InvocableStreamPipesEntity.java | 18 +-
.../model/message/PipelineStatusMessage.java | 9 +-
.../model/pipeline/PipelineOperationStatus.java | 10 +-
.../manager/execution/PipelineExecutionInfo.java | 106 +++++++++
.../execution/PipelineExecutionTaskFactory.java | 60 +++++
.../manager/execution/PipelineExecutor.java | 57 +++++
.../ExtensionsServiceEndpointProvider.java | 62 ++++++
.../http/DetachHttpRequest.java} | 26 ++-
.../http/DetachPipelineElementSubmitter.java | 53 +++++
.../manager/execution/http/GraphSubmitter.java | 132 -----------
.../manager/execution/http/HttpRequestBuilder.java | 98 ---------
.../manager/execution/http/InvokeHttpRequest.java | 54 +++++
.../http/InvokePipelineElementSubmitter.java | 81 +++++++
.../execution/http/PipelineElementHttpRequest.java | 73 +++++++
.../execution/http/PipelineElementSubmitter.java | 82 +++++++
.../manager/execution/http/PipelineExecutor.java | 243 ---------------------
.../provider/CurrentPipelineElementProvider.java} | 19 +-
.../provider/PipelineElementProvider.java} | 12 +-
.../provider/StoredPipelineElementProvider.java} | 20 +-
.../execution/task/AfterInvocationTask.java | 65 ++++++
.../execution/task/DiscoverEndpointsTask.java | 88 ++++++++
.../task/PipelineExecutionTask.java} | 20 +-
.../task/SecretEncryptionTask.java} | 24 +-
.../execution/task/StorePipelineStatusTask.java | 76 +++++++
.../manager/execution/task/SubmitRequestTask.java | 51 +++++
.../manager/execution/task/UpdateGroupIdTask.java | 46 ++++
.../manager/health/PipelineHealthCheck.java | 10 +-
.../streampipes/manager/operations/Operations.java | 22 +-
.../manager/preview/PipelinePreview.java | 10 +-
.../http => storage}/PipelineStorageService.java | 2 +-
.../RunningPipelineElementStorage.java} | 8 +-
.../streampipes/manager/storage/UserService.java | 101 ---------
.../rest/core/base/impl/AbstractRestResource.java | 5 -
35 files changed, 1093 insertions(+), 682 deletions(-)
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java
index b0a9dff08..cfcbf77e3 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java
@@ -17,10 +17,13 @@
*/
package org.apache.streampipes.model;
+import org.apache.streampipes.model.api.EndpointSelectable;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.schema.EventSchema;
-public class SpDataSet extends SpDataStream {
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+public class SpDataSet extends SpDataStream implements EndpointSelectable {
private EventGrounding supportedGrounding;
@@ -90,18 +93,28 @@ public class SpDataSet extends SpDataStream {
this.datasetInvocationId = datasetInvocationId;
}
+ @Override
public String getCorrespondingPipeline() {
return correspondingPipeline;
}
+ @Override
public void setCorrespondingPipeline(String correspondingPipeline) {
this.correspondingPipeline = correspondingPipeline;
}
+ @Override
+ @JsonIgnore
+ public String getDetachPath() {
+   // Resulting path has the form "/<correspondingAdapterId>/<datasetInvocationId>".
+   var adapterId = getCorrespondingAdapterId();
+   var invocationId = getDatasetInvocationId();
+   return "/" + adapterId + "/" + invocationId;
+ }
+
+ @Override
public String getSelectedEndpointUrl() {
return selectedEndpointUrl;
}
+ @Override
public void setSelectedEndpointUrl(String selectedEndpointUrl) {
this.selectedEndpointUrl = selectedEndpointUrl;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java
similarity index 64%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java
index 085d17e44..585400010 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java
@@ -16,19 +16,18 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.model.api;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+public interface EndpointSelectable {
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+ String getName();
-public class TemporaryGraphStorage {
+ String getSelectedEndpointUrl();
+ void setSelectedEndpointUrl(String selectedEndpointUrl);
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
+ String getCorrespondingPipeline();
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+ void setCorrespondingPipeline(String pipelineId);
+ String getDetachPath();
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 3a87cc5ba..0d64297fc 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -18,19 +18,21 @@
package org.apache.streampipes.model.base;
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import org.apache.streampipes.logging.LoggerFactory;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.api.EndpointSelectable;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.util.Cloner;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonIgnore;
-public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
+import java.util.List;
- private static final long serialVersionUID = 2727573914765473470L;
+public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity implements EndpointSelectable {
protected List<SpDataStream> inputStreams;
@@ -120,10 +122,12 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
this.supportedGrounding = supportedGrounding;
}
+ @Override
public String getCorrespondingPipeline() {
return correspondingPipeline;
}
+ @Override
public void setCorrespondingPipeline(String correspondingPipeline) {
this.correspondingPipeline = correspondingPipeline;
}
@@ -168,14 +172,22 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
this.uncompleted = uncompleted;
}
+ @Override
public String getSelectedEndpointUrl() {
return selectedEndpointUrl;
}
+ @Override
public void setSelectedEndpointUrl(String selectedEndpointUrl) {
this.selectedEndpointUrl = selectedEndpointUrl;
}
+ @Override
+ @JsonIgnore
+ public String getDetachPath() {
+   // Resulting path is "/" followed by the id that InstanceIdExtractor derives from the element id.
+   var instanceId = InstanceIdExtractor.extractId(getElementId());
+   return "/" + instanceId;
+ }
+
//public Logger getLogger(Class clazz, PeConfig peConfig) {
public Logger getLogger(Class clazz) {
//return LoggerFactory.getPeLogger(clazz, getCorrespondingPipeline(), getUri(), peConfig);
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java
index a688a101c..6fa8f7816 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java
@@ -28,13 +28,14 @@ public class PipelineStatusMessage {
private String messageType;
private String message;
- public PipelineStatusMessage(String pipelineId, long timestamp,
- String messageType, String message) {
+ public PipelineStatusMessage(String pipelineId,
+ long timestamp,
+ PipelineStatusMessageType message) {
super();
this.pipelineId = pipelineId;
this.timestamp = timestamp;
- this.messageType = messageType;
- this.message = message;
+ this.messageType = message.title();
+ this.message = message.description();
}
public String getPipelineId() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java
index 25b3c5b27..8e42372e2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java
@@ -33,7 +33,9 @@ public class PipelineOperationStatus {
private List<PipelineElementStatus> elementStatus;
- public PipelineOperationStatus(String pipelineId, String pipelineName, String title,
+ public PipelineOperationStatus(String pipelineId,
+ String pipelineName,
+ String title,
List<PipelineElementStatus> elementStatus) {
super();
this.title = title;
@@ -46,6 +48,12 @@ public class PipelineOperationStatus {
this.elementStatus = new ArrayList<>();
}
+ /**
+  * Creates a status object for the given pipeline. Delegates to the no-argument
+  * constructor first and then records the pipeline id and name.
+  *
+  * @param pipelineId   id of the pipeline this status belongs to
+  * @param pipelineName name of the pipeline this status belongs to
+  */
+ public PipelineOperationStatus(String pipelineId, String pipelineName) {
+ this();
+ this.pipelineId = pipelineId;
+ this.pipelineName = pipelineName;
+ }
+
public String getPipelineId() {
return pipelineId;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
new file mode 100644
index 000000000..ba13ab7b0
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Holds the state gathered for one pipeline operation: the pipeline's processors and sinks,
+ * its data sets, the elements reported as failed and the current {@link PipelineOperationStatus}.
+ *
+ * <p>The lists returned by the getters are the internal, mutable lists of this object.
+ */
+public class PipelineExecutionInfo {
+
+  private final List<NamedStreamPipesEntity> failedServices;
+  private final List<InvocableStreamPipesEntity> processorsAndSinks;
+  private final List<SpDataSet> dataSets;
+
+  private PipelineOperationStatus pipelineOperationStatus;
+
+  private final String pipelineId;
+
+  /**
+   * Creates the execution info for the given pipeline.
+   *
+   * @param pipeline pipeline whose elements and id are read
+   * @return a new info object with no failed elements and a default operation status
+   */
+  public static PipelineExecutionInfo create(Pipeline pipeline) {
+    return new PipelineExecutionInfo(pipeline);
+  }
+
+  private PipelineExecutionInfo(Pipeline pipeline) {
+    this.failedServices = new ArrayList<>();
+    this.processorsAndSinks = findProcessorsAndSinks(pipeline);
+    this.dataSets = findDataSets(pipeline);
+    this.pipelineOperationStatus = new PipelineOperationStatus();
+    this.pipelineId = pipeline.getPipelineId();
+  }
+
+  /** Concatenates the pipeline's sepas and actions, in that order, into one list. */
+  private List<InvocableStreamPipesEntity> findProcessorsAndSinks(Pipeline pipeline) {
+    return Stream
+        .concat(
+            pipeline.getSepas().stream(),
+            pipeline.getActions().stream()
+        ).collect(Collectors.toList());
+  }
+
+  /**
+   * Collects a copy (via the {@code SpDataSet(SpDataSet)} constructor) of every stream of the
+   * pipeline that is a data set.
+   */
+  private List<SpDataSet> findDataSets(Pipeline pipeline) {
+    // Must be a mutable list: addDataSets() appends to it later. Stream.toList() would
+    // return an unmodifiable list and make addDataSets() throw UnsupportedOperationException.
+    return pipeline
+        .getStreams()
+        .stream()
+        .filter(s -> s instanceof SpDataSet)
+        .map(s -> new SpDataSet((SpDataSet) s))
+        .collect(Collectors.toCollection(ArrayList::new));
+  }
+
+  /** Records an element for which the operation failed. */
+  public void addFailedPipelineElement(NamedStreamPipesEntity failedElement) {
+    this.failedServices.add(failedElement);
+  }
+
+  /** Appends the given data sets to the data sets already held by this object. */
+  public void addDataSets(List<SpDataSet> dataSets) {
+    this.dataSets.addAll(dataSets);
+  }
+
+  public List<SpDataSet> getDataSets() {
+    return dataSets;
+  }
+
+  public List<NamedStreamPipesEntity> getFailedServices() {
+    return failedServices;
+  }
+
+  public List<InvocableStreamPipesEntity> getProcessorsAndSinks() {
+    return processorsAndSinks;
+  }
+
+  /** Replaces the current operation status with the given one. */
+  public void applyPipelineOperationStatus(PipelineOperationStatus status) {
+    this.pipelineOperationStatus = status;
+  }
+
+  public PipelineOperationStatus getPipelineOperationStatus() {
+    return this.pipelineOperationStatus;
+  }
+
+  public String getPipelineId() {
+    return pipelineId;
+  }
+
+  /**
+   * @return {@code true} if no failed element was recorded and the current operation status
+   *     reports success
+   */
+  public boolean isOperationSuccessful() {
+    return failedServices.isEmpty() && pipelineOperationStatus.isSuccess();
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
new file mode 100644
index 000000000..e290c485d
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.manager.execution.http.DetachPipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.http.InvokePipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.provider.CurrentPipelineElementProvider;
+import org.apache.streampipes.manager.execution.provider.StoredPipelineElementProvider;
+import org.apache.streampipes.manager.execution.task.AfterInvocationTask;
+import org.apache.streampipes.manager.execution.task.DiscoverEndpointsTask;
+import org.apache.streampipes.manager.execution.task.SubmitRequestTask;
+import org.apache.streampipes.manager.execution.task.PipelineExecutionTask;
+import org.apache.streampipes.manager.execution.task.SecretEncryptionTask;
+import org.apache.streampipes.manager.execution.task.StorePipelineStatusTask;
+import org.apache.streampipes.manager.execution.task.UpdateGroupIdTask;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.resource.management.secret.SecretProvider;
+
+import java.util.List;
+
+/**
+ * Assembles the ordered task lists that are run to start or stop a pipeline.
+ */
+public class PipelineExecutionTaskFactory {
+
+  /**
+   * Builds the tasks for starting a pipeline. The returned list is immutable and ordered;
+   * the submit task sits between a secret task configured with the decryption service and
+   * one configured with the encryption service.
+   *
+   * @param pipeline pipeline handed to the invoke submitter
+   * @return the start tasks in execution order
+   */
+  public static List<PipelineExecutionTask> makeStartPipelineTasks(Pipeline pipeline) {
+    var updateGroupId = new UpdateGroupIdTask();
+    var decryptSecrets = new SecretEncryptionTask(SecretProvider.getDecryptionService());
+    var discoverEndpoints = new DiscoverEndpointsTask();
+    var invokeElements = new SubmitRequestTask(
+        new InvokePipelineElementSubmitter(pipeline),
+        new CurrentPipelineElementProvider());
+    var encryptSecrets = new SecretEncryptionTask(SecretProvider.getEncryptionService());
+    var afterInvocation = new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STARTED);
+    var storeStatus = new StorePipelineStatusTask(true, false);
+
+    return List.of(
+        updateGroupId,
+        decryptSecrets,
+        discoverEndpoints,
+        invokeElements,
+        encryptSecrets,
+        afterInvocation,
+        storeStatus
+    );
+  }
+
+  /**
+   * Builds the tasks for stopping a pipeline. The returned list is immutable and ordered.
+   *
+   * @param pipeline  pipeline handed to the detach submitter
+   * @param forceStop passed through to the status-storing task
+   * @return the stop tasks in execution order
+   */
+  public static List<PipelineExecutionTask> makeStopPipelineTasks(Pipeline pipeline,
+                                                                  boolean forceStop) {
+    var detachElements = new SubmitRequestTask(
+        new DetachPipelineElementSubmitter(pipeline),
+        new StoredPipelineElementProvider());
+    var afterInvocation = new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STOPPED);
+    var storeStatus = new StorePipelineStatusTask(false, forceStop);
+
+    return List.of(detachElements, afterInvocation, storeStatus);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
new file mode 100644
index 000000000..bc14e46f5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.manager.execution.task.PipelineExecutionTask;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.List;
+
+/**
+ * Starts or stops a single pipeline by running the task list obtained from
+ * {@link PipelineExecutionTaskFactory} against a fresh {@link PipelineExecutionInfo}.
+ */
+public class PipelineExecutor {
+
+  private final Pipeline pipeline;
+  private final boolean forceStop;
+
+  /**
+   * @param pipeline  pipeline to operate on
+   * @param forceStop forwarded to the stop task list
+   */
+  public PipelineExecutor(Pipeline pipeline,
+                          boolean forceStop) {
+    this.pipeline = pipeline;
+    this.forceStop = forceStop;
+  }
+
+  /** Runs the start task list and returns the resulting operation status. */
+  public PipelineOperationStatus startPipeline() {
+    List<PipelineExecutionTask> startTasks = PipelineExecutionTaskFactory.makeStartPipelineTasks(pipeline);
+    return executeOperation(startTasks);
+  }
+
+  /** Runs the stop task list and returns the resulting operation status. */
+  public PipelineOperationStatus stopPipeline() {
+    List<PipelineExecutionTask> stopTasks = PipelineExecutionTaskFactory.makeStopPipelineTasks(pipeline, forceStop);
+    return executeOperation(stopTasks);
+  }
+
+  /**
+   * Runs the tasks in list order. Each task is asked via {@code shouldExecute} whether it
+   * should run for the current execution info; tasks answering {@code false} are skipped.
+   */
+  private PipelineOperationStatus executeOperation(List<PipelineExecutionTask> executionTasks) {
+    var executionInfo = PipelineExecutionInfo.create(pipeline);
+    for (PipelineExecutionTask task : executionTasks) {
+      if (!task.shouldExecute(executionInfo)) {
+        continue;
+      }
+      task.executeTask(pipeline, executionInfo);
+    }
+    return executionInfo.getPipelineOperationStatus();
+  }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java
new file mode 100644
index 000000000..51760b252
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.endpoint;
+
+import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Resolves the endpoint URL to use for a pipeline element or a data set.
+ */
+public class ExtensionsServiceEndpointProvider {
+
+  /**
+   * Resolves the endpoint for a processor or sink from its app id and pipeline element type.
+   *
+   * @throws NoServiceEndpointsAvailableException propagated from the endpoint generator
+   */
+  public String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
+    var endpointGenerator = new ExtensionsServiceEndpointGenerator(
+        g.getAppId(),
+        ExtensionsServiceEndpointUtils.getPipelineElementType(g));
+    return endpointGenerator.getEndpointResourceUrl();
+  }
+
+  /**
+   * Resolves the endpoint for a data set. Internally managed data sets use the connect master
+   * sources URL; all others are looked up by app id, falling back to the corresponding
+   * adapter id when the app id is {@code null}.
+   *
+   * @throws NoServiceEndpointsAvailableException if no endpoint could be found
+   */
+  public String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException {
+    String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId();
+    if (ds.isInternallyManaged()) {
+      return getConnectMasterSourcesUrl();
+    }
+    var endpointGenerator = new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET);
+    return endpointGenerator.getEndpointResourceUrl();
+  }
+
+  /** Returns the first discovered connect master endpoint with the sources path appended. */
+  private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException {
+    List<String> endpoints = SpServiceDiscovery
+        .getServiceDiscovery()
+        .getServiceEndpoints(
+            DefaultSpServiceGroups.CORE,
+            true,
+            Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString()));
+    if (endpoints.isEmpty()) {
+      throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint");
+    }
+    return endpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
similarity index 50%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
index 085d17e44..ac9b73705 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
@@ -16,19 +16,27 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.api.EndpointSelectable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.http.client.fluent.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class TemporaryGraphStorage {
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
+public class DetachHttpRequest extends PipelineElementHttpRequest {
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(DetachHttpRequest.class);
+ /**
+  * Creates the HTTP DELETE request used to detach a pipeline element.
+  *
+  * @param pipelineElement element being detached (not read here)
+  * @param endpointUrl     URL the DELETE request is sent to
+  * @return a DELETE request for {@code endpointUrl}
+  */
+ @Override
+ protected Request initRequest(EndpointSelectable pipelineElement, String endpointUrl) {
+   // Placeholder form matches logError; the emitted message is unchanged.
+   LOG.info("Detaching element: {}", endpointUrl);
+   return Request.Delete(endpointUrl);
+ }
+
+ /**
+  * Logs a failed detach request.
+  *
+  * @param endpointUrl         URL the request was sent to
+  * @param pipelineElementName name of the pipeline element that could not be stopped
+  * @param exceptionMessage    message describing the failure
+  */
+ @Override
+ protected void logError(String endpointUrl, String pipelineElementName, String exceptionMessage) {
+   // Argument order follows the placeholders: element name first, then the endpoint URL.
+   LOG.error("Could not stop pipeline element {} at {}: {}", pipelineElementName, endpointUrl, exceptionMessage);
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
new file mode 100644
index 000000000..47651cda3
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+
+import java.util.List;
+
+/**
+ * Submitter that detaches pipeline elements and sets the status title for a stop operation.
+ */
+public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
+
+  public DetachPipelineElementSubmitter(Pipeline pipeline) {
+    super(pipeline);
+  }
+
+  /** Detaches the given element and returns the resulting element status. */
+  @Override
+  protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) {
+    PipelineElementStatus detachStatus = performDetach(pipelineElement);
+    return detachStatus;
+  }
+
+  /** Always {@code true} for this submitter. */
+  @Override
+  protected boolean shouldSubmitDataSets() {
+    return true;
+  }
+
+  @Override
+  protected void onSuccess() {
+    final String title = "Pipeline " + pipelineName + " successfully stopped";
+    status.setTitle(title);
+  }
+
+  /** Sets the failure title; the passed elements are not used here. */
+  @Override
+  protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) {
+    final String title = "Could not stop all pipeline elements of pipeline " + pipelineName + ".";
+    status.setTitle(title);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
deleted file mode 100644
index 59598fa14..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class GraphSubmitter {
-
- private List<InvocableStreamPipesEntity> graphs;
- private List<SpDataSet> dataSets;
-
- private String pipelineId;
- private String pipelineName;
-
- private static final Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class);
-
- public GraphSubmitter(String pipelineId,
- String pipelineName,
- List<InvocableStreamPipesEntity> graphs,
- List<SpDataSet> dataSets) {
- this.graphs = graphs != null ? graphs : new ArrayList<>();
- this.pipelineId = pipelineId;
- this.pipelineName = pipelineName;
- this.dataSets = dataSets != null ? dataSets : new ArrayList<>();
- }
-
- public PipelineOperationStatus invokeGraphs() {
- PipelineOperationStatus status = new PipelineOperationStatus();
- status.setPipelineId(pipelineId);
- status.setPipelineName(pipelineName);
-
-
- graphs.forEach(g -> status.addPipelineElementStatus(performInvocation(g)));
- if (status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)) {
- dataSets.forEach(dataSet ->
- status.addPipelineElementStatus
- (performInvocation(dataSet)));
- }
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
- if (status.isSuccess()) {
- status.setTitle("Pipeline " + pipelineName + " successfully started");
- } else {
- LOG.info("Could not start pipeline, initializing rollback...");
- rollbackInvokedPipelineElements(status);
- status.setTitle("Could not start pipeline " + pipelineName + ".");
- }
- return status;
- }
-
- private void rollbackInvokedPipelineElements(PipelineOperationStatus status) {
- for (PipelineElementStatus s : status.getElementStatus()) {
- if (s.isSuccess()) {
- Optional<InvocableStreamPipesEntity> graph = findGraph(s.getElementId());
- graph.ifPresent(g -> {
- LOG.info("Rolling back element " + g.getElementId());
- performDetach(g);
- });
- }
- }
- }
-
- private Optional<InvocableStreamPipesEntity> findGraph(String elementId) {
- return graphs.stream().filter(g -> g.getBelongsTo().equals(elementId)).findFirst();
- }
-
- public PipelineOperationStatus detachGraphs() {
- PipelineOperationStatus status = new PipelineOperationStatus();
- status.setPipelineId(pipelineId);
- status.setPipelineName(pipelineName);
-
- graphs.forEach(g -> status.addPipelineElementStatus(performDetach(g)));
- dataSets.forEach(dataSet -> status.addPipelineElementStatus(performDetach(dataSet)));
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
- if (status.isSuccess()) {
- status.setTitle("Pipeline " + pipelineName + " successfully stopped");
- } else {
- status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + ".");
- }
-
- return status;
- }
-
- private PipelineElementStatus performInvocation(InvocableStreamPipesEntity entity) {
- String endpointUrl = entity.getSelectedEndpointUrl();
- return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).invoke();
- }
-
- private PipelineElementStatus performInvocation(SpDataSet dataset) {
- String endpointUrl = dataset.getSelectedEndpointUrl();
- return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).invoke();
- }
-
- private PipelineElementStatus performDetach(InvocableStreamPipesEntity entity) {
- String endpointUrl = entity.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(entity.getElementId());
- return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).detach();
- }
-
- private PipelineElementStatus performDetach(SpDataSet dataset) {
- String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/"
- + dataset.getDatasetInvocationId();
- return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).detach();
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
deleted file mode 100644
index d9dd5a313..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.manager.util.AuthTokenUtils;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import com.google.gson.JsonSyntaxException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.ContentType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class HttpRequestBuilder {
-
- private final NamedStreamPipesEntity payload;
- private final String endpointUrl;
- private String pipelineId;
-
- private static final Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class);
-
- public HttpRequestBuilder(NamedStreamPipesEntity payload,
- String endpointUrl,
- String pipelineId) {
- this.payload = payload;
- this.endpointUrl = endpointUrl;
- this.pipelineId = pipelineId;
- }
-
- public PipelineElementStatus invoke() {
- LOG.info("Invoking element: " + endpointUrl);
- try {
- String jsonDocument = toJson();
- Response httpResp =
- Request.Post(endpointUrl)
- .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId))
- .bodyString(jsonDocument, ContentType.APPLICATION_JSON)
- .connectTimeout(10000)
- .execute();
- return handleResponse(httpResp);
- } catch (Exception e) {
- LOG.error("Could not perform invocation request", e);
- return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
- }
- }
-
- public PipelineElementStatus detach() {
- try {
- Response httpResp = Request.Delete(endpointUrl)
- .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId))
- .connectTimeout(10000).execute();
- return handleResponse(httpResp);
- } catch (Exception e) {
- LOG.error("Could not stop pipeline {}", endpointUrl, e);
- return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
- }
- }
-
- private PipelineElementStatus handleResponse(Response httpResp) throws JsonSyntaxException, IOException {
- String resp = httpResp.returnContent().asString();
- org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
- .getObjectMapper()
- .readValue(resp, org.apache.streampipes.model.Response.class);
- return convert(streamPipesResp);
- }
-
- private String toJson() throws Exception {
- return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
- }
-
- private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
- return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(),
- response.getOptionalMessage());
- }
-
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
new file mode 100644
index 000000000..1656e677b
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HTTP request that invokes a pipeline element by POSTing its JSON representation
+ * to the given endpoint URL.
+ */
+public class InvokeHttpRequest extends PipelineElementHttpRequest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InvokeHttpRequest.class);
+
+ /**
+ * Creates the POST request used for the invocation.
+ *
+ * @param pipelineElement element that is serialized into the request body
+ * @param endpointUrl URL the request is sent to
+ * @return POST request carrying the element as JSON body
+ * @throws JsonProcessingException if the element cannot be serialized to JSON
+ */
+ @Override
+ protected Request initRequest(EndpointSelectable pipelineElement,
+ String endpointUrl) throws JsonProcessingException {
+ // Parameterized message, consistent with logError in this class
+ LOG.info("Invoking element: {}", endpointUrl);
+ return Request
+ .Post(endpointUrl)
+ .bodyString(toJson(pipelineElement), ContentType.APPLICATION_JSON);
+ }
+
+ /**
+ * Logs a failed invocation request on error level.
+ *
+ * @param endpointUrl URL the request was sent to
+ * @param pipelineElementName name of the affected pipeline element
+ * @param exceptionMessage description of the failure
+ */
+ @Override
+ protected void logError(String endpointUrl,
+ String pipelineElementName,
+ String exceptionMessage) {
+ LOG.error("Could not perform invocation request at {} for pipeline element {}: {}",
+ endpointUrl, pipelineElementName, exceptionMessage);
+ }
+
+ /** Serializes the pipeline element with the object mapper provided by JacksonSerializer. */
+ private String toJson(EndpointSelectable pipelineElement) throws JsonProcessingException {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(pipelineElement);
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
new file mode 100644
index 000000000..d746d2319
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Submitter that invokes the elements of a pipeline and detaches the already invoked
+ * processors and sinks again if the overall operation did not succeed.
+ */
+public class InvokePipelineElementSubmitter extends PipelineElementSubmitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InvokePipelineElementSubmitter.class);
+
+ public InvokePipelineElementSubmitter(Pipeline pipeline) {
+ super(pipeline);
+ }
+
+ /**
+ * Sends an invoke request for the element to its selected endpoint URL.
+ *
+ * @param pipelineElement element to invoke
+ * @return status of the invocation as reported by {@link InvokeHttpRequest}
+ */
+ @Override
+ protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) {
+ String endpointUrl = pipelineElement.getSelectedEndpointUrl();
+ return new InvokeHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId);
+ }
+
+ /** Data sets are only invoked when every status collected so far is successful. */
+ @Override
+ protected boolean shouldSubmitDataSets() {
+ return isSuccess();
+ }
+
+ @Override
+ protected void onSuccess() {
+ status.setTitle("Pipeline " + pipelineName + " successfully started");
+ }
+
+ /**
+ * Rolls back the successfully invoked processors and sinks and marks the start as failed.
+ *
+ * @param processorsAndSinks elements that were submitted for invocation
+ */
+ @Override
+ protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) {
+ LOG.info("Could not start pipeline, initializing rollback...");
+ rollbackInvokedPipelineElements(status, processorsAndSinks);
+ status.setTitle("Could not start pipeline " + pipelineName + ".");
+ }
+
+ /** Detaches every element whose recorded status is successful and that can be looked up in {@code pe}. */
+ private void rollbackInvokedPipelineElements(PipelineOperationStatus status,
+ List<InvocableStreamPipesEntity> pe) {
+ for (PipelineElementStatus s : status.getElementStatus()) {
+ if (s.isSuccess()) {
+ Optional<InvocableStreamPipesEntity> graph = findPipelineElements(s.getElementId(), pe);
+ graph.ifPresent(this::performDetach);
+ }
+ }
+ }
+
+ /**
+ * Returns the first element whose {@code belongsTo} value equals the given id.
+ *
+ * NOTE(review): the statuses created by the HTTP request classes appear to be built with the
+ * endpoint URL as their first (id) argument, while this lookup compares against
+ * getBelongsTo(). Confirm both refer to the same identifier, otherwise no element is found
+ * and nothing is rolled back.
+ */
+ private Optional<InvocableStreamPipesEntity> findPipelineElements(String elementId,
+ List<InvocableStreamPipesEntity> pe) {
+ return pe
+ .stream()
+ // null-safe: a missing id on either side must not abort the rollback with an NPE
+ .filter(g -> elementId != null && elementId.equals(g.getBelongsTo()))
+ .findFirst();
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
new file mode 100644
index 000000000..400626e54
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.manager.util.AuthTokenUtils;
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.gson.JsonSyntaxException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+
+import java.io.IOException;
+
+/**
+ * Base class for HTTP requests sent to the endpoint of a single pipeline element.
+ * Subclasses provide the concrete request and the error log statement; this class adds the
+ * authorization header, executes the request and maps the outcome to a
+ * {@link PipelineElementStatus}.
+ */
+public abstract class PipelineElementHttpRequest {
+
+ /**
+ * Executes the request for the given pipeline element. Exceptions raised while building,
+ * sending or parsing the request are logged via {@link #logError} and converted into a
+ * failed status instead of being propagated.
+ *
+ * @param pipelineElement element the request is performed for
+ * @param endpointUrl URL the request is sent to; also used as id of the returned status
+ * @param pipelineId id handed to AuthTokenUtils to obtain the Authorization header value
+ * @return status parsed from the response, or a failed status describing the error
+ */
+ public PipelineElementStatus execute(EndpointSelectable pipelineElement,
+ String endpointUrl,
+ String pipelineId) {
+ try {
+ Response httpResp = initRequest(pipelineElement, endpointUrl)
+ .addHeader("Authorization", AuthTokenUtils.getAuthToken(pipelineId))
+ .connectTimeout(10000)
+ .execute();
+ return handleResponse(httpResp, pipelineElement, endpointUrl);
+ } catch (Exception e) {
+ // Not every exception carries a message; fall back to toString() so that neither the
+ // log statement nor the returned status ends up with a null failure reason.
+ String reason = e.getMessage() != null ? e.getMessage() : e.toString();
+ logError(endpointUrl, pipelineElement.getName(), reason);
+ return new PipelineElementStatus(endpointUrl, pipelineElement.getName(), false, reason);
+ }
+ }
+
+ /**
+ * Creates the request (method, URL and optional body) for the given pipeline element.
+ *
+ * @throws JsonProcessingException if a request body cannot be serialized
+ */
+ protected abstract Request initRequest(EndpointSelectable pipelineElement,
+ String endpointUrl) throws JsonProcessingException;
+
+ /** Logs a failed request; called by {@link #execute} with the failure description. */
+ protected abstract void logError(String endpointUrl,
+ String pipelineElementName,
+ String exceptionMessage);
+
+ /**
+ * Reads the response body as string, deserializes it into a StreamPipes response object
+ * and converts that into a {@link PipelineElementStatus}.
+ *
+ * @throws IOException if the body cannot be read or deserialized
+ */
+ protected PipelineElementStatus handleResponse(Response httpResp,
+ EndpointSelectable pipelineElement,
+ String endpointUrl) throws JsonSyntaxException, IOException {
+ String resp = httpResp.returnContent().asString();
+ org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
+ .getObjectMapper()
+ .readValue(resp, org.apache.streampipes.model.Response.class);
+ return convert(streamPipesResp, endpointUrl, pipelineElement.getName());
+ }
+
+ /** Copies success flag and optional message of the response into a status object. */
+ private PipelineElementStatus convert(org.apache.streampipes.model.Response response,
+ String endpointUrl,
+ String pipelineElementName) {
+ return new PipelineElementStatus(endpointUrl, pipelineElementName, response.isSuccess(),
+ response.getOptionalMessage());
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
new file mode 100644
index 000000000..8be496b13
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.List;
+
+/**
+ * Template for submitting one request per pipeline element and collecting the individual
+ * results in a single {@link PipelineOperationStatus}. Subclasses define the request that is
+ * sent per element, whether data sets take part, and how success and failure are reported.
+ */
+public abstract class PipelineElementSubmitter {
+
+ protected final String pipelineId;
+ protected final String pipelineName;
+
+ protected final PipelineOperationStatus status;
+
+ public PipelineElementSubmitter(Pipeline pipeline) {
+ this.pipelineId = pipeline.getPipelineId();
+ this.pipelineName = pipeline.getName();
+ this.status = new PipelineOperationStatus(pipelineId, pipelineName);
+ }
+
+ /**
+ * Submits all given elements and returns the accumulated operation status.
+ *
+ * @param processorsAndSinks data processors and sinks, always submitted first
+ * @param dataSets data sets, submitted only if {@link #shouldSubmitDataSets()} allows it
+ * @return the status object of this submitter after success/failure handling was applied
+ */
+ public PipelineOperationStatus submit(List<InvocableStreamPipesEntity> processorsAndSinks,
+ List<SpDataSet> dataSets) {
+ // Step 1: data processors and sinks
+ for (InvocableStreamPipesEntity processorOrSink : processorsAndSinks) {
+ status.addPipelineElementStatus(submitElement(processorOrSink));
+ }
+
+ // Step 2: data sets, if the concrete submitter wants them to be submitted at this point
+ if (shouldSubmitDataSets()) {
+ for (SpDataSet dataSet : dataSets) {
+ status.addPipelineElementStatus(submitElement(dataSet));
+ }
+ }
+
+ applySuccess(processorsAndSinks);
+ return status;
+ }
+
+ /** Returns true if no element status collected so far reports a failure. */
+ protected boolean isSuccess() {
+ return status.getElementStatus().stream().noneMatch(elementStatus -> !elementStatus.isSuccess());
+ }
+
+ /** Stores the overall result in the status and triggers the matching callback. */
+ protected void applySuccess(List<InvocableStreamPipesEntity> processorsAndSinks) {
+ status.setSuccess(isSuccess());
+ if (!status.isSuccess()) {
+ this.onFailure(processorsAndSinks);
+ return;
+ }
+ this.onSuccess();
+ }
+
+ /** Sends a detach request to the element's selected endpoint URL extended by its detach path. */
+ protected PipelineElementStatus performDetach(EndpointSelectable pipelineElement) {
+ String baseUrl = pipelineElement.getSelectedEndpointUrl();
+ String endpointUrl = baseUrl + pipelineElement.getDetachPath();
+ DetachHttpRequest detachRequest = new DetachHttpRequest();
+ return detachRequest.execute(pipelineElement, endpointUrl, this.pipelineId);
+ }
+
+ /** Performs the operation of this submitter for a single element. */
+ protected abstract PipelineElementStatus submitElement(EndpointSelectable pipelineElement);
+
+ /** Decides, after processors and sinks were handled, whether data sets are submitted too. */
+ protected abstract boolean shouldSubmitDataSets();
+
+ /** Called when all collected element statuses are successful. */
+ protected abstract void onSuccess();
+
+ /** Called when at least one collected element status reports a failure. */
+ protected abstract void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks);
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
deleted file mode 100644
index 9ee3b8467..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.commons.MD5;
-import org.apache.streampipes.commons.Utils;
-import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.resource.management.secret.SecretProvider;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import org.lightcouch.DocumentConflictException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor {
-
- private final Pipeline pipeline;
- private final boolean storeStatus;
- private final boolean forceStop;
-
- public PipelineExecutor(Pipeline pipeline,
- boolean storeStatus,
- boolean forceStop) {
- this.pipeline = pipeline;
- this.storeStatus = storeStatus;
- this.forceStop = forceStop;
- }
-
- public PipelineOperationStatus startPipeline() {
-
- pipeline.getSepas().forEach(this::updateGroupIds);
- pipeline.getActions().forEach(this::updateGroupIds);
-
- List<DataProcessorInvocation> sepas = pipeline.getSepas();
- List<DataSinkInvocation> secs = pipeline.getActions();
-
- List<SpDataSet> dataSets = pipeline
- .getStreams()
- .stream()
- .filter(s -> s instanceof SpDataSet)
- .map(s -> new SpDataSet((SpDataSet) s))
- .collect(Collectors.toList());
-
- List<NamedStreamPipesEntity> failedServices = new ArrayList<>();
-
- dataSets.forEach(ds -> {
- ds.setCorrespondingPipeline(pipeline.getPipelineId());
- try {
- ds.setSelectedEndpointUrl(findSelectedEndpoint(ds));
- } catch (NoServiceEndpointsAvailableException e) {
- failedServices.add(ds);
- }
- });
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(sepas);
- graphs.addAll(secs);
-
- decryptSecrets(graphs);
-
- graphs.forEach(g -> {
- try {
- g.setSelectedEndpointUrl(findSelectedEndpoint(g));
- g.setCorrespondingPipeline(pipeline.getPipelineId());
- } catch (NoServiceEndpointsAvailableException e) {
- failedServices.add(g);
- }
- });
-
- PipelineOperationStatus status;
- if (failedServices.size() == 0) {
-
- status = new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), graphs, dataSets)
- .invokeGraphs();
-
- encryptSecrets(graphs);
-
- if (status.isSuccess()) {
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
- PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(),
- PipelineStatusMessageType.PIPELINE_STARTED.title(),
- PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
- if (storeStatus) {
- pipeline.setHealthStatus(PipelineHealthStatus.OK);
- setPipelineStarted(pipeline);
- }
- }
- } else {
- List<PipelineElementStatus> pe = failedServices.stream().map(fs ->
- new PipelineElementStatus(fs.getElementId(),
- fs.getName(),
- false,
- "No active supporting service found")).collect(Collectors.toList());
- status = new PipelineOperationStatus(pipeline.getPipelineId(),
- pipeline.getName(),
- "Could not start pipeline " + pipeline.getName() + ".",
- pe);
- }
- return status;
- }
-
- private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
- return new ExtensionsServiceEndpointGenerator(
- g.getAppId(),
- ExtensionsServiceEndpointUtils.getPipelineElementType(g))
- .getEndpointResourceUrl();
- }
-
- private String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException {
- String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId();
- if (ds.isInternallyManaged()) {
- return getConnectMasterSourcesUrl();
- } else {
- return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET)
- .getEndpointResourceUrl();
- }
- }
-
- private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException {
- List<String> connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery()
- .getServiceEndpoints(DefaultSpServiceGroups.CORE, true,
- Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString()));
- if (connectMasterEndpoints.size() > 0) {
- return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT;
- } else {
- throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint");
- }
- }
-
- private void updateGroupIds(InvocableStreamPipesEntity entity) {
- entity.getInputStreams()
- .stream()
- .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
- .map(is -> is.getEventGrounding().getTransportProtocol())
- .map(KafkaTransportProtocol.class::cast)
- .forEach(tp -> tp.setGroupId(Utils.filterSpecialChar(pipeline.getName()) + MD5.crypt(tp.getElementId())));
- }
-
- private void decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
- SecretProvider.getDecryptionService().apply(graphs);
- }
-
- private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
- SecretProvider.getEncryptionService().apply(graphs);
- }
-
- public PipelineOperationStatus stopPipeline() {
- List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
- List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
-
- PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), graphs, dataSets)
- .detachGraphs();
-
- if (status.isSuccess()) {
- PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(),
- System.currentTimeMillis(),
- PipelineStatusMessageType.PIPELINE_STOPPED.title(),
- PipelineStatusMessageType.PIPELINE_STOPPED.description()));
-
- }
-
- if (status.isSuccess() || forceStop) {
- if (storeStatus) {
- setPipelineStopped(pipeline);
- }
- }
- return status;
- }
-
- private void setPipelineStarted(Pipeline pipeline) {
- pipeline.setRunning(true);
- pipeline.setStartedAt(new Date().getTime());
- try {
- getPipelineStorageApi().updatePipeline(pipeline);
- } catch (DocumentConflictException dce) {
- //dce.printStackTrace();
- }
- }
-
- private void setPipelineStopped(Pipeline pipeline) {
- pipeline.setRunning(false);
- getPipelineStorageApi().updatePipeline(pipeline);
- }
-
- private void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
- List<SpDataSet> dataSets) {
- TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
- TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
- }
-
- private IPipelineStorage getPipelineStorageApi() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
similarity index 64%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
index 085d17e44..282f08537 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
@@ -16,19 +16,22 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.provider;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-public class TemporaryGraphStorage {
-
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+/**
+ * Provider that takes the pipeline elements directly from the given execution info.
+ */
+public class CurrentPipelineElementProvider implements PipelineElementProvider {
+ /** Returns the processors and sinks held by {@code executionInfo}. */
+ @Override
+ public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
+ return executionInfo.getProcessorsAndSinks();
+ }
+ /** Returns the data sets held by {@code executionInfo}. */
+ @Override
+ public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) {
+ return executionInfo.getDataSets();
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
similarity index 73%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
index 085d17e44..d8f724801 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
@@ -16,19 +16,17 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.provider;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-public class TemporaryGraphStorage {
+/**
+ * Supplies the pipeline elements that belong to a pipeline execution.
+ */
+public interface PipelineElementProvider {
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+ /** Returns the data processors and sinks for the given execution info. */
+ List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo);
+ /** Returns the data sets for the given execution info. */
+ List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
similarity index 57%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
index 085d17e44..a41adbde7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
@@ -16,19 +16,23 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.provider;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-public class TemporaryGraphStorage {
-
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+/**
+ * Provider that looks up the pipeline elements in {@code RunningPipelineElementStorage},
+ * using the pipeline id of the given execution info as key.
+ *
+ * NOTE(review): presumably the storage fields are plain maps, in which case both methods
+ * return null for a pipeline id that was never stored - confirm that callers handle this.
+ */
+public class StoredPipelineElementProvider implements PipelineElementProvider {
+ /** Returns the stored processors and sinks registered under the pipeline id. */
+ @Override
+ public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
+ return RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
+ }
+ /** Returns the stored data sets registered under the pipeline id. */
+ @Override
+ public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) {
+ return RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId());
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
new file mode 100644
index 000000000..a7d967d2a
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import java.util.List;
+
+/**
+ * Pipeline execution task that records the processors, sinks and data sets of the execution info in
+ * {@link RunningPipelineElementStorage} and adds a status message for the pipeline.
+ */
+public class AfterInvocationTask implements PipelineExecutionTask {
+
+  private final PipelineStatusMessageType statusMessageType;
+
+  /**
+   * @param statusMessageType type of the status message that is added when the task executes
+   */
+  public AfterInvocationTask(PipelineStatusMessageType statusMessageType) {
+    this.statusMessageType = statusMessageType;
+  }
+
+  /**
+   * The task runs only when the execution info reports the operation as successful.
+   */
+  @Override
+  public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return executionInfo.isOperationSuccessful();
+  }
+
+  /**
+   * Stores the pipeline elements of the execution info under the pipeline id and afterwards adds a
+   * status message carrying the current time and the configured message type.
+   */
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    List<InvocableStreamPipesEntity> processorsAndSinks = executionInfo.getProcessorsAndSinks();
+    List<SpDataSet> dataSets = executionInfo.getDataSets();
+    String pipelineId = pipeline.getPipelineId();
+
+    RunningPipelineElementStorage.runningProcessorsAndSinks.put(pipelineId, processorsAndSinks);
+    RunningPipelineElementStorage.runningDataSets.put(pipelineId, dataSets);
+
+    var statusMessage = new PipelineStatusMessage(pipelineId, System.currentTimeMillis(), statusMessageType);
+    PipelineStatusManager.addPipelineStatus(pipelineId, statusMessage);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
new file mode 100644
index 000000000..713644650
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointProvider;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline execution task that looks up an endpoint URL for every processor, sink and data set of the
+ * execution info. Elements without an available endpoint are registered as failed; if any element
+ * failed, a failure status is applied to the execution info.
+ */
+public class DiscoverEndpointsTask implements PipelineExecutionTask {
+
+  private static final String NO_ENDPOINT_MESSAGE =
+      "No active extensions service found which provides this pipeline element";
+
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    List<InvocableStreamPipesEntity> processorsAndSinks = executionInfo.getProcessorsAndSinks();
+    List<SpDataSet> dataSets = executionInfo.getDataSets();
+    var pipelineId = pipeline.getPipelineId();
+
+    for (InvocableStreamPipesEntity element : processorsAndSinks) {
+      try {
+        var endpointUrl = new ExtensionsServiceEndpointProvider().findSelectedEndpoint(element);
+        assignEndpoint(pipelineId, element, endpointUrl);
+      } catch (NoServiceEndpointsAvailableException e) {
+        // no endpoint for this element: remember it so that the failure status below lists it
+        executionInfo.addFailedPipelineElement(element);
+      }
+    }
+
+    for (SpDataSet dataSet : dataSets) {
+      try {
+        var endpointUrl = new ExtensionsServiceEndpointProvider().findSelectedEndpoint(dataSet);
+        assignEndpoint(pipelineId, dataSet, endpointUrl);
+      } catch (NoServiceEndpointsAvailableException e) {
+        executionInfo.addFailedPipelineElement(dataSet);
+      }
+    }
+
+    var failedServices = executionInfo.getFailedServices();
+    if (failedServices.isEmpty()) {
+      return;
+    }
+
+    List<PipelineElementStatus> elementStatus = failedServices
+        .stream()
+        .map(failed -> new PipelineElementStatus(
+            failed.getElementId(), failed.getName(), false, NO_ENDPOINT_MESSAGE))
+        .collect(Collectors.toList());
+    var failureStatus = new PipelineOperationStatus(
+        pipelineId,
+        pipeline.getName(),
+        "Could not start pipeline " + pipeline.getName() + ".",
+        elementStatus);
+    executionInfo.applyPipelineOperationStatus(failureStatus);
+  }
+
+  /**
+   * Writes the selected endpoint URL and the id of the owning pipeline into the pipeline element.
+   */
+  private void assignEndpoint(String pipelineId,
+                              EndpointSelectable pipelineElement,
+                              String endpointUrl) {
+    pipelineElement.setSelectedEndpointUrl(endpointUrl);
+    pipelineElement.setCorrespondingPipeline(pipelineId);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java
similarity index 64%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java
index 085d17e44..096456061 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java
@@ -16,19 +16,17 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.task;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+/**
+ * A single step that operates on a pipeline and its {@link PipelineExecutionInfo}.
+ */
+public interface PipelineExecutionTask {
-public class TemporaryGraphStorage {
-
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+ /**
+ * Tells whether this task should run for the given execution info. The default implementation
+ * always returns {@code true}.
+ * NOTE(review): presumably consulted by the executor before calling executeTask - confirm there.
+ *
+ * @param executionInfo current state of the pipeline operation
+ * @return {@code true} if the task should be executed
+ */
+ default boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+ return true;
+ }
+ /**
+ * Performs the work of this task.
+ *
+ * @param pipeline the pipeline the operation is performed on
+ * @param executionInfo current state of the pipeline operation
+ */
+ void executeTask(Pipeline pipeline,
+ PipelineExecutionInfo executionInfo);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java
similarity index 54%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java
index 085d17e44..b530f93c8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java
@@ -16,19 +16,23 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.execution.task;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.resource.management.secret.SecretService;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+/**
+ * Pipeline execution task that applies a {@link SecretService} to the processors and sinks of the
+ * execution info.
+ * NOTE(review): the class name suggests the service encrypts secret values - confirm against the
+ * SecretService instance passed in by the caller.
+ */
+public class SecretEncryptionTask implements PipelineExecutionTask {
-public class TemporaryGraphStorage {
+ private final SecretService secretService;
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
-
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+ /**
+ * @param secretService service that is applied to the processors and sinks when the task executes
+ */
+ public SecretEncryptionTask(SecretService secretService) {
+ this.secretService = secretService;
+ }
+ /**
+ * Passes the processors and sinks of the execution info to the secret service. The pipeline
+ * argument itself is not used.
+ */
+ @Override
+ public void executeTask(Pipeline pipeline,
+ PipelineExecutionInfo executionInfo) {
+ this.secretService.apply(executionInfo.getProcessorsAndSinks());
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
new file mode 100644
index 000000000..f01f99005
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.lightcouch.DocumentConflictException;
+
+import java.util.Date;
+
+/**
+ * Pipeline execution task that marks a pipeline as started or stopped and writes the changed pipeline
+ * to the pipeline storage.
+ */
+public class StorePipelineStatusTask implements PipelineExecutionTask {
+
+  private static final System.Logger LOG = System.getLogger(StorePipelineStatusTask.class.getName());
+
+  private final boolean start;
+  private final boolean forceStop;
+
+  /**
+   * @param start     {@code true} to store the pipeline as started, {@code false} to store it as stopped
+   * @param forceStop {@code true} to run the task even if the operation was not successful
+   */
+  public StorePipelineStatusTask(boolean start,
+                                 boolean forceStop) {
+    this.start = start;
+    this.forceStop = forceStop;
+  }
+
+  /**
+   * The task runs when the operation was successful or when a forced stop was requested.
+   */
+  @Override
+  public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return executionInfo.isOperationSuccessful() || forceStop;
+  }
+
+  /**
+   * Stores the pipeline as started (including health status {@code OK}) or as stopped, depending on
+   * the {@code start} flag given at construction time.
+   */
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    if (this.start) {
+      pipeline.setHealthStatus(PipelineHealthStatus.OK);
+      setPipelineStarted(pipeline);
+    } else {
+      setPipelineStopped(pipeline);
+    }
+  }
+
+  /**
+   * Marks the pipeline as running with the current time as start time and stores it.
+   * A document conflict during the update does not abort the operation, but it is logged.
+   */
+  private void setPipelineStarted(Pipeline pipeline) {
+    pipeline.setRunning(true);
+    pipeline.setStartedAt(new Date().getTime());
+    try {
+      getPipelineStorageApi().updatePipeline(pipeline);
+    } catch (DocumentConflictException dce) {
+      // Storing the started state stays best-effort as before, but the failed write is now
+      // reported instead of being dropped without any trace.
+      LOG.log(System.Logger.Level.WARNING,
+          "Could not store running state of pipeline " + pipeline.getPipelineId()
+              + " because of a document conflict",
+          dce);
+    }
+  }
+
+  /**
+   * Marks the pipeline as not running and stores it. Unlike the start path, a document conflict
+   * is not caught here and propagates to the caller.
+   */
+  private void setPipelineStopped(Pipeline pipeline) {
+    pipeline.setRunning(false);
+    getPipelineStorageApi().updatePipeline(pipeline);
+  }
+
+  private IPipelineStorage getPipelineStorageApi() {
+    return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
new file mode 100644
index 000000000..c29d74909
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.provider.PipelineElementProvider;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+/**
+ * Pipeline execution task that hands the pipeline elements supplied by a
+ * {@link PipelineElementProvider} to a {@link PipelineElementSubmitter} and applies the resulting
+ * status to the execution info.
+ */
+public class SubmitRequestTask implements PipelineExecutionTask {
+
+  private final PipelineElementSubmitter submitter;
+  private final PipelineElementProvider elementProvider;
+
+  /**
+   * @param submitter       submits the processors, sinks and data sets
+   * @param elementProvider supplies the processors, sinks and data sets to submit
+   */
+  public SubmitRequestTask(PipelineElementSubmitter submitter,
+                           PipelineElementProvider elementProvider) {
+    this.submitter = submitter;
+    this.elementProvider = elementProvider;
+  }
+
+  /**
+   * The task runs only when the execution info contains no failed services.
+   */
+  @Override
+  public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
+    return executionInfo.getFailedServices().isEmpty();
+  }
+
+  /**
+   * Submits the provided processors, sinks and data sets and applies the returned operation status
+   * to the execution info. The pipeline argument itself is not used.
+   */
+  @Override
+  public void executeTask(Pipeline pipeline, PipelineExecutionInfo executionInfo) {
+    var operationStatus = submitter.submit(
+        elementProvider.getProcessorsAndSinks(executionInfo),
+        elementProvider.getDataSets(executionInfo));
+
+    executionInfo.applyPipelineOperationStatus(operationStatus);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java
new file mode 100644
index 000000000..19fd87910
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.task;
+
+import org.apache.streampipes.commons.MD5;
+import org.apache.streampipes.commons.Utils;
+import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+/**
+ * Pipeline execution task that sets the group id of every {@link KafkaTransportProtocol} found on the
+ * input streams of the pipeline's processors and sinks. The group id is the pipeline name passed
+ * through {@code Utils.filterSpecialChar}, followed by {@code MD5.crypt} of the protocol's element id.
+ */
+public class UpdateGroupIdTask implements PipelineExecutionTask {
+
+  @Override
+  public void executeTask(Pipeline pipeline,
+                          PipelineExecutionInfo executionInfo) {
+    var groupIdPrefix = Utils.filterSpecialChar(pipeline.getName());
+
+    for (InvocableStreamPipesEntity processor : pipeline.getSepas()) {
+      updateGroupIds(processor, groupIdPrefix);
+    }
+    for (InvocableStreamPipesEntity sink : pipeline.getActions()) {
+      updateGroupIds(sink, groupIdPrefix);
+    }
+  }
+
+  /**
+   * Updates the group id of all Kafka transport protocols used by the input streams of the entity;
+   * input streams with any other transport protocol are left untouched.
+   */
+  private void updateGroupIds(InvocableStreamPipesEntity entity,
+                              String sanitizedPipelineName) {
+    for (var inputStream : entity.getInputStreams()) {
+      var protocol = inputStream.getEventGrounding().getTransportProtocol();
+      if (protocol instanceof KafkaTransportProtocol) {
+        var kafkaProtocol = (KafkaTransportProtocol) protocol;
+        kafkaProtocol.setGroupId(sanitizedPipelineName + MD5.crypt(kafkaProtocol.getElementId()));
+      }
+    }
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 335d76c46..c0e159df0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -22,8 +22,8 @@ import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.HttpRequestBuilder;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
@@ -67,7 +67,7 @@ public class PipelineHealthCheck implements Runnable {
List<String> failedInstances = new ArrayList<>();
List<String> recoveredInstances = new ArrayList<>();
List<String> pipelineNotifications = new ArrayList<>();
- List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+ List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId());
graphs.forEach(graph -> {
String instanceId = extractInstanceId(graph);
if (allRunningInstances.stream().noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) {
@@ -77,7 +77,7 @@ public class PipelineHealthCheck implements Runnable {
boolean success;
try {
endpointUrl = findEndpointUrl(graph);
- success = new HttpRequestBuilder(graph, endpointUrl, pipeline.getPipelineId()).invoke().isSuccess();
+ success = new InvokeHttpRequest().execute(graph, endpointUrl, pipeline.getPipelineId()).isSuccess();
} catch (NoServiceEndpointsAvailableException e) {
success = false;
}
@@ -182,7 +182,7 @@ public class PipelineHealthCheck implements Runnable {
private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() {
Map<String, List<InvocableStreamPipesEntity>> endpointMap = new HashMap<>();
- TemporaryGraphStorage.graphStorage.forEach((pipelineId, graphs) ->
+ RunningPipelineElementStorage.runningProcessorsAndSinks.forEach((pipelineId, graphs) ->
graphs.forEach(graph -> addEndpoint(endpointMap, graph)));
return endpointMap;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 6e45801ca..3457d63ea 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -22,8 +22,8 @@ import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableExcepti
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.manager.endpoint.EndpointItemFetcher;
-import org.apache.streampipes.manager.execution.http.PipelineExecutor;
-import org.apache.streampipes.manager.execution.http.PipelineStorageService;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.recommender.ElementRecommender;
@@ -96,19 +96,8 @@ public class Operations {
new PipelineStorageService(pipeline).updatePipeline();
}
- public static PipelineOperationStatus startPipeline(
- Pipeline pipeline) {
- return startPipeline(pipeline, true);
- }
-
- public static PipelineOperationStatus startPipeline(
- Pipeline pipeline, boolean storeStatus) {
- return new PipelineExecutor(pipeline, storeStatus, false).startPipeline();
- }
-
- public static PipelineOperationStatus stopPipeline(
- Pipeline pipeline, boolean forceStop) {
- return stopPipeline(pipeline, true, forceStop);
+ public static PipelineOperationStatus startPipeline(Pipeline pipeline) {
+ return new PipelineExecutor(pipeline, false).startPipeline();
}
public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) {
@@ -125,9 +114,8 @@ public class Operations {
}
public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
- boolean storeStatus,
boolean forceStop) {
- return new PipelineExecutor(pipeline, storeStatus, forceStop).stopPipeline();
+ return new PipelineExecutor(pipeline, forceStop).stopPipeline();
}
public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index bb37f1e73..7b5e8e67d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -17,11 +17,11 @@
*/
package org.apache.streampipes.manager.preview;
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.HttpRequestBuilder;
+import org.apache.streampipes.manager.execution.http.DetachHttpRequest;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.SpDataSet;
@@ -92,7 +92,7 @@ public class PipelinePreview {
graphs.forEach(g -> {
try {
g.setSelectedEndpointUrl(findSelectedEndpoint(g));
- new HttpRequestBuilder(g, g.getSelectedEndpointUrl(), null).invoke();
+ new InvokeHttpRequest().execute(g, g.getSelectedEndpointUrl(), null);
} catch (NoServiceEndpointsAvailableException e) {
e.printStackTrace();
}
@@ -101,8 +101,8 @@ public class PipelinePreview {
private void detachGraphs(List<InvocableStreamPipesEntity> graphs) {
graphs.forEach(g -> {
- String endpointUrl = g.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(g.getElementId());
- new HttpRequestBuilder(g, endpointUrl, null).detach();
+ String endpointUrl = g.getSelectedEndpointUrl() + g.getDetachPath();
+ new DetachHttpRequest().execute(g, endpointUrl, null);
});
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
similarity index 98%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
index 5c42b5eb0..9d0f2b582 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.manager.storage;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphBuilder;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
similarity index 78%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
index 085d17e44..360e6db90 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.manager.util;
+package org.apache.streampipes.manager.storage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -25,10 +25,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class TemporaryGraphStorage {
+/**
+ * Static in-memory maps holding lists of processors/sinks and of data sets per string key.
+ * NOTE(review): both maps are plain, publicly writable HashMaps without synchronization - confirm
+ * that they are only accessed from one thread, otherwise a concurrent map should be considered.
+ */
+public class RunningPipelineElementStorage {
- public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>();
+ // processors and sinks per key (presumably the pipeline id - verify against the callers)
+ public static Map<String, List<InvocableStreamPipesEntity>> runningProcessorsAndSinks = new HashMap<>();
- public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>();
+ // data sets per key (presumably the pipeline id - verify against the callers)
+ public static Map<String, List<SpDataSet>> runningDataSets = new HashMap<>();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java
deleted file mode 100644
index 0523158ff..000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.storage;
-
-import org.apache.streampipes.model.client.user.Principal;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.storage.api.INoSqlStorage;
-import org.apache.streampipes.storage.api.IUserStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class UserService {
-
- private IUserStorage userStorage;
-
- public UserService(IUserStorage userStorage) {
- this.userStorage = userStorage;
- }
-
- public List<Pipeline> getOwnPipelines(String email) {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines().stream().filter(p -> p
- .getCreatedByUser()
- .equals(email))
- .collect(Collectors.toList());
- }
-
- public void deleteOwnSource(String username, String sourceId) {
- if (checkUser(username)) {
- Principal user = getPrincipal(username);
- //user.getOwnSources().removeIf(a -> a.getElementId().equals(sourceId));
- userStorage.updateUser(user);
- }
- }
-
- /**
- * Get actions/sepas/sources
- */
-
- public List<String> getOwnActionUris(String username) {
- // TODO permissions
- return new ArrayList<>();
- //return userStorage.getUser(username)
- // .getOwnActions().stream().map(r -> r.getElementId()).collect(Collectors.toList());
- }
-
- public List<String> getOwnSepaUris(String username) {
- // TODO Permissions
- return new ArrayList<>();
- //return userStorage.getUser(username)
- // .getOwnSepas().stream().map(r -> r.getElementId()).collect(Collectors.toList());
- }
-
-
- public List<String> getOwnSourceUris(String email) {
- // TODO permissions
- return new ArrayList<>();
-// return userStorage
-// .getUser(email)
-// .getOwnSources()
-// .stream()
-// .map(r -> r.getElementId())
-// .collect(Collectors.toList());
- }
-
- private Principal getPrincipal(String username) {
- return userStorage.getUser(username);
- }
-
-
- /**
- * @param username
- * @return True if user exists exactly once, false otherwise
- */
- public boolean checkUser(String username) {
- return userStorage.checkUser(username);
- }
-
- private INoSqlStorage getStorageManager() {
- return StorageDispatcher.INSTANCE.getNoSqlStore();
- }
-
-}
diff --git a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
index 33b2607b9..10c74f619 100644
--- a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
+++ b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.rest.core.base.impl;
import org.apache.streampipes.manager.endpoint.HttpJsonParser;
-import org.apache.streampipes.manager.storage.UserService;
import org.apache.streampipes.model.message.ErrorMessage;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notification;
@@ -68,10 +67,6 @@ public abstract class AbstractRestResource extends AbstractSharedRestInterface {
return getNoSqlStorage().getUserStorageAPI();
}
- protected UserService getUserService() {
- return new UserService(getUserStorage());
- }
-
protected IVisualizationStorage getVisualizationStorage() {
return getNoSqlStorage().getVisualizationStorageApi();
}