You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/03/03 20:58:37 UTC
[incubator-streampipes] branch rel/0.69.0 updated: [hotfix] Improve reuse of grounding
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.69.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/rel/0.69.0 by this push:
new 3ebcbba [hotfix] Improve reuse of grounding
3ebcbba is described below
commit 3ebcbbae508d2939ca6cd760712605b44c572ebe
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Mar 3 21:58:25 2022 +0100
[hotfix] Improve reuse of grounding
---
.../manager/matching/ProtocolSelector.java | 39 +++-------------------
.../matching/v2/pipeline/ApplyGroundingStep.java | 12 ++++++-
2 files changed, 15 insertions(+), 36 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index e11ef71..4c55c3e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -21,10 +21,9 @@ package org.apache.streampipes.manager.matching;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.manager.util.TopicGenerator;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
@@ -35,46 +34,16 @@ import java.util.Set;
public class ProtocolSelector extends GroundingSelector {
- private String outputTopic;
- private List<SpProtocol> prioritizedProtocols;
+ private final String outputTopic;
+ private final List<SpProtocol> prioritizedProtocols;
public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
super(source, targets);
- this.outputTopic = getTopic(source);
+ this.outputTopic = TopicGenerator.generateRandomTopic();
this.prioritizedProtocols =
BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
}
- private String getTopic(NamedStreamPipesEntity source) {
- if (source instanceof DataProcessorInvocation) {
- DataProcessorInvocation invocation = (DataProcessorInvocation) source;
- if (invocation.getOutputStream() != null) {
- if (existsGrounding(invocation) && existsTopic(invocation)) {
- return invocation
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
- }
- }
- return TopicGenerator.generateRandomTopic();
- }
-
- private boolean existsTopic(DataProcessorInvocation invocation) {
- return invocation
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName() != null;
- }
-
- private boolean existsGrounding(DataProcessorInvocation invocation) {
- return invocation.getOutputStream().getEventGrounding() != null;
- }
-
public TransportProtocol getPreferredProtocol() {
if (source instanceof SpDataStream) {
return ((SpDataStream) source)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
index 21a84f1..6f46bc4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
@@ -28,11 +28,15 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class ApplyGroundingStep extends AbstractPipelineValidationStep {
+ private final Map<String, EventGrounding> sourceGroundingVisitorMap = new HashMap<>();
+
@Override
public void apply(NamedStreamPipesEntity source,
InvocableStreamPipesEntity target,
@@ -49,7 +53,13 @@ public class ApplyGroundingStep extends AbstractPipelineValidationStep {
if (!match) {
throw new SpValidationException(errorLog);
} else {
- EventGrounding selectedGrounding = new GroundingBuilder(source, allTargets).getEventGrounding();
+ EventGrounding selectedGrounding;
+ if (!sourceGroundingVisitorMap.containsKey(source.getDOM())) {
+ selectedGrounding = new GroundingBuilder(source, allTargets).getEventGrounding();
+ sourceGroundingVisitorMap.put(source.getDOM(), selectedGrounding);
+ } else {
+ selectedGrounding = new EventGrounding(sourceGroundingVisitorMap.get(source.getDOM()));
+ }
if (source instanceof DataProcessorInvocation) {
((DataProcessorInvocation) source)