You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/14 12:48:07 UTC

[incubator-streampipes] branch rel/0.67.0 updated: [hotfix] Fix TransformOutput generators

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.67.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/rel/0.67.0 by this push:
     new c50f749  [hotfix] Fix TransformOutput generators
c50f749 is described below

commit c50f7495d034ceef33cb37aeb7d609110f0afb0b
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Sep 14 14:47:37 2020 +0200

    [hotfix] Fix TransformOutput generators
---
 .../matching/PipelineVerificationHandler.java      | 16 +++++++-
 .../CustomTransformOutputSchemaGenerator.java      | 12 +++---
 .../output/TransformOutputSchemaGenerator.java     | 43 +++++++---------------
 .../rest/impl/AbstractRestInterface.java           |  7 ++++
 .../rest/impl/PipelineWithUserResource.java        | 12 +++---
 .../static-mapping-unary.component.ts              |  1 +
 .../components/pipeline/pipeline.component.ts      | 40 ++++++++++++++------
 7 files changed, 75 insertions(+), 56 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
index d92dc75..bbdc6bd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
@@ -183,7 +183,19 @@ public class PipelineVerificationHandler {
   }
 
   public List<InvocableStreamPipesEntity> makeInvocationGraphs() {
-    PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-    return new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
+    if (onlyStreamAncestorsPresentInPipeline()) {
+      return new ArrayList<>();
+    } else {
+      PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
+      return new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
+    }
+  }
+
+  private boolean onlyStreamAncestorsPresentInPipeline() {
+    return rootPipelineElement
+            .getConnectedTo()
+            .stream()
+            .map(connectedTo -> TreeUtils.findSEPAElement(connectedTo, pipeline.getSepas(), pipeline.getStreams()))
+            .allMatch(pe -> pe instanceof SpDataStream);
   }
 }
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
index f884282..9ea98c9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
@@ -27,7 +27,7 @@ import org.apache.streampipes.model.output.CustomTransformOutputStrategy;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.serializers.json.GsonSerializer;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 
 import java.io.IOException;
 
@@ -58,9 +58,8 @@ public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator<
   }
 
   private EventSchema makeRequest() {
-    String httpRequestBody = GsonSerializer.getGsonWithIds().toJson(dataProcessorInvocation);
-
     try {
+      String httpRequestBody = JacksonSerializer.getObjectMapper().writeValueAsString(dataProcessorInvocation);
       Response httpResp = Request.Post(dataProcessorInvocation.getBelongsTo() + "/output").bodyString(httpRequestBody,
               ContentType
                       .APPLICATION_JSON).execute();
@@ -73,10 +72,9 @@ public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator<
 
   private EventSchema handleResponse(Response httpResp) throws JsonSyntaxException, IOException {
     String resp = httpResp.returnContent().asString();
-    EventSchema outputSchema = GsonSerializer
-            .getGsonWithIds()
-            .fromJson(resp, EventSchema.class);
 
-    return outputSchema;
+    return JacksonSerializer
+            .getObjectMapper()
+            .readValue(resp, EventSchema.class);
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java
index b069251..269c59f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java
@@ -26,28 +26,17 @@ import org.apache.streampipes.model.output.TransformOutputStrategy;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
-import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.model.staticproperty.SelectionStaticProperty;
-import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.*;
 import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.sdk.helpers.Tuple2;
 
 import java.net.URI;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<TransformOutputStrategy> {
 
-  private DataProcessorInvocation dataProcessorInvocation;
-
-  protected static final String prefix = "urn:streampipes.org:spi:";
+  private  List<StaticProperty> staticProperties;
 
   public static TransformOutputSchemaGenerator from(OutputStrategy strategy,
                                                     DataProcessorInvocation invocation) {
@@ -57,7 +46,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
   public TransformOutputSchemaGenerator(TransformOutputStrategy strategy, DataProcessorInvocation
    invocation) {
     super(strategy);
-    this.dataProcessorInvocation = invocation;
+    this.staticProperties = invocation.getStaticProperties();
   }
 
   @Override
@@ -68,7 +57,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
     EventSchema inSchema = stream.getEventSchema();
     outputStrategy.getTransformOperations().forEach(to -> {
       Optional<MappingPropertyUnary> mappingPropertyOpt = findMappingProperty(to.getMappingPropertyInternalName(),
-              dataProcessorInvocation.getStaticProperties());
+              staticProperties);
 
       if (mappingPropertyOpt.isPresent()) {
         Optional<EventProperty> eventPropertyOpt = findEventProperty(mappingPropertyOpt.get().getSelectedProperty()
@@ -77,21 +66,16 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
 
         if (eventPropertyOpt.isPresent()) {
           EventProperty eventProperty = eventPropertyOpt.get();
-          modifiedEventProperties.put(eventProperty.getElementId(), modifyEventProperty(eventProperty, to,
-                  dataProcessorInvocation.getStaticProperties()));
+          modifiedEventProperties.put(eventProperty.getElementId(), modifyEventProperty(cloneEp(eventProperty), to,
+                  staticProperties));
         }
       }
     });
 
-    List<EventProperty> newProperties = inSchema.getEventProperties().stream().map(ep -> {
-      if (modifiedEventProperties.containsKey(ep.getElementId())) {
-        EventProperty newProperty = modifiedEventProperties.get(ep.getElementId());
-        newProperty.setElementId(prefix + UUID.randomUUID().toString());
-        return newProperty;
-      } else {
-        return ep;
-      }
-    }).collect(Collectors.toList());
+    List<EventProperty> newProperties = inSchema.getEventProperties()
+            .stream()
+            .map(ep -> modifiedEventProperties.getOrDefault(ep.getElementId(), ep))
+            .collect(Collectors.toList());
 
     outSchema.setEventProperties(newProperties);
     return makeTuple(outSchema);
@@ -131,7 +115,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
   private Option findSelected(List<Option> options) {
     return options
             .stream()
-            .filter(o -> o.isSelected())
+            .filter(Option::isSelected)
             .findFirst()
             .get();
   }
@@ -158,7 +142,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
       }
 
     } else if (transformOperationType == TransformOperationType.DOMAIN_PROPERTY_TRANSFORMATION) {
-      eventProperty.setDomainProperties(Arrays.asList(URI.create(value)));
+      eventProperty.setDomainProperties(Collections.singletonList(URI.create(value)));
 
     } else if (transformOperationType == TransformOperationType.RUNTIME_NAME_TRANSFORMATION) {
       eventProperty.setRuntimeName(value);
@@ -173,7 +157,6 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
     return eventProperties
             .stream()
             .filter(ep -> ep.getRuntimeName().equals(removePrefix(propertySelector)))
-            .map(this::cloneEp)
             .findFirst();
   }
 
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java
index cdc7913..066708f 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java
@@ -184,6 +184,13 @@ public abstract class AbstractRestInterface {
             .build();
   }
 
+  protected <T> Response serverError(T entity) {
+    return Response
+            .status(500)
+            .entity(entity)
+            .build();
+  }
+
   protected StreamPipesJsonLdContainer asContainer(List<? extends AbstractStreamPipesEntity> elements) {
     return new StreamPipesJsonLdContainer(elements);
   }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
index 44d1e9f..6be4d59 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
@@ -207,25 +207,25 @@ public class PipelineWithUserResource extends AbstractRestInterface implements I
         try {
             return ok(Operations.validatePipeline(pipeline, true, username));
         } catch (JsonSyntaxException e) {
-            return constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR,
+            return badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
                     e.getMessage()));
         } catch (NoMatchingSchemaException e) {
-            return constructErrorMessage(new Notification(NotificationType.NO_VALID_CONNECTION,
+            return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION,
                     e.getMessage()));
         } catch (NoMatchingFormatException e) {
-            return constructErrorMessage(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION,
+            return badRequest(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION,
                     e.getMessage()));
         } catch (NoMatchingProtocolException e) {
-            return constructErrorMessage(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION,
+            return badRequest(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION,
                     e.getMessage()));
         } catch (RemoteServerNotAccessibleException | NoMatchingJsonSchemaException e) {
-            return constructErrorMessage(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE
+            return serverError(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE
                     , e.getMessage()));
         } catch (InvalidConnectionException e) {
             return badRequest(e.getErrorLog());
         } catch (Exception e) {
             e.printStackTrace();
-            return constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR,
+            return serverError(new Notification(NotificationType.UNKNOWN_ERROR,
                     e.getMessage()));
         }
     }
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
index e885972..4d7bb79 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
@@ -47,6 +47,7 @@ export class StaticMappingUnaryComponent extends StaticMappingComponent<MappingP
             .forEach(ep => ep.propertySelector = this.firstStreamPropertySelector + ep.runtimeName);
         if (!this.staticProperty.selectedProperty) {
             this.staticProperty.selectedProperty = this.availableProperties[0].propertySelector;
+            this.emitUpdate();
         }
         this.addValidator(this.staticProperty.selectedProperty, Validators.required);
         this.enableValidators();
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index d115bc5..6c3d013 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -37,7 +37,7 @@ import {
 } from "../../model/editor.model";
 import {
   CustomOutputStrategy,
-  DataProcessorInvocation,
+  DataProcessorInvocation, ErrorMessage,
   Pipeline,
   SpDataStream
 } from "../../../core-model/gen/streampipes-model";
