You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/09 09:42:17 UTC

[incubator-streampipes-extensions] branch dev updated: [STREAMPIPES-159] Processor: Detect Signal Edge

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 416201e  [STREAMPIPES-159] Processor: Detect  Signal Edge
416201e is described below

commit 416201e0bb33b54308d125e33543289daa03e87b
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Jun 9 11:41:38 2020 +0200

    [STREAMPIPES-159] Processor: Detect  Signal Edge
---
 .../transformation/jvm/TransformationJvmInit.java  |   2 +
 .../booloperator/edge/SignalEdgeFilter.java        | 123 +++++++++++++++++++++
 .../edge/SignalEdgeFilterController.java           |  77 +++++++++++++
 .../edge/SignalEdgeFilterParameters.java           |  72 ++++++++++++
 .../documentation.md                               |  51 +++++++++
 .../icon.png                                       | Bin 0 -> 7919 bytes
 .../strings.en                                     |  15 +++
 7 files changed, 340 insertions(+)

diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 8512165..792468a 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.processors.transformation.jvm.config.Transformatio
 import org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayController;
 import org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterController;
+import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.SignalEdgeFilterController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.inverter.BooleanInverterController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state.BooleanToStateController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingController;
@@ -64,6 +65,7 @@ public class TransformationJvmInit extends StandaloneModelSubmitter {
             .add(new BooleanInverterController())
             .add(new TransformToBooleanController())
             .add(new StringTimerController())
+            .add(new SignalEdgeFilterController())
             .add(new BooleanToStateController())
             .add(new StateBufferController())
             .add(new StateLabelerController())
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
new file mode 100644
index 0000000..ed38c8a
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SignalEdgeFilter implements EventProcessor<SignalEdgeFilterParameters> {
+
+  private static Logger LOG;
+
+  private String booleanSignalField;
+  private String flank;
+  private Integer delay;
+  private String eventSelection;
+
+  private boolean lastValue;
+  private int delayCount;
+  private List<Event> resultEvents;
+  private boolean edgeDetected;
+
+  @Override
+  public void onInvocation(SignalEdgeFilterParameters booleanInverterParameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) {
+    LOG = booleanInverterParameters.getGraph().getLogger(SignalEdgeFilter.class);
+    this.booleanSignalField = booleanInverterParameters.getBooleanSignalField();
+    this.flank = booleanInverterParameters.getFlank();
+    this.delay = booleanInverterParameters.getDelay();
+    this.eventSelection = booleanInverterParameters.getEventSelection();
+
+    this.lastValue = false;
+    this.delayCount = 0;
+    this.resultEvents = new ArrayList<>();
+    this.edgeDetected = false;
+  }
+
+  @Override
+  public void onEvent(Event inputEvent, SpOutputCollector out) {
+
+    boolean value = inputEvent.getFieldBySelector(this.booleanSignalField).getAsPrimitive().getAsBoolean();
+
+    // Detect edges in signal
+    if (detectEdge(value, lastValue)) {
+      this.edgeDetected = true;
+      this.resultEvents = new ArrayList<>();
+      this.delayCount = 0;
+    }
+
+    if (edgeDetected) {
+        // Buffer event(s) according to user configuration
+        addResultEvent(inputEvent);
+
+      // Detect if the delay has been waited for
+      if (this.delay == delayCount) {
+        for (Event event : this.resultEvents){
+          out.collect(event);
+        }
+
+        this.edgeDetected = false;
+
+      } else {
+        this.delayCount++;
+      }
+
+
+    }
+
+    this.lastValue = value;
+  }
+
+  @Override
+  public void onDetach() {
+  }
+
+  private boolean detectEdge(boolean value, boolean lastValue) {
+    if (this.flank.equals(SignalEdgeFilterController.FLANK_UP)) {
+      return lastValue == false && value == true;
+    } else if (this.flank.equals(SignalEdgeFilterController.FLANK_DOWN)) {
+      return lastValue == true && value == false;
+    } else if (this.flank.equals(SignalEdgeFilterController.BOTH))  {
+        return value != lastValue;
+    }
+
+    return false;
+  }
+
+  private void addResultEvent(Event event) {
+    if (this.eventSelection.equals(SignalEdgeFilterController.OPTION_FIRST)) {
+      if (this.resultEvents.size() == 0) {
+        this.resultEvents.add(event);
+      }
+    } else if (this.eventSelection.equals(SignalEdgeFilterController.OPTION_LAST)) {
+        this.resultEvents = new ArrayList<>();
+        this.resultEvents.add(event);
+    } else if (this.eventSelection.equals(SignalEdgeFilterController.OPTION_ALL)) {
+      this.resultEvents.add(event);
+    }
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
new file mode 100644
index 0000000..1b5a130
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
+
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SPSensor;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.List;
+
+public class SignalEdgeFilterController extends StandaloneEventProcessingDeclarer<SignalEdgeFilterParameters> {
+
+  public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
+  public static final String FLANK_ID = "flank";
+  public static final String DELAY_ID = "delay";
+  private static final String EVENT_SELECTION_ID = "event-selection-id";
+
+  public static final String FLANK_UP = "FALSE -> TRUE";
+  public static final String FLANK_DOWN = "TRUE -> FALSE";
+  public static final String BOTH = "BOTH";
+  public static final String OPTION_FIRST = "First";
+  public static final String OPTION_LAST = "Last";
+  public static final String OPTION_ALL = "All";
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge")
+            .withLocales(Locales.EN)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                    .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(), Labels.withId(BOOLEAN_SIGNAL_FIELD), PropertyScope.NONE)
+                    .build())
+            .requiredSingleValueSelection(Labels.withId(FLANK_ID), Options.from(BOTH, FLANK_UP, FLANK_DOWN))
+            .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
+            .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
+                    Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
+            .outputStrategy(OutputStrategies.keep())
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<SignalEdgeFilterParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+    String booleanSignalField = extractor.mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
+    String flank = extractor.selectedSingleValue(FLANK_ID, String.class);
+    Integer delay = extractor.singleValueParameter(DELAY_ID, Integer.class);
+    String eventSelection = extractor.selectedSingleValue(EVENT_SELECTION_ID, String.class);
+
+    SignalEdgeFilterParameters params = new SignalEdgeFilterParameters(graph, booleanSignalField, flank, delay, eventSelection);
+
+    return new ConfiguredEventProcessor<>(params, SignalEdgeFilter::new);
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
new file mode 100644
index 0000000..4db913d
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.eclipse.rdf4j.query.algebra.In;
+
+import java.util.List;
+
+public class SignalEdgeFilterParameters extends EventProcessorBindingParams {
+    private String booleanSignalField;
+    private String flank;
+    private Integer delay;
+    private String eventSelection;
+
+    public SignalEdgeFilterParameters(DataProcessorInvocation graph, String booleanSignalField, String flank, Integer delay, String eventSelection) {
+        super(graph);
+        this.booleanSignalField = booleanSignalField;
+        this.flank = flank;
+        this.delay = delay;
+        this.eventSelection = eventSelection;
+    }
+
+    public String getBooleanSignalField() {
+        return booleanSignalField;
+    }
+
+    public void setBooleanSignalField(String booleanSignalField) {
+        this.booleanSignalField = booleanSignalField;
+    }
+
+    public String getFlank() {
+        return flank;
+    }
+
+    public void setFlank(String flank) {
+        this.flank = flank;
+    }
+
+    public Integer getDelay() {
+        return delay;
+    }
+
+    public void setDelay(Integer delay) {
+        this.delay = delay;
+    }
+
+    public String getEventSelection() {
+        return eventSelection;
+    }
+
+    public void setEventSelection(String eventSelection) {
+        this.eventSelection = eventSelection;
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/documentation.md
new file mode 100644
index 0000000..433f25d
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/documentation.md
@@ -0,0 +1,51 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+##  Signal Edge Filter
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Observes a boolean value and forwards the event when a signal edge is detected
+
+***
+
+## Required input
+
+### Boolean Field
+Boolean field that is observed
+
+***
+
+## Configuration
+### Kind of edge
+* Detect rising edges 
+* Detect falling edges 
+* Detect both
+    
+### Delay
+Defines for how many events the signal must be stable before result is emitted.
+(E.g. if set to 2, the result is not emitted if value toggles between true and false, it fires when two consecutive events are detected after the flank)
+
+## Output
+Emits input event, when the signal edge is detected
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/icon.png
new file mode 100644
index 0000000..902e8da
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/strings.en
new file mode 100644
index 0000000..c7a1bc8
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge/strings.en
@@ -0,0 +1,15 @@
+org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.title=Signal Edge Filter
+org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.description=Forwards the event when a signal edge is detected
+
+boolean_state_field.title=Boolean Signal
+boolean_state_field.description=The signal to observe and detect the signal edge
+
+flank.title=Signal Edge
+flank.description=What kind of signal edge should be detected
+
+delay.title=Delay
+delay.description=How many events should be waited to forward the result?
+
+event-selection-id.title=Output Event Selection
+event-selection-id.description=This specifies the event(s) that are selected to be emitted.
+