You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/14 12:48:07 UTC
[incubator-streampipes] branch rel/0.67.0 updated: [hotfix] Fix TransformOutput generators
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.67.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/rel/0.67.0 by this push:
new c50f749 [hotfix] Fix TransformOutput generators
c50f749 is described below
commit c50f7495d034ceef33cb37aeb7d609110f0afb0b
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Sep 14 14:47:37 2020 +0200
[hotfix] Fix TransformOutput generators
---
.../matching/PipelineVerificationHandler.java | 16 +++++++-
.../CustomTransformOutputSchemaGenerator.java | 12 +++---
.../output/TransformOutputSchemaGenerator.java | 43 +++++++---------------
.../rest/impl/AbstractRestInterface.java | 7 ++++
.../rest/impl/PipelineWithUserResource.java | 12 +++---
.../static-mapping-unary.component.ts | 1 +
.../components/pipeline/pipeline.component.ts | 40 ++++++++++++++------
7 files changed, 75 insertions(+), 56 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
index d92dc75..bbdc6bd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
@@ -183,7 +183,19 @@ public class PipelineVerificationHandler {
}
public List<InvocableStreamPipesEntity> makeInvocationGraphs() {
- PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
- return new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
+ if (onlyStreamAncestorsPresentInPipeline()) {
+ return new ArrayList<>();
+ } else {
+ PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
+ return new InvocationGraphBuilder(pipelineGraph, null).buildGraphs();
+ }
+ }
+
+ private boolean onlyStreamAncestorsPresentInPipeline() {
+ return rootPipelineElement
+ .getConnectedTo()
+ .stream()
+ .map(connectedTo -> TreeUtils.findSEPAElement(connectedTo, pipeline.getSepas(), pipeline.getStreams()))
+ .allMatch(pe -> pe instanceof SpDataStream);
}
}
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
index f884282..9ea98c9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
@@ -27,7 +27,7 @@ import org.apache.streampipes.model.output.CustomTransformOutputStrategy;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.serializers.json.GsonSerializer;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import java.io.IOException;
@@ -58,9 +58,8 @@ public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator<
}
private EventSchema makeRequest() {
- String httpRequestBody = GsonSerializer.getGsonWithIds().toJson(dataProcessorInvocation);
-
try {
+ String httpRequestBody = JacksonSerializer.getObjectMapper().writeValueAsString(dataProcessorInvocation);
Response httpResp = Request.Post(dataProcessorInvocation.getBelongsTo() + "/output").bodyString(httpRequestBody,
ContentType
.APPLICATION_JSON).execute();
@@ -73,10 +72,9 @@ public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator<
private EventSchema handleResponse(Response httpResp) throws JsonSyntaxException, IOException {
String resp = httpResp.returnContent().asString();
- EventSchema outputSchema = GsonSerializer
- .getGsonWithIds()
- .fromJson(resp, EventSchema.class);
- return outputSchema;
+ return JacksonSerializer
+ .getObjectMapper()
+ .readValue(resp, EventSchema.class);
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java
index b069251..269c59f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/TransformOutputSchemaGenerator.java
@@ -26,28 +26,17 @@ import org.apache.streampipes.model.output.TransformOutputStrategy;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
-import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.model.staticproperty.SelectionStaticProperty;
-import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.*;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.sdk.helpers.Tuple2;
import java.net.URI;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.Collectors;
public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<TransformOutputStrategy> {
- private DataProcessorInvocation dataProcessorInvocation;
-
- protected static final String prefix = "urn:streampipes.org:spi:";
+ private List<StaticProperty> staticProperties;
public static TransformOutputSchemaGenerator from(OutputStrategy strategy,
DataProcessorInvocation invocation) {
@@ -57,7 +46,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
public TransformOutputSchemaGenerator(TransformOutputStrategy strategy, DataProcessorInvocation
invocation) {
super(strategy);
- this.dataProcessorInvocation = invocation;
+ this.staticProperties = invocation.getStaticProperties();
}
@Override
@@ -68,7 +57,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
EventSchema inSchema = stream.getEventSchema();
outputStrategy.getTransformOperations().forEach(to -> {
Optional<MappingPropertyUnary> mappingPropertyOpt = findMappingProperty(to.getMappingPropertyInternalName(),
- dataProcessorInvocation.getStaticProperties());
+ staticProperties);
if (mappingPropertyOpt.isPresent()) {
Optional<EventProperty> eventPropertyOpt = findEventProperty(mappingPropertyOpt.get().getSelectedProperty()
@@ -77,21 +66,16 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
if (eventPropertyOpt.isPresent()) {
EventProperty eventProperty = eventPropertyOpt.get();
- modifiedEventProperties.put(eventProperty.getElementId(), modifyEventProperty(eventProperty, to,
- dataProcessorInvocation.getStaticProperties()));
+ modifiedEventProperties.put(eventProperty.getElementId(), modifyEventProperty(cloneEp(eventProperty), to,
+ staticProperties));
}
}
});
- List<EventProperty> newProperties = inSchema.getEventProperties().stream().map(ep -> {
- if (modifiedEventProperties.containsKey(ep.getElementId())) {
- EventProperty newProperty = modifiedEventProperties.get(ep.getElementId());
- newProperty.setElementId(prefix + UUID.randomUUID().toString());
- return newProperty;
- } else {
- return ep;
- }
- }).collect(Collectors.toList());
+ List<EventProperty> newProperties = inSchema.getEventProperties()
+ .stream()
+ .map(ep -> modifiedEventProperties.getOrDefault(ep.getElementId(), ep))
+ .collect(Collectors.toList());
outSchema.setEventProperties(newProperties);
return makeTuple(outSchema);
@@ -131,7 +115,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
private Option findSelected(List<Option> options) {
return options
.stream()
- .filter(o -> o.isSelected())
+ .filter(Option::isSelected)
.findFirst()
.get();
}
@@ -158,7 +142,7 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
}
} else if (transformOperationType == TransformOperationType.DOMAIN_PROPERTY_TRANSFORMATION) {
- eventProperty.setDomainProperties(Arrays.asList(URI.create(value)));
+ eventProperty.setDomainProperties(Collections.singletonList(URI.create(value)));
} else if (transformOperationType == TransformOperationType.RUNTIME_NAME_TRANSFORMATION) {
eventProperty.setRuntimeName(value);
@@ -173,7 +157,6 @@ public class TransformOutputSchemaGenerator extends OutputSchemaGenerator<Transf
return eventProperties
.stream()
.filter(ep -> ep.getRuntimeName().equals(removePrefix(propertySelector)))
- .map(this::cloneEp)
.findFirst();
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java
index cdc7913..066708f 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AbstractRestInterface.java
@@ -184,6 +184,13 @@ public abstract class AbstractRestInterface {
.build();
}
+ protected <T> Response serverError(T entity) {
+ return Response
+ .status(500)
+ .entity(entity)
+ .build();
+ }
+
protected StreamPipesJsonLdContainer asContainer(List<? extends AbstractStreamPipesEntity> elements) {
return new StreamPipesJsonLdContainer(elements);
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
index 44d1e9f..6be4d59 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineWithUserResource.java
@@ -207,25 +207,25 @@ public class PipelineWithUserResource extends AbstractRestInterface implements I
try {
return ok(Operations.validatePipeline(pipeline, true, username));
} catch (JsonSyntaxException e) {
- return constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR,
+ return badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
e.getMessage()));
} catch (NoMatchingSchemaException e) {
- return constructErrorMessage(new Notification(NotificationType.NO_VALID_CONNECTION,
+ return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION,
e.getMessage()));
} catch (NoMatchingFormatException e) {
- return constructErrorMessage(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION,
+ return badRequest(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION,
e.getMessage()));
} catch (NoMatchingProtocolException e) {
- return constructErrorMessage(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION,
+ return badRequest(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION,
e.getMessage()));
} catch (RemoteServerNotAccessibleException | NoMatchingJsonSchemaException e) {
- return constructErrorMessage(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE
+ return serverError(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE
, e.getMessage()));
} catch (InvalidConnectionException e) {
return badRequest(e.getErrorLog());
} catch (Exception e) {
e.printStackTrace();
- return constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR,
+ return serverError(new Notification(NotificationType.UNKNOWN_ERROR,
e.getMessage()));
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
index e885972..4d7bb79 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
@@ -47,6 +47,7 @@ export class StaticMappingUnaryComponent extends StaticMappingComponent<MappingP
.forEach(ep => ep.propertySelector = this.firstStreamPropertySelector + ep.runtimeName);
if (!this.staticProperty.selectedProperty) {
this.staticProperty.selectedProperty = this.availableProperties[0].propertySelector;
+ this.emitUpdate();
}
this.addValidator(this.staticProperty.selectedProperty, Validators.required);
this.enableValidators();
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index d115bc5..6c3d013 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -37,7 +37,7 @@ import {
} from "../../model/editor.model";
import {
CustomOutputStrategy,
- DataProcessorInvocation,
+ DataProcessorInvocation, ErrorMessage,
Pipeline,
SpDataStream
} from "../../../core-model/gen/streampipes-model";
@@ -320,8 +320,12 @@ export class PipelineComponent implements OnInit {
}, status => {
pe.settings.loadingStatus = false;
this.JsplumbBridge.detach(info.connection);
- let matchingResultMessage = (status.error as any[]).map(e => MatchingResultMessage.fromData(e as MatchingResultMessage));
- this.showMatchingErrorDialog(matchingResultMessage);
+ if (Array.isArray(status.error)) {
+ let matchingResultMessage = (status.error as any[]).map(e => MatchingResultMessage.fromData(e as MatchingResultMessage));
+ this.showMatchingErrorDialog(matchingResultMessage);
+ } else {
+ this.showErrorDialog(status.error.title, status.error.description);
+ }
});
}
});
@@ -336,14 +340,16 @@ export class PipelineComponent implements OnInit {
}
modifyPipeline(pipelineModifications) {
- for (var i = 0, modification; modification = pipelineModifications[i]; i++) {
- var id = modification.domId;
- if (id !== "undefined") {
- var pe = this.ObjectProvider.findElement(id, this.rawPipelineModel);
- (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
- (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
- (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
- }
+ if (pipelineModifications) {
+ pipelineModifications.forEach(modification => {
+ var id = modification.domId;
+ if (id !== "undefined") {
+ var pe = this.ObjectProvider.findElement(id, this.rawPipelineModel);
+ (pe.payload as InvocablePipelineElementUnion).staticProperties = modification.staticProperties;
+ (pe.payload as DataProcessorInvocation).outputStrategies = modification.outputStrategies;
+ (pe.payload as InvocablePipelineElementUnion).inputStreams = modification.inputStreams;
+ }
+ });
}
}
@@ -368,6 +374,18 @@ export class PipelineComponent implements OnInit {
});
}
+ showErrorDialog(title, description) {
+ this.dialog.open(ConfirmDialogComponent, {
+ width: '500px',
+ data: {
+ "title": title,
+ "subtitle": description,
+ "okTitle": "Ok",
+ "confirmAndCancel": false
+ },
+ });
+ }
+
showMatchingErrorDialog(matchingResultMessage: MatchingResultMessage[]) {
this.dialogService.open(MatchingErrorComponent, {
panelType: PanelType.STANDARD_PANEL,