You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/08/25 19:59:03 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-208] Add mapping property support for collections to SDK and UI

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd39bfa  [STREAMPIPES-208] Add mapping property support for collections to SDK and UI
dd39bfa is described below

commit dd39bfa2cbf4412113b630bf85f211585102138d
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Aug 25 21:58:48 2020 +0200

    [STREAMPIPES-208] Add mapping property support for collections to SDK and UI
---
 .../matching/PipelineVerificationHandler.java      | 53 ++++++++++++----------
 .../apache/streampipes/sdk/StaticProperties.java   | 16 ++++++-
 .../builder/AbstractProcessingElementBuilder.java  |  2 +-
 .../sdk/extractor/AbstractParameterExtractor.java  | 17 +++++--
 .../streampipes/sdk/helpers/EpRequirements.java    |  5 ++
 .../sdk/helpers/RequirementsSelector.java          | 36 +++++++++++++++
 .../static-collection.component.ts                 |  9 +++-
 .../static-mapping-unary.component.html            |  2 +-
 .../static-property.component.html                 |  3 ++
 9 files changed, 112 insertions(+), 31 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
index acc5976..75f9cf8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandler.java
@@ -32,13 +32,14 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.client.connection.Connection;
 import org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineModification;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
 import org.apache.streampipes.model.constants.PropertySelectorConstants;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.message.PipelineModificationMessage;
 import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModification;
 import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
 import org.apache.streampipes.model.staticproperty.MappingProperty;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