@@ -320,8 +320,12 @@ export class PipelineComponent implements OnInit {
             }, status => {
               pe.settings.loadingStatus = false;
               this.JsplumbBridge.detach(info.connection);
-              let matchingResultMessage = (status.error as any[]).map(e => MatchingResultMessage.fromData(e as MatchingResultMessage));
-              this.showMatchingErrorDialog(matchingResultMessage);
+              if (Array.isArray(status.error)) {
+                let matchingResultMessage = (status.error as any[]).map(e => MatchingResultMessage.fromData(e as MatchingResultMessage));
+                this.showMatchingErrorDialog(matchingResultMessage);
+              } else {
+                this.showErrorDialog(status.error.title, status.error.description);
+              }
             });
       }
     });
@@ -336,14 +340,16 @@ export class PipelineComponent implements OnInit {
   }
 
   modifyPipeline(pipelineModifications) {
-    for (var i = 0, modification; modification = pipelineModifications[i]; i++) {
-      var id = modification.domId;
-      if (id !== "undefined") {
-        var pe = this.ObjectProvider.findElement(id, this.rawPipelineModel);
-        (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
-        (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
-        (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
-      }
+    if (pipelineModifications) {
+      pipelineModifications.forEach(modification => {
+        var id = modification.domId;
+        if (id !== "undefined") {
+          var pe = this.ObjectProvider.findElement(id, this.rawPipelineModel);
+          (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
+          (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
+          (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
+        }
+      });
     }
   }
 
@@ -368,6 +374,18 @@ export class PipelineComponent implements OnInit {
     });
   }
 
+  showErrorDialog(title, description) {
+    this.dialog.open(ConfirmDialogComponent, {
+      width: '500px',
+      data: {
+        "title": title,
+        "subtitle": description,
+        "okTitle": "Ok",
+        "confirmAndCancel": false
+      },
+    });
+  }
+
   showMatchingErrorDialog(matchingResultMessage: MatchingResultMessage[]) {
     this.dialogService.open(MatchingErrorComponent, {
       panelType: PanelType.STANDARD_PANEL,