You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/13 20:50:08 UTC

[incubator-streampipes] 01/02: [STREAMPIPES-494] Improve behaviour of custom output

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 9b84b05fa79ebb3bec8b26560841972f64d8ec97
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Jan 5 22:00:52 2022 +0100

    [STREAMPIPES-494] Improve behaviour of custom output
---
 .../v2/pipeline/UpdateOutputStrategiesStep.java    | 47 +++++++++++----------
 .../custom-output-strategy.component.ts            | 39 ++++++++---------
 .../components/pipeline/pipeline.component.ts      | 49 ++++++++++------------
 3 files changed, 68 insertions(+), 67 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java
index d22ccd3..1818e3a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java
@@ -39,34 +39,39 @@ public class UpdateOutputStrategiesStep extends AbstractPipelineValidationStep {
                     List<PipelineElementValidationInfo> validationInfos) throws SpValidationException {
     List<SpDataStream> inputStreams = target.getInputStreams();
     if (target instanceof DataProcessorInvocation) {
-      ((DataProcessorInvocation) target).getOutputStrategies()
-        .stream()
-        .filter(strategy -> strategy instanceof CustomOutputStrategy)
-        .map(strategy -> (CustomOutputStrategy) strategy)
+      ((DataProcessorInvocation) target)
+        .getOutputStrategies()
         .forEach(strategy -> {
-          PropertySelectorGenerator generator = getGenerator(inputStreams, strategy);
-          strategy.setAvailablePropertyKeys(generator.generateSelectors());
-          // delete selected keys that are not present as available keys
-          List<String> selected = getValidSelectedPropertyKeys(strategy);
-          strategy.setSelectedPropertyKeys(selected);
+          if (strategy instanceof CustomOutputStrategy) {
+            handleCustomOutputStrategy(inputStreams, (CustomOutputStrategy) strategy);
+          }
         });
     }
   }
 
