You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/13 20:50:08 UTC
[incubator-streampipes] 01/02: [STREAMPIPES-494] Improve behaviour of custom output
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 9b84b05fa79ebb3bec8b26560841972f64d8ec97
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Jan 5 22:00:52 2022 +0100
[STREAMPIPES-494] Improve behaviour of custom output
---
.../v2/pipeline/UpdateOutputStrategiesStep.java | 47 +++++++++++----------
.../custom-output-strategy.component.ts | 39 ++++++++---------
.../components/pipeline/pipeline.component.ts | 49 ++++++++++------------
3 files changed, 68 insertions(+), 67 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java
index d22ccd3..1818e3a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/UpdateOutputStrategiesStep.java
@@ -39,34 +39,39 @@ public class UpdateOutputStrategiesStep extends AbstractPipelineValidationStep {
List<PipelineElementValidationInfo> validationInfos) throws SpValidationException {
List<SpDataStream> inputStreams = target.getInputStreams();
if (target instanceof DataProcessorInvocation) {
- ((DataProcessorInvocation) target).getOutputStrategies()
- .stream()
- .filter(strategy -> strategy instanceof CustomOutputStrategy)
- .map(strategy -> (CustomOutputStrategy) strategy)
+ ((DataProcessorInvocation) target)
+ .getOutputStrategies()
.forEach(strategy -> {
- PropertySelectorGenerator generator = getGenerator(inputStreams, strategy);
- strategy.setAvailablePropertyKeys(generator.generateSelectors());
- // delete selected keys that are not present as available keys
- List<String> selected = getValidSelectedPropertyKeys(strategy);
- strategy.setSelectedPropertyKeys(selected);
+ if (strategy instanceof CustomOutputStrategy) {
+ handleCustomOutputStrategy(inputStreams, (CustomOutputStrategy) strategy);
+ }
});
}
}
+ private void handleCustomOutputStrategy(List<SpDataStream> inputStreams,
+ CustomOutputStrategy strategy) {
+ PropertySelectorGenerator generator = getGenerator(inputStreams, strategy);
+ strategy.setAvailablePropertyKeys(generator.generateSelectors());
+ // delete selected keys that are not present as available keys
+ List<String> selected = getValidSelectedPropertyKeys(strategy);
+ strategy.setSelectedPropertyKeys(selected);
+ }
+
private PropertySelectorGenerator getGenerator(List<SpDataStream> inputStreams,
CustomOutputStrategy strategy) {
- if (inputStreams.size() == 1 || (inputStreams.size() > 1 && !strategy.isOutputRight())) {
- return new PropertySelectorGenerator(
- inputStreams.get(0).getEventSchema(),
- false
- );
- } else {
- return new PropertySelectorGenerator(
- inputStreams.get(0).getEventSchema(),
- inputStreams.get(1).getEventSchema(),
- false
- );
- }
+ if (inputStreams.size() == 1 || (inputStreams.size() > 1 && !strategy.isOutputRight())) {
+ return new PropertySelectorGenerator(
+ inputStreams.get(0).getEventSchema(),
+ false
+ );
+ } else {
+ return new PropertySelectorGenerator(
+ inputStreams.get(0).getEventSchema(),
+ inputStreams.get(1).getEventSchema(),
+ false
+ );
+ }
}
private List<String> getValidSelectedPropertyKeys(CustomOutputStrategy strategy) {
diff --git a/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts b/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts
index e39c71a..8d76f46 100644
--- a/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts
+++ b/ui/src/app/editor/components/output-strategy/custom-output/custom-output-strategy.component.ts
@@ -16,11 +16,11 @@
*
*/
-import {Component, OnInit} from "@angular/core";
-import {CustomOutputStrategy} from "../../../../core-model/gen/streampipes-model";
-import {BaseOutputStrategy} from "../base/BaseOutputStrategy";
-import {PropertySelectorService} from "../../../../services/property-selector.service";
-import {FormControl} from "@angular/forms";
+import { Component, OnInit } from '@angular/core';
+import { CustomOutputStrategy } from '../../../../core-model/gen/streampipes-model';
+import { BaseOutputStrategy } from '../base/BaseOutputStrategy';
+import { PropertySelectorService } from '../../../../services/property-selector.service';
+import { FormControl } from '@angular/forms';
@Component({
selector: 'custom-output-strategy',
@@ -32,41 +32,42 @@ export class CustomOutputStrategyComponent extends BaseOutputStrategy<CustomOutp
collectedPropertiesFirstStream: any;
collectedPropertiesSecondStream: any;
- constructor(private PropertySelectorService: PropertySelectorService) {
+ constructor(private propertySelectorService: PropertySelectorService) {
super();
}
ngOnInit() {
- this.parentForm.addControl("output-strategy", new FormControl());
- this.collectedPropertiesFirstStream = this.PropertySelectorService
- .makeProperties(this.getProperties(0), this.outputStrategy.availablePropertyKeys, this.PropertySelectorService.firstStreamPrefix);
- this.collectedPropertiesSecondStream = this.PropertySelectorService
- .makeProperties(this.getProperties(1), this.outputStrategy.availablePropertyKeys, this.PropertySelectorService.secondStreamPrefix);
+ this.parentForm.addControl('output-strategy', new FormControl());
+ this.collectedPropertiesFirstStream = this.propertySelectorService
+ .makeProperties(this.getProperties(0), this.outputStrategy.availablePropertyKeys, this.propertySelectorService.firstStreamPrefix);
+ this.collectedPropertiesSecondStream = this.propertySelectorService
+ .makeProperties(this.getProperties(1), this.outputStrategy.availablePropertyKeys, this.propertySelectorService.secondStreamPrefix);
this.checkFormValidity();
}
getProperties(streamIndex) {
- return this.selectedElement.inputStreams[streamIndex] === undefined ? [] : this.selectedElement.inputStreams[streamIndex].eventSchema.eventProperties;
+ return this.selectedElement.inputStreams[streamIndex] === undefined ?
+ [] : this.selectedElement.inputStreams[streamIndex].eventSchema.eventProperties;
}
selectAll(collectedProperties) {
collectedProperties.forEach(ep => this.outputStrategy.selectedPropertyKeys.push(ep.runtimeId));
// This is needed to trigger update of scope
- this.outputStrategy.selectedPropertyKeys = this.outputStrategy.selectedPropertyKeys.filter(el => {return true;});
- this.checkFormValidity()
+ this.outputStrategy.selectedPropertyKeys = this.outputStrategy.selectedPropertyKeys.filter(el => true);
+ this.checkFormValidity();
}
deselectAll(collectedProperties) {
collectedProperties.forEach(ep => this.outputStrategy.selectedPropertyKeys =
this.outputStrategy.selectedPropertyKeys.filter(item => item !== ep.runtimeId));
- this.checkFormValidity()
+ this.checkFormValidity();
}
checkFormValidity() {
- if (!this.outputStrategy.selectedPropertyKeys || this.outputStrategy.selectedPropertyKeys.length == 0) {
- this.parentForm.controls["output-strategy"].setErrors({});
+ if (!this.outputStrategy.selectedPropertyKeys || this.outputStrategy.selectedPropertyKeys.length === 0) {
+ this.parentForm.controls['output-strategy'].setErrors({});
} else {
- this.parentForm.controls["output-strategy"].setErrors(undefined);
+ this.parentForm.controls['output-strategy'].setErrors(undefined);
}
}
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index e24c7ea..de159ca 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -28,7 +28,7 @@ import {
DataProcessorInvocation,
DataSinkInvocation, Notification,
Pipeline,
- PipelineCanvasMetadata, PipelineEdgeValidation, PipelineModificationMessage,
+ PipelineCanvasMetadata, PipelineEdgeValidation, PipelineModification, PipelineModificationMessage,
PipelinePreviewModel,
SpDataSet,
SpDataStream
@@ -84,7 +84,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
@Output()
pipelineCacheRunningChanged: EventEmitter<boolean> = new EventEmitter<boolean>();
- plumbReady: boolean;
currentMouseOverElement: string;
currentPipelineModel: Pipeline;
idCounter: any;
@@ -109,7 +108,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
private dialogService: DialogService,
private dialog: MatDialog,
private ngZone: NgZone) {
- this.plumbReady = false;
this.currentMouseOverElement = '';
this.currentPipelineModel = new Pipeline();
this.idCounter = 0;
@@ -135,7 +133,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
ngOnDestroy() {
this.deletePipelineElementPreview(false);
this.jsplumbFactoryService.destroy(this.preview);
- this.plumbReady = false;
}
updateMouseover(elementId) {
@@ -247,8 +244,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
this.validatePipeline();
this.triggerPipelineCacheUpdate();
}
-
- }); // End #assembly.droppable()
+ });
}
checkTopicModel(pipelineElementConfig: PipelineElementConfig) {
@@ -284,9 +280,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
}
initPlumb() {
-
- // this.JsplumbService.prepareJsplumb();
-
this.JsplumbBridge.unbind(EVENT_CONNECTION);
this.JsplumbBridge.bind(EVENT_CONNECTION_MOVED, (info) => {
@@ -336,7 +329,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
if (currentConnectionValid) {
info.targetEndpoint.setType('token');
this.validatePipeline();
- this.modifyPipeline(pipelineModificationMessage.pipelineModifications);
+ this.modifyPipeline(pipelineModificationMessage);
if (this.jsplumbService.isFullyConnected(pe, this.preview)) {
const payload = pe.payload as InvocablePipelineElementUnion;
if ((payload.staticProperties && payload.staticProperties.length > 0) || this.isCustomOutput(pe)) {
@@ -364,10 +357,6 @@ export class PipelineComponent implements OnInit, OnDestroy {
window.onresize = () => {
this.JsplumbBridge.repaintEverything();
};
-
- setTimeout(() => {
- this.plumbReady = true;
- }, 100);
}
currentConnectionValid(pe: PipelineElementConfig,
@@ -387,9 +376,9 @@ export class PipelineComponent implements OnInit, OnDestroy {
}
- modifyPipeline(pipelineModifications) {
- if (pipelineModifications) {
- pipelineModifications.forEach(modification => {
+ modifyPipeline(pm: PipelineModificationMessage) {
+ if (pm.pipelineModifications) {
+ pm.pipelineModifications.forEach(modification => {
const id = modification.domId;
if (id !== 'undefined') {
const pe = this.objectProvider.findElement(id, this.rawPipelineModel);
@@ -465,17 +454,23 @@ export class PipelineComponent implements OnInit, OnDestroy {
if (c) {
pipelineElementConfig.settings.openCustomize = false;
(pipelineElementConfig.payload as InvocablePipelineElementUnion).configured = true;
- if (!(pipelineElementConfig.payload instanceof DataSinkInvocation)) {
- this.JsplumbBridge.activateEndpoint('out-' + pipelineElementConfig.payload.dom, pipelineElementConfig.settings.completed);
- }
- this.JsplumbBridge.getSourceEndpoint(pipelineElementConfig.payload.dom).toggleType('token');
- this.triggerPipelineCacheUpdate();
- this.announceConfiguredElement(pipelineElementConfig);
- if (this.previewModeActive) {
- this.deletePipelineElementPreview(true);
- }
+ this.currentPipelineModel = this.objectProvider.makePipeline(this.rawPipelineModel);
+ this.objectProvider.updatePipeline(this.currentPipelineModel).subscribe(pm => {
+ this.modifyPipeline(pm);
+ if (!(pipelineElementConfig.payload instanceof DataSinkInvocation)) {
+ this.JsplumbBridge.activateEndpoint('out-' + pipelineElementConfig.payload.dom, pipelineElementConfig.settings.completed);
+ }
+ this.JsplumbBridge.getSourceEndpoint(pipelineElementConfig.payload.dom).toggleType('token');
+ this.triggerPipelineCacheUpdate();
+ this.announceConfiguredElement(pipelineElementConfig);
+ if (this.previewModeActive) {
+ this.deletePipelineElementPreview(true);
+ }
+ this.validatePipeline();
+ });
+ } else {
+ this.validatePipeline();
}
- this.validatePipeline();
});
}