You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/03/03 10:41:48 UTC
[incubator-streampipes] branch rel/0.69.0 updated: [hotfix] Reuse existing topics
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.69.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/rel/0.69.0 by this push:
new e3d5c68 [hotfix] Reuse existing topics
new 32bb0b2 Merge branch 'rel/0.69.0' of github.com:apache/incubator-streampipes into rel/0.69.0
e3d5c68 is described below
commit e3d5c68cd4af09e2ef5a71a76d9cb003ab9c35aa
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Mar 3 11:41:19 2022 +0100
[hotfix] Reuse existing topics
---
.../manager/matching/ProtocolSelector.java | 33 +++++++++++++++++++++-
1 file changed, 32 insertions(+), 1 deletion(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index ec3a891..e11ef71 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.manager.util.TopicGenerator;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
@@ -39,11 +40,41 @@ public class ProtocolSelector extends GroundingSelector {
public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
super(source, targets);
- this.outputTopic = TopicGenerator.generateRandomTopic();
+ this.outputTopic = getTopic(source);
this.prioritizedProtocols =
BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
}
+ private String getTopic(NamedStreamPipesEntity source) {
+ if (source instanceof DataProcessorInvocation) {
+ DataProcessorInvocation invocation = (DataProcessorInvocation) source;
+ if (invocation.getOutputStream() != null) {
+ if (existsGrounding(invocation) && existsTopic(invocation)) {
+ return invocation
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+ }
+ }
+ return TopicGenerator.generateRandomTopic();
+ }
+
+ private boolean existsTopic(DataProcessorInvocation invocation) {
+ return invocation
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName() != null;
+ }
+
+ private boolean existsGrounding(DataProcessorInvocation invocation) {
+ return invocation.getOutputStream().getEventGrounding() != null;
+ }
+
public TransportProtocol getPreferredProtocol() {
if (source instanceof SpDataStream) {
return ((SpDataStream) source)