You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/03/03 20:58:37 UTC

[incubator-streampipes] branch rel/0.69.0 updated: [hotfix] Improve reuse of grounding

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.69.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/rel/0.69.0 by this push:
     new 3ebcbba  [hotfix] Improve reuse of grounding
3ebcbba is described below

commit 3ebcbbae508d2939ca6cd760712605b44c572ebe
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Mar 3 21:58:25 2022 +0100

    [hotfix] Improve reuse of grounding
---
 .../manager/matching/ProtocolSelector.java         | 39 +++-------------------
 .../matching/v2/pipeline/ApplyGroundingStep.java   | 12 ++++++-
 2 files changed, 15 insertions(+), 36 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index e11ef71..4c55c3e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -21,10 +21,9 @@ package org.apache.streampipes.manager.matching;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.manager.util.TopicGenerator;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
@@ -35,46 +34,16 @@ import java.util.Set;
 
 public class ProtocolSelector extends GroundingSelector {
 
-    private String outputTopic;
-    private List<SpProtocol> prioritizedProtocols;
+    private final String outputTopic;
+    private final List<SpProtocol> prioritizedProtocols;
 
     public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
         super(source, targets);
-        this.outputTopic = getTopic(source);
+        this.outputTopic = TopicGenerator.generateRandomTopic();
         this.prioritizedProtocols =
                 BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
     }
 
-    private String getTopic(NamedStreamPipesEntity source) {
-        if (source instanceof DataProcessorInvocation) {
-            DataProcessorInvocation invocation = (DataProcessorInvocation) source;
-            if (invocation.getOutputStream() != null) {
-                if (existsGrounding(invocation) && existsTopic(invocation)) {
-                    return invocation
-                            .getOutputStream()
-                            .getEventGrounding()
-                            .getTransportProtocol()
-                            .getTopicDefinition()
-                            .getActualTopicName();
-                }
-            }
-        }
-        return TopicGenerator.generateRandomTopic();
-    }
-
-    private boolean existsTopic(DataProcessorInvocation invocation) {
-        return invocation
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName() != null;
-    }
-
-    private boolean existsGrounding(DataProcessorInvocation invocation) {
-        return invocation.getOutputStream().getEventGrounding() != null;
-    }
-
     public TransportProtocol getPreferredProtocol() {
         if (source instanceof SpDataStream) {
             return ((SpDataStream) source)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
index 21a84f1..6f46bc4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
@@ -28,11 +28,15 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class ApplyGroundingStep extends AbstractPipelineValidationStep {
 
+  private final Map<String, EventGrounding> sourceGroundingVisitorMap = new HashMap<>();
+
   @Override
   public void apply(NamedStreamPipesEntity source,
                     InvocableStreamPipesEntity target,
@@ -49,7 +53,13 @@ public class ApplyGroundingStep extends AbstractPipelineValidationStep {
     if (!match) {
       throw new SpValidationException(errorLog);
     } else {
-      EventGrounding selectedGrounding = new GroundingBuilder(source, allTargets).getEventGrounding();
+      EventGrounding selectedGrounding;
+      if (!sourceGroundingVisitorMap.containsKey(source.getDOM())) {
+        selectedGrounding = new GroundingBuilder(source, allTargets).getEventGrounding();
+        sourceGroundingVisitorMap.put(source.getDOM(), selectedGrounding);
+      } else {
+        selectedGrounding = new EventGrounding(sourceGroundingVisitorMap.get(source.getDOM()));
+      }
 
       if (source instanceof DataProcessorInvocation) {
         ((DataProcessorInvocation) source)