You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/03/03 10:41:48 UTC

[incubator-streampipes] branch rel/0.69.0 updated: [hotfix] Reuse existing topics

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.69.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/rel/0.69.0 by this push:
     new e3d5c68  [hotfix] Reuse existing topics
     new 32bb0b2  Merge branch 'rel/0.69.0' of github.com:apache/incubator-streampipes into rel/0.69.0
e3d5c68 is described below

commit e3d5c68cd4af09e2ef5a71a76d9cb003ab9c35aa
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Mar 3 11:41:19 2022 +0100

    [hotfix] Reuse existing topics
---
 .../manager/matching/ProtocolSelector.java         | 33 +++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index ec3a891..e11ef71 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.manager.util.TopicGenerator;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
@@ -39,11 +40,41 @@ public class ProtocolSelector extends GroundingSelector {
 
     public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
         super(source, targets);
-        this.outputTopic = TopicGenerator.generateRandomTopic();
+        this.outputTopic = getTopic(source); // existing topic of the source if set, else random
         this.prioritizedProtocols =
                 BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
     }
 
+    /**
+     * Reuses the topic of a data processor's output stream if one is set, else a random topic.
+     */
+    private String getTopic(NamedStreamPipesEntity source) {
+        if (!(source instanceof DataProcessorInvocation)) {
+            return TopicGenerator.generateRandomTopic();
+        }
+        DataProcessorInvocation processor = (DataProcessorInvocation) source;
+        // Order matters: the helper checks dereference the output stream and its grounding.
+        if (processor.getOutputStream() != null
+                && existsGrounding(processor) && existsTopic(processor)) {
+            return processor.getOutputStream().getEventGrounding()
+                    .getTransportProtocol().getTopicDefinition().getActualTopicName();
+        }
+        return TopicGenerator.generateRandomTopic();
+    }
+
+    /** Null-safe: true only if protocol, topic definition and topic name are all set. */
+    private boolean existsTopic(DataProcessorInvocation invocation) {
+        // Caller has already verified that output stream and event grounding are non-null.
+        TransportProtocol protocol =
+                invocation.getOutputStream().getEventGrounding().getTransportProtocol();
+        return protocol != null && protocol.getTopicDefinition() != null
+                && protocol.getTopicDefinition().getActualTopicName() != null;
+    }
+
+    private boolean existsGrounding(DataProcessorInvocation invocation) { // output stream must be non-null
+        return invocation.getOutputStream().getEventGrounding() != null; // true if a grounding is attached
+    }
+
     public TransportProtocol getPreferredProtocol() {
         if (source instanceof SpDataStream) {
             return ((SpDataStream) source)