@@ -50,11 +51,11 @@ public class PipelineVerificationHandler {
   private Pipeline pipeline;
   private PipelineModificationMessage pipelineModificationMessage;
   private List<InvocableStreamPipesEntity> invocationGraphs;
-  private InvocableStreamPipesEntity rdfRootElement;
+  private InvocableStreamPipesEntity rootPipelineElement;
 
   public PipelineVerificationHandler(Pipeline pipeline) throws NoSepaInPipelineException {
     this.pipeline = pipeline;
-    this.rdfRootElement = PipelineVerificationUtils.getRootNode(pipeline);
+    this.rootPipelineElement = PipelineVerificationUtils.getRootNode(pipeline);
     this.invocationGraphs = new ArrayList<>();
     this.pipelineModificationMessage = new PipelineModificationMessage();
   }
@@ -63,8 +64,8 @@ public class PipelineVerificationHandler {
 
     ElementVerification verifier = new ElementVerification();
     boolean verified = true;
-    InvocableStreamPipesEntity rightElement = rdfRootElement;
-    List<String> connectedTo = rdfRootElement.getConnectedTo();
+    InvocableStreamPipesEntity rightElement = rootPipelineElement;
+    List<String> connectedTo = rootPipelineElement.getConnectedTo();
 
     for (String domId : connectedTo) {
       NamedStreamPipesEntity element = TreeUtils.findSEPAElement(domId, pipeline.getSepas(), pipeline.getStreams());
@@ -95,13 +96,13 @@ public class PipelineVerificationHandler {
    * @return PipelineValidationHandler
    */
   public PipelineVerificationHandler computeMappingProperties() {
-    List<String> connectedTo = rdfRootElement.getConnectedTo();
-    String domId = rdfRootElement.getDOM();
+    List<String> connectedTo = rootPipelineElement.getConnectedTo();
+    String domId = rootPipelineElement.getDOM();
 
     List<SpDataStream> tempStreams = new ArrayList<>();
 
     for (int i = 0; i < connectedTo.size(); i++) {
-      NamedStreamPipesEntity element = TreeUtils.findSEPAElement(rdfRootElement
+      NamedStreamPipesEntity element = TreeUtils.findSEPAElement(rootPipelineElement
               .getConnectedTo().get(i), pipeline.getSepas(), pipeline
               .getStreams());
 
@@ -118,16 +119,16 @@ public class PipelineVerificationHandler {
         }
 
         tempStreams.add(incomingStream);
-        if (rdfRootElement.getStreamRequirements().size() - 1 == i) {
+        if (rootPipelineElement.getStreamRequirements().size() - 1 == i) {
           updateStaticProperties(tempStreams);
           PipelineModification modification = new PipelineModification(
                   domId,
-                  rdfRootElement.getElementId(),
-                  rdfRootElement.getStaticProperties());
+                  rootPipelineElement.getElementId(),
+                  rootPipelineElement.getStaticProperties());
           modification.setInputStreams(tempStreams);
           updateOutputStrategy(tempStreams);
-          if (rdfRootElement instanceof DataProcessorInvocation) {
-            modification.setOutputStrategies(((DataProcessorInvocation) rdfRootElement).getOutputStrategies());
+          if (rootPipelineElement instanceof DataProcessorInvocation) {
+            modification.setOutputStrategies(((DataProcessorInvocation) rootPipelineElement).getOutputStrategies());
           }
           pipelineModificationMessage.addPipelineModification(modification);
         }
@@ -138,13 +139,19 @@ public class PipelineVerificationHandler {
 
   private void updateStaticProperties(List<SpDataStream> inputStreams) {
 
-    rdfRootElement
+    rootPipelineElement
             .getStaticProperties()
             .stream()
-            .filter(property -> property instanceof MappingProperty)
+            .filter(property -> (property instanceof MappingProperty
+                    || ((property instanceof CollectionStaticProperty) && ((CollectionStaticProperty) property).getStaticPropertyTemplate() instanceof MappingProperty)))
             .forEach(property -> {
+              MappingProperty mappingProperty;
 
-              MappingProperty mappingProperty = (MappingProperty) property;
+              if (property instanceof MappingProperty) {
+                mappingProperty = (MappingProperty) property;
+              } else {
+                mappingProperty = (MappingProperty) ((CollectionStaticProperty) property).getStaticPropertyTemplate();
+              }
 
               if (!mappingProperty.getRequirementSelector().equals("")) {
                 mappingProperty.setMapsFromOptions(generateSelectorsFromRequirement
@@ -162,7 +169,7 @@ public class PipelineVerificationHandler {
             (requirementSelector);
 
     EventProperty propertyRequirement = selector.findPropertyRequirement
-            (rdfRootElement.getStreamRequirements());
+            (rootPipelineElement.getStreamRequirements());
     SpDataStream inputStream = selector.getAffectedStream(inputStreams);
 
     List<String> availablePropertySelectors = new PropertySelectorGenerator(inputStream
@@ -189,8 +196,8 @@ public class PipelineVerificationHandler {
 
   private void updateOutputStrategy(List<SpDataStream> inputStreams) {
 
-    if (rdfRootElement instanceof DataProcessorInvocation) {
-      ((DataProcessorInvocation) rdfRootElement)
+    if (rootPipelineElement instanceof DataProcessorInvocation) {
+      ((DataProcessorInvocation) rootPipelineElement)
               .getOutputStrategies()
               .stream()
               .filter(strategy -> strategy instanceof CustomOutputStrategy)
@@ -211,7 +218,7 @@ public class PipelineVerificationHandler {
   }
 
   public PipelineVerificationHandler storeConnection() {
-    String fromId = rdfRootElement.getConnectedTo().get(rdfRootElement.getConnectedTo().size() - 1);
+    String fromId = rootPipelineElement.getConnectedTo().get(rootPipelineElement.getConnectedTo().size() - 1);
     NamedStreamPipesEntity sepaElement = TreeUtils.findSEPAElement(fromId, pipeline.getSepas(), pipeline.getStreams());
     String sourceId;
     if (sepaElement instanceof SpDataStream) {
@@ -219,7 +226,7 @@ public class PipelineVerificationHandler {
     } else {
       sourceId = ((InvocableStreamPipesEntity) sepaElement).getBelongsTo();
     }
-    Connection connection = new Connection(sourceId, rdfRootElement.getBelongsTo());
+    Connection connection = new Connection(sourceId, rootPipelineElement.getBelongsTo());
     StorageDispatcher.INSTANCE.getNoSqlStore().getConnectionStorageApi().addConnection(connection);
     return this;
   }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
index 762f923..034c4f0 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
@@ -18,8 +18,10 @@
 
 package org.apache.streampipes.sdk;
 
+import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.model.staticproperty.*;
 import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.RequirementsSelector;
 import org.apache.streampipes.sdk.utils.Datatypes;
 
 import java.net.URI;
@@ -28,6 +30,18 @@ import java.util.List;
 
 public class StaticProperties {
 
+  public static MappingPropertyUnary mappingPropertyUnary(Label label, RequirementsSelector requirementsSelector, PropertyScope propertyScope) { // builds a unary mapping property from a label, tied to one input stream
+    MappingPropertyUnary mp = new MappingPropertyUnary(label.getInternalId(), label
+            .getInternalId(),
+            label.getLabel(),
+            label.getDescription());
+    // The requirement selector is derived from the label's internal id via RequirementsSelector#toSelector.
+    mp.setRequirementSelector(requirementsSelector.toSelector(label.getInternalId()));
+    mp.setPropertyScope(propertyScope.name()); // the scope is stored as the string returned by name()
+
+    return mp;
+  }
+
   public static FreeTextStaticProperty stringFreeTextProperty(Label label) {
     return freeTextProperty(label, Datatypes.String);
   }
@@ -118,7 +132,7 @@ public class StaticProperties {
       setHorizontalRendering(staticProperty);
     }
 
-    if (sp.length > 0) {
+    if (sp.length > 1) {
       StaticPropertyGroup group = StaticProperties.group(label);
       group.setHorizontalRendering(true);
       group.setStaticProperties(Arrays.asList(sp));
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index 0e66770..8b12102 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -94,7 +94,7 @@ public abstract class AbstractProcessingElementBuilder<BU extends
   }
 
   private List<MappingProperty> rewrite(List<MappingProperty> mappingProperties, int index) {
-    mappingProperties.stream().forEach(mp -> mp.setRequirementSelector
+    mappingProperties.forEach(mp -> mp.setRequirementSelector
             (getIndex(index) + PropertySelectorConstants.PROPERTY_DELIMITER + mp
                     .getRequirementSelector()));
     return mappingProperties;
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
index ab2c51d..15b0f47 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
@@ -33,10 +33,7 @@ import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.model.staticproperty.*;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesEntity> {
@@ -153,6 +150,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
     return collection
             .getMembers()
             .stream()
+            .sorted(Comparator.comparingInt(StaticProperty::getIndex))
             .map(sp -> (FreeTextStaticProperty) sp)
             .map(FreeTextStaticProperty::getValue)
             .map(v -> typeParser.parse(v, targetClass))
@@ -211,6 +209,17 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
     return getPropertySelectorFromUnaryMapping(staticPropertyName);
   }
 
+  public List<String> getUnaryMappingsFromCollection(String collectionStaticPropertyName) { // selected properties of the named collection's members, ordered by member index
+    CollectionStaticProperty collection = getStaticPropertyByName(collectionStaticPropertyName, CollectionStaticProperty.class);
+    return collection
+            .getMembers()
+            .stream()
+            .sorted(Comparator.comparingInt(StaticProperty::getIndex)) // ascending by each member's index
+            .map(sp -> (MappingPropertyUnary) sp) // unchecked: a member of any other type fails here with ClassCastException
+            .map(MappingPropertyUnary::getSelectedProperty)
+            .collect(Collectors.toList());
+  }
+
   public List<String> mappingPropertyValues(String staticPropertyName) {
     return getPropertySelectorsFromNaryMapping(staticPropertyName);
   }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java
index 47b4fb9..7f633cb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/EpRequirements.java
@@ -33,6 +33,11 @@ public class EpRequirements {
     return new EventPropertyList();
   }
 
+  public static EventProperty withMappingPropertyId(String internalId, EventProperty requirement) { // tags a requirement with the given internal id
+    requirement.setRuntimeName(internalId); // mutates the passed-in requirement: its runtime name is overwritten
+    return requirement; // the same instance is returned, not a copy
+  }
+
   public static EventPropertyList nestedListRequirement() {
     EventPropertyList listEp = new EventPropertyList();
     listEp.setEventProperty(new EventPropertyNested());
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/RequirementsSelector.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/RequirementsSelector.java
new file mode 100644
index 0000000..5292473
--- /dev/null
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/RequirementsSelector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sdk.helpers;
+
+import org.apache.streampipes.model.constants.PropertySelectorConstants;
+/** Chooses which input stream's requirement prefix is used when building a requirement selector. */
+public enum RequirementsSelector {
+
+  FIRST_INPUT_STREAM(PropertySelectorConstants.FIRST_REQUIREMENT_PREFIX),
+  SECOND_INPUT_STREAM(PropertySelectorConstants.SECOND_REQUIREMENT_PREFIX);
+  /** Prefix that starts every selector built by {@link #toSelector}; final because it is only set in the constructor. */
+  private final String requirementPrefix;
+
+  RequirementsSelector(String prefix) {
+    this.requirementPrefix = prefix;
+  }
+  /** Returns this constant's prefix, followed by the property delimiter, followed by {@code internalId}. */
+  public String toSelector(String internalId) {
+    return requirementPrefix + PropertySelectorConstants.PROPERTY_DELIMITER + internalId;
+  }
+}
diff --git a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts
index fe4d118..d8b3636 100644
--- a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts
@@ -38,7 +38,6 @@ export class StaticCollectionComponent
     ngOnInit() {
     }
 
-
     emitUpdate(valid) {
         this.updateEmitter.emit(new ConfigurationInfo(this.staticProperty.internalName, valid));
     }
@@ -49,10 +48,18 @@ export class StaticCollectionComponent
         }
         let clone = this.staticPropertyUtil.clone(this.staticProperty.staticPropertyTemplate);
         this.staticProperty.members.push(clone);
+        this.updateIndex();
     }
 
     remove(i) {
         this.staticProperty.members.splice(i,1).slice(0);
+        this.updateIndex();
+    }
+
+    updateIndex() { // renumber members so each member's index equals its current array position
+        this.staticProperty.members.forEach((property, index) => {
+            property.index = index; // the Java extractor in this commit sorts collection members by this index
+        })
     }
 
     onStatusChange(status: any) {
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html
index 3075fcf..84bfde6 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html
+++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.html
@@ -20,7 +20,7 @@
     <div fxFlex="100" fxLayout="row">
             <mat-form-field class="example-full-width">
                 <!--<mat-form-field class="example-full-width">-->
-                <mat-select formControlName="{{staticProperty.internalName}}" [placeholder]="staticProperty.label">
+                <mat-select formControlName="{{fieldName}}" [placeholder]="staticProperty.label">
                     <mat-option *ngFor="let property of availableProperties | displayRecommendedPipe: staticProperty.propertyScope: displayRecommended" [value]="property.propertySelector">
                         {{property.runtimeName}}
                     </mat-option>
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index ecd7b49..e86d68f 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -97,6 +97,8 @@
 
             <app-static-mapping-nary *ngIf="isMappingNaryProperty(staticProperty)"
                                      [eventSchemas]="eventSchemas"
+                                     [parentForm]="parentForm"
+                                     [fieldName]="fieldName"
                                      [staticProperty]="staticProperty"
                                      [displayRecommended]="displayRecommended"
                                      (inputEmitter)="valueChange($event)">
@@ -126,6 +128,7 @@
                                    [adapterId]="adapterId"
                                    [eventSchemas]="eventSchemas"
                                    [parentForm]="parentForm"
+                                   [displayRecommended]="displayRecommended"
                                    [staticProperty]="staticProperty" class="test fullWidth"
                                    (updateEmitter)="emitUpdate($event)">
             </app-static-collection>