You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/08/25 19:59:03 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-208] Add mapping property support for collections to SDK and UI
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new dd39bfa [STREAMPIPES-208] Add mapping property support for collections to SDK and UI
dd39bfa is described below
commit dd39bfa2cbf4412113b630bf85f211585102138d
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Aug 25 21:58:48 2020 +0200
[STREAMPIPES-208] Add mapping property support for collections to SDK and UI
---
.../matching/PipelineVerificationHandler.java | 53 ++++++++++++----------
.../apache/streampipes/sdk/StaticProperties.java | 16 ++++++-
.../builder/AbstractProcessingElementBuilder.java | 2 +-
.../sdk/extractor/AbstractParameterExtractor.java | 17 +++++--
.../streampipes/sdk/helpers/EpRequirements.java | 5 ++
.../sdk/helpers/RequirementsSelector.java | 36 +++++++++++++++
.../static-collection.component.ts | 9 +++-
.../static-mapping-unary.component.html | 2 +-
.../static-property.component.html | 3 ++
9 files changed, 112 insertions(+), 31 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
index acc5976..75f9cf8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
@@ -32,13 +32,14 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.client.connection.Connection;
import org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineModification;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.constants.PropertySelectorConstants;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModification;
import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
import org.apache.streampipes.model.staticproperty.MappingProperty;
import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -50,11 +51,11 @@ public class PipelineVerificationHandler {
private Pipeline pipeline;
private PipelineModificationMessage pipelineModificationMessage;
private List<InvocableStreamPipesEntity> invocationGraphs;
- private InvocableStreamPipesEntity rdfRootElement;
+ private InvocableStreamPipesEntity rootPipelineElement;
public PipelineVerificationHandler(Pipeline pipeline) throws NoSepaInPipelineException {
this.pipeline = pipeline;
- this.rdfRootElement = PipelineVerificationUtils.getRootNode(pipeline);
+ this.rootPipelineElement = PipelineVerificationUtils.getRootNode(pipeline);
this.invocationGraphs = new ArrayList<>();
this.pipelineModificationMessage = new PipelineModificationMessage();
}
@@ -63,8 +64,8 @@ public class PipelineVerificationHandler {
ElementVerification verifier = new ElementVerification();
boolean verified = true;
- InvocableStreamPipesEntity rightElement = rdfRootElement;
- List<String> connectedTo = rdfRootElement.getConnectedTo();
+ InvocableStreamPipesEntity rightElement = rootPipelineElement;
+ List<String> connectedTo = rootPipelineElement.getConnectedTo();
for (String domId : connectedTo) {
NamedStreamPipesEntity element = TreeUtils.findSEPAElement(domId, pipeline.getSepas(), pipeline.getStreams());
@@ -95,13 +96,13 @@ public class PipelineVerificationHandler {
* @return PipelineValidationHandler
*/
public PipelineVerificationHandler computeMappingProperties() {
- List<String> connectedTo = rdfRootElement.getConnectedTo();
- String domId = rdfRootElement.getDOM();
+ List<String> connectedTo = rootPipelineElement.getConnectedTo();
+ String domId = rootPipelineElement.getDOM();
List<SpDataStream> tempStreams = new ArrayList<>();
for (int i = 0; i < connectedTo.size(); i++) {
- NamedStreamPipesEntity element = TreeUtils.findSEPAElement(rdfRootElement
+ NamedStreamPipesEntity element = TreeUtils.findSEPAElement(rootPipelineElement
.getConnectedTo().get(i), pipeline.getSepas(), pipeline
.getStreams());
@@ -118,16 +119,16 @@ public class PipelineVerificationHandler {
}
tempStreams.add(incomingStream);
- if (rdfRootElement.getStreamRequirements().size() - 1 == i) {
+ if (rootPipelineElement.getStreamRequirements().size() - 1 == i) {
updateStaticProperties(tempStreams);
PipelineModification modification = new PipelineModification(
domId,
- rdfRootElement.getElementId(),
- rdfRootElement.getStaticProperties());
+ rootPipelineElement.getElementId(),
+ rootPipelineElement.getStaticProperties());
modification.setInputStreams(tempStreams);
updateOutputStrategy(tempStreams);
- if (rdfRootElement instanceof DataProcessorInvocation) {
- modification.setOutputStrategies(((DataProcessorInvocation) rdfRootElement).getOutputStrategies());
+ if (rootPipelineElement instanceof DataProcessorInvocation) {
+ modification.setOutputStrategies(((DataProcessorInvocation) rootPipelineElement).getOutputStrategies());
}
pipelineModificationMessage.addPipelineModification(modification);
}
@@ -138,13 +139,19 @@ public class PipelineVerificationHandler {
private void updateStaticProperties(List<SpDataStream> inputStreams) {
- rdfRootElement
+ rootPipelineElement
.getStaticProperties()
.stream()
- .filter(property -> property instanceof MappingProperty)
+ .filter(property -> (property instanceof MappingProperty
+ || ((property instanceof CollectionStaticProperty) && ((CollectionStaticProperty) property).getStaticPropertyTemplate() instanceof MappingProperty)))
.forEach(property -> {
+ MappingProperty mappingProperty;
- MappingProperty mappingProperty = (MappingProperty) property;
+ if (property instanceof MappingProperty) {
+ mappingProperty = (MappingProperty) property;
+ } else {
+ mappingProperty = (MappingProperty) ((CollectionStaticProperty) property).getStaticPropertyTemplate();
+ }
if (!mappingProperty.getRequirementSelector().equals("")) {
mappingProperty.setMapsFromOptions(generateSelectorsFromRequirement
@@ -162,7 +169,7 @@ public class PipelineVerificationHandler {
(requirementSelector);
EventProperty propertyRequirement = selector.findPropertyRequirement
- (rdfRootElement.getStreamRequirements());
+ (rootPipelineElement.getStreamRequirements());
SpDataStream inputStream = selector.getAffectedStream(inputStreams);
List<String> availablePropertySelectors = new PropertySelectorGenerator(inputStream
@@ -189,8 +196,8 @@ public class PipelineVerificationHandler {
private void updateOutputStrategy(List<SpDataStream> inputStreams) {
- if (rdfRootElement instanceof DataProcessorInvocation) {
- ((DataProcessorInvocation) rdfRootElement)
+ if (rootPipelineElement instanceof DataProcessorInvocation) {
+ ((DataProcessorInvocation) rootPipelineElement)
.getOutputStrategies()
.stream()
.filter(strategy -> strategy instanceof CustomOutputStrategy)
@@ -211,7 +218,7 @@ public class PipelineVerificationHandler {
}
public PipelineVerificationHandler storeConnection() {
- String fromId = rdfRootElement.getConnectedTo().get(rdfRootElement.getConnectedTo().size() - 1);
+ String fromId = rootPipelineElement.getConnectedTo().get(rootPipelineElement.getConnectedTo().size() - 1);
NamedStreamPipesEntity sepaElement = TreeUtils.findSEPAElement(fromId, pipeline.getSepas(), pipeline.getStreams());
String sourceId;
if (sepaElement instanceof SpDataStream) {
@@ -219,7 +226,7 @@ public class PipelineVerificationHandler {
} else {
sourceId = ((InvocableStreamPipesEntity) sepaElement).getBelongsTo();
}
- Connection connection = new Connection(sourceId, rdfRootElement.getBelongsTo());
+ Connection connection = new Connection(sourceId, rootPipelineElement.getBelongsTo());
StorageDispatcher.INSTANCE.getNoSqlStore().getConnectionStorageApi().addConnection(connection);
return this;
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
index 762f923..034c4f0 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
@@ -18,8 +18,10 @@
package org.apache.streampipes.sdk;
+import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.model.staticproperty.*;
import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.RequirementsSelector;
import org.apache.streampipes.sdk.utils.Datatypes;
import java.net.URI;
@@ -28,6 +30,18 @@ import java.util.List;
public class StaticProperties {
+ public static MappingPropertyUnary mappingPropertyUnary(Label label, RequirementsSelector requirementsSelector, PropertyScope propertyScope) {
+ MappingPropertyUnary mp = new MappingPropertyUnary(label.getInternalId(), label
+ .getInternalId(),
+ label.getLabel(),
+ label.getDescription());
+
+ mp.setRequirementSelector(requirementsSelector.toSelector(label.getInternalId()));
+ mp.setPropertyScope(propertyScope.name());
+
+ return mp;
+ }
+
public static FreeTextStaticProperty stringFreeTextProperty(Label label) {
return freeTextProperty(label, Datatypes.String);
}
@@ -118,7 +132,7 @@ public class StaticProperties {
setHorizontalRendering(staticProperty);
}
- if (sp.length > 0) {
+ if (sp.length > 1) {
StaticPropertyGroup group = StaticProperties.group(label);
group.setHorizontalRendering(true);
group.setStaticProperties(Arrays.asList(sp));
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index 0e66770..8b12102 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -94,7 +94,7 @@ public abstract class AbstractProcessingElementBuilder<BU extends
}
private List<MappingProperty> rewrite(List<MappingProperty> mappingProperties, int index) {
- mappingProperties.stream().forEach(mp -> mp.setRequirementSelector
+ mappingProperties.forEach(mp -> mp.setRequirementSelector
(getIndex(index) + PropertySelectorConstants.PROPERTY_DELIMITER + mp
.getRequirementSelector()));
return mappingProperties;
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
index ab2c51d..15b0f47 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
@@ -33,10 +33,7 @@ import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.model.staticproperty.*;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesEntity> {
@@ -153,6 +150,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
return collection
.getMembers()
.stream()
+ .sorted(Comparator.comparingInt(StaticProperty::getIndex))
.map(sp -> (FreeTextStaticProperty) sp)
.map(FreeTextStaticProperty::getValue)
.map(v -> typeParser.parse(v, targetClass))
@@ -211,6 +209,17 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
return getPropertySelectorFromUnaryMapping(staticPropertyName);
}
+ public List<String> getUnaryMappingsFromCollection(String collectionStaticPropertyName) {
+ CollectionStaticProperty collection = getStaticPropertyByName(collectionStaticPropertyName, CollectionStaticProperty.class);
+ return collection
+ .getMembers()
+ .stream()
+ .sorted(Comparator.comparingInt(StaticProperty::getIndex))
+ .map(sp -> (MappingPropertyUnary) sp)
+ .map(MappingPropertyUnary::getSelectedProperty)
+ .collect(Collectors.toList());
+ }
+
public List<String> mappingPropertyValues(String staticPropertyName) {
return getPropertySelectorsFromNaryMapping(staticPropertyName);
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java
index 47b4fb9..7f633cb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java
@@ -33,6 +33,11 @@ public class EpRequirements {
return new EventPropertyList();
}
+ public static EventProperty withMappingPropertyId(String internalId, EventProperty requirement) {
+ requirement.setRuntimeName(internalId);
+ return requirement;
+ }
+
public static EventPropertyList nestedListRequirement() {
EventPropertyList listEp = new EventPropertyList();
listEp.setEventProperty(new EventPropertyNested());
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/RequirementsSelector.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/RequirementsSelector.java
new file mode 100644
index 0000000..5292473
--- /dev/null
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/RequirementsSelector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sdk.helpers;
+
+import org.apache.streampipes.model.constants.PropertySelectorConstants;
+
+public enum RequirementsSelector {
+
+ FIRST_INPUT_STREAM(PropertySelectorConstants.FIRST_REQUIREMENT_PREFIX),
+ SECOND_INPUT_STREAM(PropertySelectorConstants.SECOND_REQUIREMENT_PREFIX);
+
+ private String requirementPrefix;
+
+ RequirementsSelector(String prefix) {
+ this.requirementPrefix = prefix;
+ }
+
+ public String toSelector(String internalId) {
+ return requirementPrefix + PropertySelectorConstants.PROPERTY_DELIMITER + internalId;
+ }
+}
diff --git a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts
index fe4d118..d8b3636 100644
--- a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts
@@ -38,7 +38,6 @@ export class StaticCollectionComponent
ngOnInit() {
}
-
emitUpdate(valid) {
this.updateEmitter.emit(new ConfigurationInfo(this.staticProperty.internalName, valid));
}
@@ -49,10 +48,18 @@ export class StaticCollectionComponent
}
let clone = this.staticPropertyUtil.clone(this.staticProperty.staticPropertyTemplate);
this.staticProperty.members.push(clone);
+ this.updateIndex();
}
remove(i) {
this.staticProperty.members.splice(i,1).slice(0);
+ this.updateIndex();
+ }
+
+ updateIndex() {
+ this.staticProperty.members.forEach((property, index) => {
+ property.index = index;
+ })
}
onStatusChange(status: any) {
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html
index 3075fcf..84bfde6 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html
+++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html
@@ -20,7 +20,7 @@
<div fxFlex="100" fxLayout="row">
<mat-form-field class="example-full-width">
<!--<mat-form-field class="example-full-width">-->
- <mat-select formControlName="{{staticProperty.internalName}}" [placeholder]="staticProperty.label">
+ <mat-select formControlName="{{fieldName}}" [placeholder]="staticProperty.label">
<mat-option *ngFor="let property of availableProperties | displayRecommendedPipe: staticProperty.propertyScope: displayRecommended" [value]="property.propertySelector">
{{property.runtimeName}}
</mat-option>
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index ecd7b49..e86d68f 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -97,6 +97,8 @@
<app-static-mapping-nary *ngIf="isMappingNaryProperty(staticProperty)"
[eventSchemas]="eventSchemas"
+ [parentForm]="parentForm"
+ [fieldName]="fieldName"
[staticProperty]="staticProperty"
[displayRecommended]="displayRecommended"
(inputEmitter)="valueChange($event)">
@@ -126,6 +128,7 @@
[adapterId]="adapterId"
[eventSchemas]="eventSchemas"
[parentForm]="parentForm"
+ [displayRecommended]="displayRecommended"
[staticProperty]="staticProperty" class="test fullWidth"
(updateEmitter)="emitUpdate($event)">
</app-static-collection>