You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/15 20:06:00 UTC
[incubator-streampipes] 02/03: [STREAMPIPES-494] Support edge validation for nodes with multiple inputs
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit bc8471432c8ea18031020794d6349f49131f82cc
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Jan 14 23:24:57 2022 +0100
[STREAMPIPES-494] Support edge validation for nodes with multiple inputs
---
.../matching/PipelineModificationGenerator.java | 20 +++++++++++++++-----
.../matching/v2/pipeline/ApplyGroundingStep.java | 4 +++-
.../matching/v2/pipeline/SchemaValidationStep.java | 4 +++-
.../components/pipeline/pipeline.component.html | 3 ++-
.../editor/components/pipeline/pipeline.component.ts | 3 ++-
5 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index 41f720c..e43d535 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -22,14 +22,14 @@ public class PipelineModificationGenerator {
private final PipelineGraph pipelineGraph;
private final Map<String, PipelineModification> pipelineModifications;
- private List<PipelineEdgeValidation> edgeValidations;
+ private Map<String, PipelineEdgeValidation> edgeValidations;
private final PipelineValidator pipelineValidator;
public PipelineModificationGenerator(PipelineGraph pipelineGraph) {
this.pipelineGraph = pipelineGraph;
this.pipelineModifications = new HashMap<>();
this.pipelineValidator = new PipelineValidator();
- this.edgeValidations = new ArrayList<>();
+ this.edgeValidations = new HashMap<>();
}
public PipelineModificationMessage buildPipelineModificationMessage() {
@@ -42,7 +42,7 @@ public class PipelineModificationGenerator {
PipelineModificationMessage message = new PipelineModificationMessage();
message.setPipelineModifications(getModifications());
- message.setEdgeValidations(this.edgeValidations);
+ message.setEdgeValidations(toList(this.edgeValidations));
return message;
}
@@ -57,10 +57,10 @@ public class PipelineModificationGenerator {
try {
pipelineValidator.apply(source, t, targets, validationInfos);
buildModification(modification, t);
- edgeValidations.add(PipelineEdgeValidation.complete(source.getDOM(), t.getDOM()));
+ edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDOM(), t.getDOM()));
} catch (SpValidationException e) {
//e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString())));
- edgeValidations.add(PipelineEdgeValidation.invalid(source.getDOM(), t.getDOM(), toNotifications(e.getErrorLog())));
+ edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.invalid(source.getDOM(), t.getDOM(), toNotifications(e.getErrorLog())));
modification.setPipelineElementValid(false);
}
modification.setValidationInfos(validationInfos);
@@ -70,6 +70,16 @@ public class PipelineModificationGenerator {
});
}
+ private String makeKey(NamedStreamPipesEntity source,
+ InvocableStreamPipesEntity t) {
+ return source.getDOM() + "-" + t.getDOM();
+ }
+
+ private List<PipelineEdgeValidation> toList(Map<String,
+ PipelineEdgeValidation> edgeValidations) {
+ return new ArrayList<>(edgeValidations.values());
+ }
+
private void buildModification(PipelineModification modification,
InvocableStreamPipesEntity t) {
if (t instanceof DataProcessorInvocation) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
index cc5e54e..21a84f1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
@@ -62,7 +62,9 @@ public class ApplyGroundingStep extends AbstractPipelineValidationStep {
.get(getIndex(target))
.setEventGrounding(selectedGrounding);
- this.visitorHistory.put(target.getDOM(), 1);
+ if (target.getInputStreams().size() > 1) {
+ this.visitorHistory.put(target.getDOM(), 1);
+ }
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java
index a306383..23fd614 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java
@@ -47,7 +47,9 @@ public class SchemaValidationStep extends AbstractPipelineValidationStep {
target.getInputStreams().get(getIndex(target)).setEventSchema(getSourceSchema(source));
}
- this.visitorHistory.put(target.getDOM(), 1);
+ if (target.getInputStreams().size() > 1) {
+ this.visitorHistory.put(target.getDOM(), 1);
+ }
}
private EventSchema getSourceSchema(NamedStreamPipesEntity source) {
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.html b/ui/src/app/editor/components/pipeline/pipeline.component.html
index 571ddc9..206b322 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.html
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.html
@@ -39,7 +39,8 @@
</div>
<pipeline-element [pipelineElement]="pipelineElement.payload" [preview]="preview"></pipeline-element>
</div>
- <pipeline-element-options *ngIf="!preview" (delete)="handleDeleteOption($event)"
+ <pipeline-element-options *ngIf="!preview && currentMouseOverElement==pipelineElement.payload.dom"
+ (delete)="handleDeleteOption($event)"
(customize)="showCustomizeDialog($event)"
[currentMouseOverElement]="currentMouseOverElement"
[pipelineValid]="pipelineValid"
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 05d860d..ac82c26 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -362,7 +362,8 @@ export class PipelineComponent implements OnInit, OnDestroy {
currentConnectionValid(pe: PipelineElementConfig,
targetEdges: PipelineEdgeValidation[]) {
const entity = pe.payload as InvocablePipelineElementUnion;
- return entity.streamRequirements.length === targetEdges.length && targetEdges.every(e => e.status.validationStatusType === 'COMPLETE');
+ //return entity.streamRequirements.length === targetEdges.length && targetEdges.every(e => e.status.validationStatusType === 'COMPLETE');
+ return targetEdges.every(e => e.status.validationStatusType === 'COMPLETE');
}
getTargetEdgeValidations(pipelineModificationMessage: PipelineModificationMessage,