+  private void handleCustomOutputStrategy(List<SpDataStream> inputStreams,
+                                          CustomOutputStrategy strategy) {
+    PropertySelectorGenerator generator = getGenerator(inputStreams, strategy);
+    strategy.setAvailablePropertyKeys(generator.generateSelectors());
+    // delete selected keys that are not present as available keys
+    List<String> selected = getValidSelectedPropertyKeys(strategy);
+    strategy.setSelectedPropertyKeys(selected);
+  }
+
   private PropertySelectorGenerator getGenerator(List<SpDataStream> inputStreams,
                                                  CustomOutputStrategy strategy) {
-   if (inputStreams.size() == 1 || (inputStreams.size() > 1 && !strategy.isOutputRight())) {
-     return new PropertySelectorGenerator(
-             inputStreams.get(0).getEventSchema(),
-             false
-     );
-   } else {
-     return new PropertySelectorGenerator(
-             inputStreams.get(0).getEventSchema(),
-             inputStreams.get(1).getEventSchema(),
-             false
-     );
-   }
+    if (inputStreams.size() == 1 || (inputStreams.size() > 1 && !strategy.isOutputRight())) {
+      return new PropertySelectorGenerator(
+              inputStreams.get(0).getEventSchema(),
+              false
+      );
+    } else {
+      return new PropertySelectorGenerator(
+              inputStreams.get(0).getEventSchema(),
+              inputStreams.get(1).getEventSchema(),
+              false
+      );
+    }
   }
 
   private List<String> getValidSelectedPropertyKeys(CustomOutputStrategy strategy) {
diff --git a/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts b/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts
index e39c71a..8d76f46 100644
--- a/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts
+++ b/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts
@@ -16,11 +16,11 @@
  *
  */
 
-import {Component, OnInit} from "@angular/core";
-import {CustomOutputStrategy} from "../../../../core-model/gen/streampipes-model";
-import {BaseOutputStrategy} from "../base/BaseOutputStrategy";
-import {PropertySelectorService} from "../../../../services/property-selector.service";
-import {FormControl} from "@angular/forms";
+import { Component, OnInit } from '@angular/core';
+import { CustomOutputStrategy } from '../../../../core-model/gen/streampipes-model';
+import { BaseOutputStrategy } from '../base/BaseOutputStrategy';
+import { PropertySelectorService } from '../../../../services/property-selector.service';
+import { FormControl } from '@angular/forms';
 
 @Component({
   selector: 'custom-output-strategy',
@@ -32,41 +32,42 @@ export class CustomOutputStrategyComponent extends BaseOutputStrategy<CustomOutp
   collectedPropertiesFirstStream: any;
   collectedPropertiesSecondStream: any;
 
-  constructor(private PropertySelectorService: PropertySelectorService) {
+  constructor(private propertySelectorService: PropertySelectorService) {
     super();
   }
 
   ngOnInit() {
-    this.parentForm.addControl("output-strategy", new FormControl());
-    this.collectedPropertiesFirstStream = this.PropertySelectorService
-        .makeProperties(this.getProperties(0), this.outputStrategy.availablePropertyKeys, this.PropertySelectorService.firstStreamPrefix);
-    this.collectedPropertiesSecondStream = this.PropertySelectorService
-        .makeProperties(this.getProperties(1), this.outputStrategy.availablePropertyKeys, this.PropertySelectorService.secondStreamPrefix);
+    this.parentForm.addControl('output-strategy', new FormControl());
+    this.collectedPropertiesFirstStream = this.propertySelectorService
+        .makeProperties(this.getProperties(0), this.outputStrategy.availablePropertyKeys, this.propertySelectorService.firstStreamPrefix);
+    this.collectedPropertiesSecondStream = this.propertySelectorService
+        .makeProperties(this.getProperties(1), this.outputStrategy.availablePropertyKeys, this.propertySelectorService.secondStreamPrefix);
     this.checkFormValidity();
   }
 
   getProperties(streamIndex) {
-    return this.selectedElement.inputStreams[streamIndex] === undefined ? [] : this.selectedElement.inputStreams[streamIndex].eventSchema.eventProperties;
+    return this.selectedElement.inputStreams[streamIndex] === undefined ?
+      [] : this.selectedElement.inputStreams[streamIndex].eventSchema.eventProperties;
   }
 
   selectAll(collectedProperties) {
     collectedProperties.forEach(ep => this.outputStrategy.selectedPropertyKeys.push(ep.runtimeId));
     // This is needed to trigger update of scope
-    this.outputStrategy.selectedPropertyKeys = this.outputStrategy.selectedPropertyKeys.filter(el => {return true;});
-    this.checkFormValidity()
+    this.outputStrategy.selectedPropertyKeys = this.outputStrategy.selectedPropertyKeys.filter(el => true);
+    this.checkFormValidity();
   }
 
   deselectAll(collectedProperties) {
     collectedProperties.forEach(ep => this.outputStrategy.selectedPropertyKeys =
         this.outputStrategy.selectedPropertyKeys.filter(item => item !== ep.runtimeId));
-    this.checkFormValidity()
+    this.checkFormValidity();
   }
 
   checkFormValidity() {
-    if (!this.outputStrategy.selectedPropertyKeys || this.outputStrategy.selectedPropertyKeys.length == 0) {
-      this.parentForm.controls["output-strategy"].setErrors({});
+    if (!this.outputStrategy.selectedPropertyKeys || this.outputStrategy.selectedPropertyKeys.length === 0) {
+      this.parentForm.controls['output-strategy'].setErrors({});
     } else {
-      this.parentForm.controls["output-strategy"].setErrors(undefined);
+      this.parentForm.controls['output-strategy'].setErrors(undefined);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index e24c7ea..de159ca 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -28,7 +28,7 @@ import {
   DataProcessorInvocation,
   DataSinkInvocation, Notification,
   Pipeline,
-  PipelineCanvasMetadata, PipelineEdgeValidation, PipelineModificationMessage,
+  PipelineCanvasMetadata, PipelineEdgeValidation, PipelineModification, PipelineModificationMessage,
   PipelinePreviewModel,
   SpDataSet,
   SpDataStream
@@ -84,7 +84,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
   @Output()
   pipelineCacheRunningChanged: EventEmitter<boolean> = new EventEmitter<boolean>();
 
-  plumbReady: boolean;
   currentMouseOverElement: string;
   currentPipelineModel: Pipeline;
   idCounter: any;
@@ -109,7 +108,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
               private dialogService: DialogService,
               private dialog: MatDialog,
               private ngZone: NgZone) {
-    this.plumbReady = false;
     this.currentMouseOverElement = '';
     this.currentPipelineModel = new Pipeline();
     this.idCounter = 0;
@@ -135,7 +133,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
   ngOnDestroy() {
     this.deletePipelineElementPreview(false);
     this.jsplumbFactoryService.destroy(this.preview);
-    this.plumbReady = false;
   }
 
   updateMouseover(elementId) {
@@ -247,8 +244,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
         this.validatePipeline();
         this.triggerPipelineCacheUpdate();
       }
-
-    }); // End #assembly.droppable()
+    });
   }
 
   checkTopicModel(pipelineElementConfig: PipelineElementConfig) {
@@ -284,9 +280,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
   }
 
   initPlumb() {
-
-    // this.JsplumbService.prepareJsplumb();
-
     this.JsplumbBridge.unbind(EVENT_CONNECTION);
 
     this.JsplumbBridge.bind(EVENT_CONNECTION_MOVED, (info) => {
@@ -336,7 +329,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
             if (currentConnectionValid) {
               info.targetEndpoint.setType('token');
               this.validatePipeline();
-              this.modifyPipeline(pipelineModificationMessage.pipelineModifications);
+              this.modifyPipeline(pipelineModificationMessage);
               if (this.jsplumbService.isFullyConnected(pe, this.preview)) {
                 const payload = pe.payload as InvocablePipelineElementUnion;
                 if ((payload.staticProperties && payload.staticProperties.length > 0) || this.isCustomOutput(pe)) {
@@ -364,10 +357,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
     window.onresize = () => {
       this.JsplumbBridge.repaintEverything();
     };
-
-    setTimeout(() => {
-      this.plumbReady = true;
-    }, 100);
   }
 
   currentConnectionValid(pe: PipelineElementConfig,
@@ -387,9 +376,9 @@ export class PipelineComponent implements OnInit, OnDestroy {
 
   }
 
-  modifyPipeline(pipelineModifications) {
-    if (pipelineModifications) {
-      pipelineModifications.forEach(modification => {
+  modifyPipeline(pm: PipelineModificationMessage) {
+    if (pm.pipelineModifications) {
+      pm.pipelineModifications.forEach(modification => {
         const id = modification.domId;
         if (id !== 'undefined') {
           const pe = this.objectProvider.findElement(id, this.rawPipelineModel);
@@ -465,17 +454,23 @@ export class PipelineComponent implements OnInit, OnDestroy {
       if (c) {
         pipelineElementConfig.settings.openCustomize = false;
         (pipelineElementConfig.payload as InvocablePipelineElementUnion).configured = true;
-        if (!(pipelineElementConfig.payload instanceof DataSinkInvocation)) {
-          this.JsplumbBridge.activateEndpoint('out-' + pipelineElementConfig.payload.dom, pipelineElementConfig.settings.completed);
-        }
-        this.JsplumbBridge.getSourceEndpoint(pipelineElementConfig.payload.dom).toggleType('token');
-        this.triggerPipelineCacheUpdate();
-        this.announceConfiguredElement(pipelineElementConfig);
-        if (this.previewModeActive) {
-          this.deletePipelineElementPreview(true);
-        }
+        this.currentPipelineModel = this.objectProvider.makePipeline(this.rawPipelineModel);
+        this.objectProvider.updatePipeline(this.currentPipelineModel).subscribe(pm => {
+          this.modifyPipeline(pm);
+          if (!(pipelineElementConfig.payload instanceof DataSinkInvocation)) {
+            this.JsplumbBridge.activateEndpoint('out-' + pipelineElementConfig.payload.dom, pipelineElementConfig.settings.completed);
+          }
+          this.JsplumbBridge.getSourceEndpoint(pipelineElementConfig.payload.dom).toggleType('token');
+          this.triggerPipelineCacheUpdate();
+          this.announceConfiguredElement(pipelineElementConfig);
+          if (this.previewModeActive) {
+            this.deletePipelineElementPreview(true);
+          }
+          this.validatePipeline();
+        });
+      } else {
+        this.validatePipeline();
       }
-      this.validatePipeline();
     });
   }