You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/15 20:06:00 UTC

[incubator-streampipes] 02/03: [STREAMPIPES-494] Support edge validation for nodes with multiple inputs

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit bc8471432c8ea18031020794d6349f49131f82cc
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Jan 14 23:24:57 2022 +0100

    [STREAMPIPES-494] Support edge validation for nodes with multiple inputs
---
 .../matching/PipelineModificationGenerator.java      | 20 +++++++++++++++-----
 .../matching/v2/pipeline/ApplyGroundingStep.java     |  4 +++-
 .../matching/v2/pipeline/SchemaValidationStep.java   |  4 +++-
 .../components/pipeline/pipeline.component.html      |  3 ++-
 .../editor/components/pipeline/pipeline.component.ts |  3 ++-
 5 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index 41f720c..e43d535 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -22,14 +22,14 @@ public class PipelineModificationGenerator {
 
   private final PipelineGraph pipelineGraph;
   private final Map<String, PipelineModification> pipelineModifications;
-  private List<PipelineEdgeValidation> edgeValidations;
+  private Map<String, PipelineEdgeValidation> edgeValidations;
   private final PipelineValidator pipelineValidator;
 
   public PipelineModificationGenerator(PipelineGraph pipelineGraph) {
     this.pipelineGraph = pipelineGraph;
     this.pipelineModifications = new HashMap<>();
     this.pipelineValidator = new PipelineValidator();
-    this.edgeValidations = new ArrayList<>();
+    this.edgeValidations = new HashMap<>();
   }
 
   public PipelineModificationMessage buildPipelineModificationMessage() {
@@ -42,7 +42,7 @@ public class PipelineModificationGenerator {
 
     PipelineModificationMessage message = new PipelineModificationMessage();
     message.setPipelineModifications(getModifications());
-    message.setEdgeValidations(this.edgeValidations);
+    message.setEdgeValidations(toList(this.edgeValidations));
     return message;
   }
 
@@ -57,10 +57,10 @@ public class PipelineModificationGenerator {
       try {
         pipelineValidator.apply(source, t, targets, validationInfos);
         buildModification(modification, t);
-        edgeValidations.add(PipelineEdgeValidation.complete(source.getDOM(), t.getDOM()));
+        edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDOM(), t.getDOM()));
       } catch (SpValidationException e) {
         //e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString())));
-        edgeValidations.add(PipelineEdgeValidation.invalid(source.getDOM(), t.getDOM(), toNotifications(e.getErrorLog())));
+        edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.invalid(source.getDOM(), t.getDOM(), toNotifications(e.getErrorLog())));
         modification.setPipelineElementValid(false);
       }
       modification.setValidationInfos(validationInfos);
@@ -70,6 +70,16 @@ public class PipelineModificationGenerator {
     });
   }
 
+  private String makeKey(NamedStreamPipesEntity source,
+                         InvocableStreamPipesEntity t) {
+    return source.getDOM() + "-" + t.getDOM();
+  }
+
+  private List<PipelineEdgeValidation> toList(Map<String,
+          PipelineEdgeValidation> edgeValidations) {
+    return new ArrayList<>(edgeValidations.values());
+  }
+
   private void buildModification(PipelineModification modification,
                                  InvocableStreamPipesEntity t) {
     if (t instanceof DataProcessorInvocation) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
index cc5e54e..21a84f1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java
@@ -62,7 +62,9 @@ public class ApplyGroundingStep extends AbstractPipelineValidationStep {
               .get(getIndex(target))
               .setEventGrounding(selectedGrounding);
 
-      this.visitorHistory.put(target.getDOM(), 1);
+      if (target.getInputStreams().size() > 1) {
+        this.visitorHistory.put(target.getDOM(), 1);
+      }
     }
   }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java
index a306383..23fd614 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/SchemaValidationStep.java
@@ -47,7 +47,9 @@ public class SchemaValidationStep extends AbstractPipelineValidationStep {
       target.getInputStreams().get(getIndex(target)).setEventSchema(getSourceSchema(source));
     }
 
-    this.visitorHistory.put(target.getDOM(), 1);
+    if (target.getInputStreams().size() > 1) {
+      this.visitorHistory.put(target.getDOM(), 1);
+    }
   }
 
   private EventSchema getSourceSchema(NamedStreamPipesEntity source) {
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.html b/ui/src/app/editor/components/pipeline/pipeline.component.html
index 571ddc9..206b322 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.html
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.html
@@ -39,7 +39,8 @@
         </div>
         <pipeline-element [pipelineElement]="pipelineElement.payload" [preview]="preview"></pipeline-element>
     </div>
-    <pipeline-element-options *ngIf="!preview" (delete)="handleDeleteOption($event)"
+    <pipeline-element-options *ngIf="!preview && currentMouseOverElement==pipelineElement.payload.dom"
+                              (delete)="handleDeleteOption($event)"
                               (customize)="showCustomizeDialog($event)"
                               [currentMouseOverElement]="currentMouseOverElement"
                               [pipelineValid]="pipelineValid"
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 05d860d..ac82c26 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -362,7 +362,8 @@ export class PipelineComponent implements OnInit, OnDestroy {
   currentConnectionValid(pe: PipelineElementConfig,
                          targetEdges: PipelineEdgeValidation[]) {
     const entity = pe.payload as InvocablePipelineElementUnion;
-    return entity.streamRequirements.length === targetEdges.length && targetEdges.every(e => e.status.validationStatusType === 'COMPLETE');
+    //return entity.streamRequirements.length === targetEdges.length && targetEdges.every(e => e.status.validationStatusType === 'COMPLETE');
+    return targetEdges.every(e => e.status.validationStatusType === 'COMPLETE');
   }
 
   getTargetEdgeValidations(pipelineModificationMessage: PipelineModificationMessage,