You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 21:16:36 UTC

[incubator-streampipes] 03/03: [hotfix] Properly select transport format based on priority

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 26c6169361e05958c9f2ca5fda7c59a88ee855d6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 22:16:21 2022 +0100

    [hotfix] Properly select transport format based on priority
---
 .../manager/matching/FormatSelector.java           | 23 +++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java
index fce4b8a..1616805 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/FormatSelector.java
@@ -20,14 +20,16 @@ package org.apache.streampipes.manager.matching;
 
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.config.backend.SpDataFormat;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.grounding.TransportFormat;
 import org.apache.streampipes.vocabulary.MessageFormat;
 
+import java.net.URI;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class FormatSelector extends GroundingSelector {
 
@@ -46,12 +48,15 @@ public class FormatSelector extends GroundingSelector {
             List<SpDataFormat> prioritizedFormats =
                     BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedFormats();
 
-            return prioritizedFormats
+            List<SpDataFormat> supportedFormats = prioritizedFormats
                     .stream()
-                    .filter(pf -> supportsFormat(pf.getMessageFormat()))
-                    .findFirst()
-                    .map(pf -> new TransportFormat(pf.getMessageFormat()))
-                    .orElse(new TransportFormat(MessageFormat.Json));
+                    .filter(pf -> supportsFormat(pf.getMessageFormat())).collect(Collectors.toList());
+
+            if (supportedFormats.size() > 0) {
+                return new TransportFormat(supportedFormats.get(0).getMessageFormat());
+            } else {
+                return new TransportFormat(MessageFormat.Json);
+            }
         }
     }
 
@@ -63,6 +68,10 @@ public class FormatSelector extends GroundingSelector {
                         .getSupportedGrounding()
                         .getTransportFormats()
                         .stream()
-                        .anyMatch(s -> s.getRdfType().contains(format)));
+                        .anyMatch(s -> rdfTypesAsString(s.getRdfType()).contains(format)));
+    }
+
+    private List<String> rdfTypesAsString(List<URI> uri) {
+        return uri.stream().map(URI::toString).collect(Collectors.toList());
     }
 }