You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/28 21:39:50 UTC
[incubator-streampipes] branch dev updated: [hotfix] Fix bug in pipeline preview
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 34504a6 [hotfix] Fix bug in pipeline preview
34504a6 is described below
commit 34504a66dcae570a2b4f124239df9cad9d163e49
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Feb 28 22:39:03 2022 +0100
[hotfix] Fix bug in pipeline preview
---
.../manager/data/PipelineGraphBuilder.java | 6 +-
.../manager/execution/http/PipelineExecutor.java | 20 +--
.../manager/matching/GroundingSelector.java | 3 +-
.../manager/matching/InvocationGraphBuilder.java | 194 ---------------------
.../matching/PipelineVerificationHandler.java | 194 ---------------------
.../matching/PipelineVerificationHandlerV2.java | 46 +++++
.../streampipes/manager/operations/Operations.java | 42 +----
.../manager/preview/PipelinePreview.java | 43 +++--
.../manager/recommender/AllElementsProvider.java | 4 +
.../streampipes/rest/impl/PipelineResource.java | 2 +-
10 files changed, 99 insertions(+), 455 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
index a83c125..bbcadfb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
@@ -28,9 +28,9 @@ import java.util.stream.Collectors;
public class PipelineGraphBuilder {
- private Pipeline pipeline;
- private List<NamedStreamPipesEntity> allPipelineElements;
- private List<InvocableStreamPipesEntity> invocableElements;
+ private final Pipeline pipeline;
+ private final List<NamedStreamPipesEntity> allPipelineElements;
+ private final List<InvocableStreamPipesEntity> invocableElements;
public PipelineGraphBuilder(Pipeline pipeline) {
this.pipeline = pipeline;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ed6dda9..d484bce 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -55,21 +55,15 @@ import java.util.stream.Collectors;
public class PipelineExecutor {
- private Pipeline pipeline;
- private boolean visualize;
- private boolean storeStatus;
- private boolean monitor;
- private boolean forceStop;
+ private final Pipeline pipeline;
+ private final boolean storeStatus;
+ private final boolean forceStop;
public PipelineExecutor(Pipeline pipeline,
- boolean visualize,
boolean storeStatus,
- boolean monitor,
boolean forceStop) {
this.pipeline = pipeline;
- this.visualize = visualize;
this.storeStatus = storeStatus;
- this.monitor = monitor;
this.forceStop = forceStop;
}
@@ -200,14 +194,6 @@ public class PipelineExecutor {
.detachGraphs();
if (status.isSuccess()) {
- if (visualize) {
- StorageDispatcher
- .INSTANCE
- .getNoSqlStore()
- .getVisualizationStorageApi()
- .deleteVisualization(pipeline.getPipelineId());
- }
-
PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
new PipelineStatusMessage(pipeline.getPipelineId(),
System.currentTimeMillis(),
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java
index f32763b..8895699 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/GroundingSelector.java
@@ -30,7 +30,8 @@ public abstract class GroundingSelector {
protected NamedStreamPipesEntity source;
protected Set<InvocableStreamPipesEntity> targets;
- public GroundingSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
+ public GroundingSelector(NamedStreamPipesEntity source,
+ Set<InvocableStreamPipesEntity> targets) {
this.source = source;
this.targets = targets;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
deleted file mode 100644
index 51b3d43..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.matching;
-
-import org.apache.streampipes.config.backend.BackendConfig;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
-import org.apache.streampipes.manager.matching.output.OutputSchemaGenerator;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
-import org.apache.streampipes.model.output.OutputStrategy;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class InvocationGraphBuilder {
-
- private PipelineGraph pipelineGraph;
- private String pipelineId;
- private Integer uniquePeIndex = 0;
-
- private List<InvocableStreamPipesEntity> graphs;
-
- public InvocationGraphBuilder(PipelineGraph pipelineGraph, String pipelineId) {
- this.graphs = new ArrayList<>();
- this.pipelineGraph = pipelineGraph;
- this.pipelineId = pipelineId;
-
- }
-
- public List<InvocableStreamPipesEntity> buildGraphs() {
-
- List<SpDataStream> streams = PipelineGraphHelpers.findStreams(pipelineGraph);
-
- for (SpDataStream stream : streams) {
- Set<InvocableStreamPipesEntity> connectedElements = getConnections(stream);
- configure(stream, connectedElements);
- }
-
- return graphs;
- }
-
- private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
-
- EventGrounding inputGrounding = new GroundingBuilder(source, targets)
- .getEventGrounding();
-
- if (source instanceof InvocableStreamPipesEntity) {
- if (source instanceof DataProcessorInvocation && ((DataProcessorInvocation) source).isConfigured()) {
-
- DataProcessorInvocation dataProcessorInvocation = (DataProcessorInvocation) source;
- Tuple2<EventSchema, ? extends OutputStrategy> outputSettings;
- OutputSchemaGenerator<?> schemaGenerator = new OutputSchemaFactory(dataProcessorInvocation)
- .getOuputSchemaGenerator();
-
- if (((DataProcessorInvocation) source).getInputStreams().size() == 1) {
- outputSettings = schemaGenerator.buildFromOneStream(dataProcessorInvocation
- .getInputStreams()
- .get(0));
- } else if (graphExists(dataProcessorInvocation.getDOM())) {
- DataProcessorInvocation existingInvocation = (DataProcessorInvocation) find(dataProcessorInvocation.getDOM());
-
- outputSettings = schemaGenerator.buildFromTwoStreams(existingInvocation
- .getInputStreams().get(0), dataProcessorInvocation.getInputStreams().get(1));
- graphs.remove(existingInvocation);
- } else {
- outputSettings = new Tuple2<>(new EventSchema(), dataProcessorInvocation
- .getOutputStrategies().get(0));
- }
-
- SpDataStream outputStream = new SpDataStream();
- outputStream.setEventGrounding(inputGrounding);
- dataProcessorInvocation.setOutputStrategies(Collections.singletonList(outputSettings.b));
- outputStream.setEventSchema(outputSettings.a);
- ((DataProcessorInvocation) source).setOutputStream(outputStream);
- }
-
- if (!graphExists(source.getDOM())) {
- graphs.add((InvocableStreamPipesEntity) source);
- }
- }
-
- targets.forEach(t -> {
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventGrounding(inputGrounding);
-
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventSchema(getInputSchema(source));
-
- String elementIdentifier = makeElementIdentifier(pipelineId, inputGrounding
- .getTransportProtocol().getTopicDefinition().getActualTopicName(), t.getName());
-
- t.setElementId(t.getBelongsTo() + ":" + elementIdentifier);
- t.setCorrespondingPipeline(pipelineId);
- t.setStatusInfoSettings(makeStatusInfoSettings(elementIdentifier));
-
- uniquePeIndex++;
-
- configure(t, getConnections(t));
-
- });
-
- }
-
- private ElementStatusInfoSettings makeStatusInfoSettings(String elementIdentifier) {
- ElementStatusInfoSettings statusSettings = new ElementStatusInfoSettings();
- statusSettings.setKafkaHost(BackendConfig.INSTANCE.getKafkaHost());
- statusSettings.setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
- statusSettings.setErrorTopic(elementIdentifier + ".error");
- statusSettings.setStatsTopic(elementIdentifier + ".stats");
- statusSettings.setElementIdentifier(elementIdentifier);
-
- return statusSettings;
- }
-
- private String makeElementIdentifier(String pipelineId, String topic, String elementName) {
- return pipelineId
- + "-"
- + topic
- + "-"
- + elementName.replaceAll(" ", "").toLowerCase()
- + "-"
- + uniquePeIndex;
- }
-
- private EventSchema getInputSchema(NamedStreamPipesEntity source) {
- if (source instanceof SpDataStream) {
- return ((SpDataStream) source).getEventSchema();
- } else if (source instanceof DataProcessorInvocation) {
- return ((DataProcessorInvocation) source)
- .getOutputStream()
- .getEventSchema();
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- private Set<InvocableStreamPipesEntity> getConnections(NamedStreamPipesEntity source) {
- Set<String> outgoingEdges = pipelineGraph.outgoingEdgesOf(source);
- return outgoingEdges
- .stream()
- .map(o -> pipelineGraph.getEdgeTarget(o))
- .map(g -> (InvocableStreamPipesEntity) g)
- .collect(Collectors.toSet());
-
- }
-
- private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
- return targetElement.getConnectedTo().indexOf(sourceDomId);
- }
-
- private boolean graphExists(String domId) {
- return graphs
- .stream()
- .anyMatch(g -> g.getDOM().equals(domId));
- }
-
- private InvocableStreamPipesEntity find(String domId) {
- return graphs
- .stream()
- .filter(g -> g.getDOM().equals(domId))
- .findFirst()
- .get();
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
deleted file mode 100644
index 6b63cf6..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.matching;
-
-import org.apache.streampipes.commons.exceptions.NoSepaInPipelineException;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.matching.mapping.AbstractRequirementsSelectorGenerator;
-import org.apache.streampipes.manager.matching.mapping.RequirementsSelectorGeneratorFactory;
-import org.apache.streampipes.manager.selector.PropertySelectorGenerator;
-import org.apache.streampipes.manager.util.PipelineVerificationUtils;
-import org.apache.streampipes.manager.util.TreeUtils;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.output.CustomOutputStrategy;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineModification;
-import org.apache.streampipes.model.staticproperty.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class PipelineVerificationHandler {
-
- private final Pipeline pipeline;
- private final PipelineModificationMessage pipelineModificationMessage;
- private List<InvocableStreamPipesEntity> invocationGraphs;
- private final InvocableStreamPipesEntity rootPipelineElement;
-
- public PipelineVerificationHandler(Pipeline pipeline) throws NoSepaInPipelineException {
- this(pipeline, PipelineVerificationUtils.getRootNode(pipeline));
- }
-
- public PipelineVerificationHandler(Pipeline pipeline,
- InvocableStreamPipesEntity rootNode) {
- this.pipeline = pipeline;
- this.rootPipelineElement = rootNode;
- this.invocationGraphs = makeInvocationGraphs();
- this.pipelineModificationMessage = new PipelineModificationMessage();
- }
-
- /**
- * Determines whether the current pipeline is valid
- *
- * @return PipelineValidationHandler
- * @throws InvalidConnectionException if the connection is not considered valid
- */
- public PipelineVerificationHandler validateConnection() throws InvalidConnectionException {
- invocationGraphs = new ConnectionValidator(pipeline, invocationGraphs, rootPipelineElement)
- .validateConnection();
- return this;
- }
-
- /**
- * computes mapping properties (based on input/output matching)
- *
- * @return PipelineValidationHandler
- */
- public PipelineVerificationHandler computeMappingProperties() {
- List<String> connectedTo = rootPipelineElement.getConnectedTo();
- String domId = rootPipelineElement.getDOM();
-
- List<SpDataStream> tempStreams = new ArrayList<>();
-
- for (int i = 0; i < connectedTo.size(); i++) {
- NamedStreamPipesEntity element = TreeUtils.findSEPAElement(rootPipelineElement
- .getConnectedTo().get(i), pipeline.getSepas(), pipeline
- .getStreams());
-
- SpDataStream incomingStream;
-
- if (element instanceof DataProcessorInvocation || element instanceof SpDataStream) {
- if (element instanceof DataProcessorInvocation) {
- DataProcessorInvocation ancestor = (DataProcessorInvocation) TreeUtils.findByDomId(
- connectedTo.get(i), invocationGraphs);
-
- incomingStream = ancestor.getOutputStream();
- } else {
- incomingStream = (SpDataStream) element;
- }
-
- tempStreams.add(incomingStream);
- if (rootPipelineElement.getStreamRequirements().size() - 1 == i) {
- updateStaticProperties(tempStreams, rootPipelineElement.getStaticProperties());
- PipelineModification modification = new PipelineModification(
- domId,
- rootPipelineElement.getElementId(),
- rootPipelineElement.getStaticProperties());
- modification.setInputStreams(tempStreams);
- updateOutputStrategy(tempStreams);
- if (rootPipelineElement instanceof DataProcessorInvocation) {
- modification.setOutputStrategies(((DataProcessorInvocation) rootPipelineElement).getOutputStrategies());
- }
- pipelineModificationMessage.addPipelineModification(modification);
- }
- }
- }
- return this;
- }
-
- private void updateStaticProperties(List<SpDataStream> inputStreams,
- List<StaticProperty> staticProperties) {
- staticProperties
- .stream()
- .filter(sp -> (sp instanceof CollectionStaticProperty
- || sp instanceof MappingProperty
- || sp instanceof StaticPropertyGroup
- || sp instanceof StaticPropertyAlternatives))
- .forEach(property -> updateStaticProperty(inputStreams, property));
- }
-
- private void updateStaticProperty(List<SpDataStream> inputStreams, StaticProperty property) {
- if (property instanceof MappingProperty) {
- MappingProperty mappingProperty = property.as(MappingProperty.class);
- AbstractRequirementsSelectorGenerator generator = RequirementsSelectorGeneratorFactory.getRequirementsSelector(mappingProperty, inputStreams, rootPipelineElement);
- mappingProperty.setMapsFromOptions(generator.generateSelectors());
- } else if (property instanceof StaticPropertyGroup) {
- updateStaticProperties(inputStreams, property.as(StaticPropertyGroup.class).getStaticProperties());
- } else if (property instanceof StaticPropertyAlternatives) {
- ((StaticPropertyAlternatives) property)
- .getAlternatives()
- .forEach(al -> updateStaticProperty(inputStreams, al.getStaticProperty()));
- // TODO
- } else if (property instanceof CollectionStaticProperty) {
- CollectionStaticProperty collection = property.as(CollectionStaticProperty.class);
- if (hasMappingEntry(collection)) {
- updateStaticProperty(inputStreams, collection.getStaticPropertyTemplate());
- } else if (hasGroupEntry(collection)) {
- updateStaticProperties(inputStreams, collection.getStaticPropertyTemplate().as(StaticPropertyGroup.class).getStaticProperties());
- }
- }
- }
-
- private Boolean hasMappingEntry(CollectionStaticProperty property) {
- return property.getStaticPropertyTemplate() instanceof MappingProperty;
- }
-
- private Boolean hasGroupEntry(CollectionStaticProperty property) {
- return property.getStaticPropertyTemplate() instanceof StaticPropertyGroup;
- }
-
- private void updateOutputStrategy(List<SpDataStream> inputStreams) {
-
- if (rootPipelineElement instanceof DataProcessorInvocation) {
- ((DataProcessorInvocation) rootPipelineElement)
- .getOutputStrategies()
- .stream()
- .filter(strategy -> strategy instanceof CustomOutputStrategy)
- .forEach(strategy -> {
- CustomOutputStrategy outputStrategy = (CustomOutputStrategy) strategy;
- if (inputStreams.size() == 1 || (inputStreams.size() > 1 && !(outputStrategy
- .isOutputRight()))) {
- outputStrategy.setAvailablePropertyKeys(new PropertySelectorGenerator
- (inputStreams.get(0).getEventSchema(), false).generateSelectors());
- } else {
- outputStrategy.setAvailablePropertyKeys(new PropertySelectorGenerator
- (inputStreams.get(0).getEventSchema(), inputStreams.get(1)
- .getEventSchema(), false)
- .generateSelectors());
- }
- });
- }
- }
-
- public PipelineModificationMessage getPipelineModificationMessage() {
- return pipelineModificationMessage;
- }
-
- public List<InvocableStreamPipesEntity> makeInvocationGraphs() {
- PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
- return new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index e7d5025..e896dfe 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -19,8 +19,16 @@ package org.apache.streampipes.manager.matching;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.recommender.AllElementsProvider;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModification;
+
+import java.util.*;
public class PipelineVerificationHandlerV2 {
@@ -34,4 +42,42 @@ public class PipelineVerificationHandlerV2 {
PipelineGraph graph = new PipelineGraphBuilder(pipeline).buildGraph();
return new PipelineModificationGenerator(graph).buildPipelineModificationMessage();
}
+
+ public List<NamedStreamPipesEntity> verifyAndBuildGraphs(boolean ignoreUnconfigured) {
+ List<PipelineModification> pipelineModifications = verifyPipeline().getPipelineModifications();
+ List<NamedStreamPipesEntity> allElements = new AllElementsProvider(pipeline).getAllElements();
+ List<NamedStreamPipesEntity> result = new ArrayList<>();
+ allElements.forEach(pipelineElement -> {
+ Optional<PipelineModification> modificationOpt = getModification(pipelineElement.getDOM(), pipelineModifications);
+ if (modificationOpt.isPresent()) {
+ PipelineModification modification = modificationOpt.get();
+ if (pipelineElement instanceof InvocableStreamPipesEntity) {
+ ((InvocableStreamPipesEntity) pipelineElement).setInputStreams(modification.getInputStreams());
+ ((InvocableStreamPipesEntity) pipelineElement).setStaticProperties(modification.getStaticProperties());
+ if (pipelineElement instanceof DataProcessorInvocation) {
+ ((DataProcessorInvocation) pipelineElement).setOutputStream(modification.getOutputStream());
+ if (((DataProcessorInvocation) pipelineElement).getOutputStream().getEventGrounding() == null) {
+ EventGrounding grounding = new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding();
+ ((DataProcessorInvocation) pipelineElement).getOutputStream().setEventGrounding(grounding);
+ }
+ if (modification.getOutputStrategies() != null) {
+ ((DataProcessorInvocation) pipelineElement).setOutputStrategies(modification.getOutputStrategies());
+ }
+ }
+ }
+ if (!ignoreUnconfigured || modification.isPipelineElementValid()) {
+ result.add(pipelineElement);
+ }
+ } else {
+ result.add(pipelineElement);
+ }
+ });
+
+ return result;
+ }
+
+ private Optional<PipelineModification> getModification(String id,
+ List<PipelineModification> modifications) {
+ return modifications.stream().filter(m -> m.getDomId().equals(id)).findFirst();
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 704aa85..4b2c65c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -56,36 +56,17 @@ import java.util.List;
/**
* class that provides several (partial) pipeline verification methods
- *
- * @author riemer
*/
public class Operations {
-
- public static PipelineModificationMessage validatePipeline(Pipeline pipeline, boolean isPartial) throws Exception {
- return validatePipeline(pipeline, isPartial, "");
- }
-
/**
- * This method is a fix for the streamsets integration. Remove the username from the signature when you don't need it anymore
*
- * @param pipeline
- * @param isPartial
- * @param username
- * @return
- * @throws Exception
+ * @param pipeline the pipeline to validate
+ * @return PipelineModificationMessage a message containing desired pipeline modifications
*/
- public static PipelineModificationMessage validatePipeline(Pipeline pipeline, boolean isPartial, String username)
- throws Exception {
+ public static PipelineModificationMessage validatePipeline(Pipeline pipeline) throws Exception {
return new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
-// PipelineVerificationHandler validator = new PipelineVerificationHandler(
-// pipeline);
-// return validator
-// .validateConnection()
-// .computeMappingProperties()
-// .storeConnection()
-// .getPipelineModificationMessage();
}
public static DataSetModificationMessage updateDataSet(SpDataSet dataSet) {
@@ -117,18 +98,17 @@ public class Operations {
public static PipelineOperationStatus startPipeline(
Pipeline pipeline) {
- return startPipeline(pipeline,true, true, false);
+ return startPipeline(pipeline, true);
}
public static PipelineOperationStatus startPipeline(
- Pipeline pipeline, boolean visualize, boolean storeStatus,
- boolean monitor) {
- return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, false).startPipeline();
+ Pipeline pipeline, boolean storeStatus) {
+ return new PipelineExecutor(pipeline, storeStatus, false).startPipeline();
}
public static PipelineOperationStatus stopPipeline(
Pipeline pipeline, boolean forceStop) {
- return stopPipeline(pipeline, true, true, false, forceStop);
+ return stopPipeline(pipeline, true, forceStop);
}
public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) {
@@ -145,11 +125,9 @@ public class Operations {
}
public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
- boolean visualize,
boolean storeStatus,
- boolean monitor,
boolean forceStop) {
- return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, forceStop).stopPipeline();
+ return new PipelineExecutor(pipeline, storeStatus, forceStop).stopPipeline();
}
public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
@@ -176,10 +154,6 @@ public class Operations {
return new PipelineTemplateInvocationHandler(userSid, pipelineTemplateInvocation).handlePipelineInvocation();
}
- public static PipelineOperationStatus handlePipelineTemplateInvocation(String username, PipelineTemplateInvocation pipelineTemplateInvocation, PipelineTemplateDescription pipelineTemplateDescription) {
- return new PipelineTemplateInvocationHandler(username, pipelineTemplateInvocation, pipelineTemplateDescription).handlePipelineInvocation();
- }
-
public static PipelineTemplateInvocation getPipelineInvocationTemplate(SpDataStream dataStream, PipelineTemplateDescription pipelineTemplateDescription) {
return new PipelineTemplateInvocationGenerator(dataStream, pipelineTemplateDescription).generateInvocation();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 45540a2..1cb792d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -17,10 +17,12 @@
*/
package org.apache.streampipes.manager.preview;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
import org.apache.streampipes.manager.execution.http.HttpRequestBuilder;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
@@ -41,14 +43,16 @@ public class PipelinePreview {
public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
String previewId = generatePreviewId();
pipeline.setActions(new ArrayList<>());
- PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
- List<NamedStreamPipesEntity> graphs = new ArrayList<>(new InvocationGraphBuilder(pipelineGraph, previewId).buildGraphs());
- graphs.addAll(pipeline.getStreams().stream().filter(stream -> !(stream instanceof SpDataSet)).collect(Collectors.toList()));
+ List<NamedStreamPipesEntity> pipelineElements = new PipelineVerificationHandlerV2(pipeline)
+ .verifyAndBuildGraphs(true)
+ .stream()
+ .filter(pe -> !(pe instanceof SpDataSet))
+ .collect(Collectors.toList());
- invokeGraphs(filter(graphs));
- storeGraphs(previewId, graphs);
+ invokeGraphs(filter(pipelineElements));
+ storeGraphs(previewId, pipelineElements);
- return makePreviewModel(previewId, graphs);
+ return makePreviewModel(previewId, pipelineElements);
}
public void deletePreview(String previewId) {
@@ -77,12 +81,29 @@ public class PipelinePreview {
}
}
+ private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
+ return new ExtensionsServiceEndpointGenerator(
+ g.getAppId(),
+ ExtensionsServiceEndpointUtils.getPipelineElementType(g))
+ .getEndpointResourceUrl();
+ }
+
private void invokeGraphs(List<InvocableStreamPipesEntity> graphs) {
- graphs.forEach(g -> new HttpRequestBuilder(g, g.getBelongsTo(), null).invoke());
+ graphs.forEach(g -> {
+ try {
+ g.setSelectedEndpointUrl(findSelectedEndpoint(g));
+ new HttpRequestBuilder(g, g.getSelectedEndpointUrl(), null).invoke();
+ } catch (NoServiceEndpointsAvailableException e) {
+ e.printStackTrace();
+ }
+ });
}
private void detachGraphs(List<InvocableStreamPipesEntity> graphs) {
- graphs.forEach(g -> new HttpRequestBuilder(g, g.getUri(), null).detach());
+ graphs.forEach(g -> {
+ String endpointUrl = g.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(g.getElementId());
+ new HttpRequestBuilder(g, endpointUrl, null).detach();
+ });
}
private void deleteGraphs(String previewId) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
index f8a04bd..b2c3130 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/AllElementsProvider.java
@@ -40,6 +40,10 @@ public class AllElementsProvider {
.collect(Collectors.toList());
}
+ public List<NamedStreamPipesEntity> getAllElements() {
+ return allElements;
+ }
+
public NamedStreamPipesEntity findElement(String domId) {
return this.allElements
.stream()
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index c23fa00..6781649 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -213,7 +213,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
@PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
public Response update(Pipeline pipeline) {
try {
- return ok(Operations.validatePipeline(pipeline, true));
+ return ok(Operations.validatePipeline(pipeline));
} catch (JsonSyntaxException e) {
return badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
e.getMessage()));