You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/03 09:06:09 UTC

[incubator-streampipes-extensions] branch dev updated (85b24e1 -> 8c9a90e)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


    from 85b24e1  Merge pull request #22 from bossenti/extension_modbus_adapter
     add bc1752e  Working on state buffer
     add c913f4d  First simple version of state buffer
     add f9a4ed7  Merge branch 'dev' into STREAMPIPES-149
     add 7f7123a  Finish state buffer
     new 04f2068  [STREAMPIPES-149][STREAMPIPES-156] Add new processors for state processing
     new 8c9a90e  Merge branch 'STREAMPIPES-149' into dev

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../transformation/jvm/TransformationJvmInit.java  |   4 +
 .../booloperator/state/BooleanToState.java         |   7 +-
 .../state/BooleanToStateController.java            |  10 +-
 .../state/BooleanToStateParameters.java            |  12 ++-
 .../jvm/processor/state/buffer/StateBuffer.java    |  94 ++++++++++++++++
 .../buffer/StateBufferController.java}             |  66 ++++++------
 .../buffer/StateBufferParameters.java}             |  42 +++++---
 .../jvm/processor/state/labeler/StateLabeler.java  | 120 +++++++++++++++++++++
 .../state/labeler/StateLabelerController.java      |  99 +++++++++++++++++
 .../state/labeler/StateLabelerParameters.java      |  87 +++++++++++++++
 .../processor/state/labeler/model/Statement.java   | 104 ++++++++++++++++++
 .../documentation.md                               |   4 +-
 .../icon.png                                       | Bin 13037 -> 12518 bytes
 .../strings.en                                     |   8 +-
 .../documentation.md                               |  24 ++---
 .../icon.png                                       | Bin 0 -> 9466 bytes
 .../strings.en                                     |  18 ++++
 .../documentation.md                               |  63 +++++++++++
 .../icon.png                                       | Bin 0 -> 9118 bytes
 .../strings.en                                     |  23 ++++
 20 files changed, 718 insertions(+), 67 deletions(-)
 create mode 100644 streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
 copy streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/{stringoperator/timer/StringTimerController.java => state/buffer/StateBufferController.java} (50%)
 copy streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/{array/split/SplitArrayParameters.java => state/buffer/StateBufferParameters.java} (53%)
 create mode 100644 streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
 create mode 100644 streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
 create mode 100644 streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
 create mode 100644 streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
 copy {streampipes-processors-enricher-flink/src/main/resources/org.apache.streampipes.processors.enricher.flink.processor.math.mathop => streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer}/documentation.md (64%)
 create mode 100644 streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png
 create mode 100644 streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en
 create mode 100644 streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
 create mode 100644 streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png
 create mode 100644 streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en


[incubator-streampipes-extensions] 01/02: [STREAMPIPES-149][STREAMPIPES-156] Add new processors for state processing

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 04f2068330c64f34d70faa75c31efdcabf8ba97d
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Jun 3 11:04:15 2020 +0200

    [STREAMPIPES-149][STREAMPIPES-156] Add new processors for state
    processing
---
 .../transformation/jvm/TransformationJvmInit.java  |   2 +
 .../booloperator/state/BooleanToState.java         |   7 +-
 .../state/BooleanToStateController.java            |  11 +-
 .../state/BooleanToStateParameters.java            |  12 ++-
 .../jvm/processor/state/buffer/StateBuffer.java    |   2 +-
 .../state/buffer/StateBufferController.java        |   7 +-
 .../jvm/processor/state/labeler/StateLabeler.java  | 120 +++++++++++++++++++++
 .../state/labeler/StateLabelerController.java      |  99 +++++++++++++++++
 .../state/labeler/StateLabelerParameters.java      |  87 +++++++++++++++
 .../processor/state/labeler/model/Statement.java   | 104 ++++++++++++++++++
 .../documentation.md                               |   4 +-
 .../icon.png                                       | Bin 13037 -> 12518 bytes
 .../strings.en                                     |   8 +-
 .../documentation.md                               |  24 ++---
 .../icon.png                                       | Bin 13154 -> 9466 bytes
 .../documentation.md                               |  63 +++++++++++
 .../icon.png                                       | Bin 0 -> 9118 bytes
 .../strings.en                                     |  23 ++++
 18 files changed, 542 insertions(+), 31 deletions(-)

diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 40b84dc..8512165 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -36,6 +36,7 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
 import org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.StateBufferController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.StateLabelerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -65,6 +66,7 @@ public class TransformationJvmInit extends StandaloneModelSubmitter {
             .add(new StringTimerController())
             .add(new BooleanToStateController())
             .add(new StateBufferController())
+            .add(new StateLabelerController())
             .add(new StringCounterController());
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToState.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToState.java
index 9f4207f..a714e15 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToState.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToState.java
@@ -32,6 +32,7 @@ public class BooleanToState implements EventProcessor<BooleanToStateParameters>
   private static Logger LOG;
 
   private List<String> stateFields;
+  private String  defaultState;
 
   @Override
   public void onInvocation(BooleanToStateParameters booleanInverterParameters,
@@ -39,6 +40,7 @@ public class BooleanToState implements EventProcessor<BooleanToStateParameters>
                            EventProcessorRuntimeContext runtimeContext) {
     LOG = booleanInverterParameters.getGraph().getLogger(BooleanToState.class);
     this.stateFields = booleanInverterParameters.getStateFields();
+    this.defaultState = booleanInverterParameters.getDefaultState();
   }
 
   @Override
@@ -51,7 +53,10 @@ public class BooleanToState implements EventProcessor<BooleanToStateParameters>
       }
     }
 
-    inputEvent.addField(BooleanToStateController.RESULT_RUNTIME_NAME, states.toArray());
+    if (states.size() == 0) {
+      states.add(this.defaultState);
+    }
+    inputEvent.addField(BooleanToStateController.CURRENT_STATE, states.toArray());
     out.collect(inputEvent);
   }
 
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java
index acfe071..8112731 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java
@@ -40,10 +40,10 @@ public class BooleanToStateController extends StandaloneEventProcessingDeclarer<
   public static final String GROUP_ID = "group-id";
   public static final String STATE_NAME_ID = "state-name-id";
   public static final String STATE_MAPPING_ID = "state-mapping-id";
-  public static final String STATE_RESULT_FIELD_ID = "state-result-field";
   public static final String BOOLEAN_STATE_FIELD = "boolean_state_field";
+  public static final String DEFAULT_STATE_ID = "default-state-id";
 
-  public static final String RESULT_RUNTIME_NAME = "current_state";
+  public static final String CURRENT_STATE = "current_state";
 
 
   @Override
@@ -61,9 +61,9 @@ public class BooleanToStateController extends StandaloneEventProcessingDeclarer<
 //                            Labels.withId(GROUP_ID),
 //                            StaticProperties.stringFreeTextProperty(Labels.withId(STATE_NAME_ID)),
 //                            StaticProperties.mappingPropertyUnary(Labels.withId(STATE_MAPPING_ID), PropertyScope.NONE)))
+            .requiredTextParameter(Labels.withId(DEFAULT_STATE_ID))
             .outputStrategy(OutputStrategies.append(
-                    EpProperties.listStringEp(Labels.withId(STATE_RESULT_FIELD_ID), RESULT_RUNTIME_NAME, SPSensor.STATE)
-//                    EpProperties.stringEp(Labels.withId(STATE_RESULT_FIELD_ID), RESULT_RUNTIME_NAME, SPSensor.STATE)
+                    EpProperties.listStringEp(Labels.withId(CURRENT_STATE), CURRENT_STATE, SPSensor.STATE)
             ))
             .build();
   }
@@ -72,8 +72,9 @@ public class BooleanToStateController extends StandaloneEventProcessingDeclarer<
   public ConfiguredEventProcessor<BooleanToStateParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
 
     List<String> stateFields = extractor.mappingPropertyValues(BOOLEAN_STATE_FIELD);
+    String defaultState = extractor.singleValueParameter(DEFAULT_STATE_ID, String.class);
 
-    BooleanToStateParameters params = new BooleanToStateParameters(graph, stateFields);
+    BooleanToStateParameters params = new BooleanToStateParameters(graph, stateFields, defaultState);
 
     return new ConfiguredEventProcessor<>(params, BooleanToState::new);
   }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateParameters.java
index 63209e6..4ff3a0a 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateParameters.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateParameters.java
@@ -25,10 +25,12 @@ import java.util.List;
 
 public class BooleanToStateParameters extends EventProcessorBindingParams {
     private List<String> stateFields;
+    private String defaultState;
 
-    public BooleanToStateParameters(DataProcessorInvocation graph, List<String> stateFields) {
+    public BooleanToStateParameters(DataProcessorInvocation graph, List<String> stateFields, String defaultState) {
         super(graph);
         this.stateFields = stateFields;
+        this.defaultState = defaultState;
     }
 
     public List<String> getStateFields() {
@@ -38,4 +40,12 @@ public class BooleanToStateParameters extends EventProcessorBindingParams {
     public void setStateFields(List<String> stateFields) {
         this.stateFields = stateFields;
     }
+
+    public String getDefaultState() {
+        return defaultState;
+    }
+
+    public void setDefaultState(String defaultState) {
+        this.defaultState = defaultState;
+    }
 }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
index b398289..05dfe06 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
@@ -76,7 +76,7 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
       if (!states.contains(key)) {
           Event resultEvent  = new Event();
           resultEvent.addField(StateBufferController.VALUES, stateBuffer.get(key));
-          resultEvent.addField(StateBufferController.STATE, key);
+          resultEvent.addField(StateBufferController.STATE, Arrays.asList(key));
           resultEvent.addField(StateBufferController.TIMESTAMP, timestamp);
         out.collect(resultEvent);
           keysToRemove.add(key);
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
index bca42ee..3ea3583 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
@@ -44,9 +44,6 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
   public static final String TIMESTAMP = "timestamp";
 
 
-  public static final String RESULT_RUNTIME_NAME = "current-state";
-  public static final String RESULT_STATE_FIELD_ID = "result-state";
-
 
   @Override
   public DataProcessorDescription declareModel() {
@@ -70,8 +67,8 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
             )
             .outputStrategy(OutputStrategies.fixed(
                     EpProperties.timestampProperty(TIMESTAMP),
-                    EpProperties.listDoubleEp(Labels.withId(VALUES), RESULT_RUNTIME_NAME, SO.Number),
-                    EpProperties.stringEp(Labels.withId(STATE), RESULT_STATE_FIELD_ID, SPSensor.STATE)
+                    EpProperties.listDoubleEp(Labels.withId(VALUES), VALUES, SO.Number),
+                    EpProperties.listStringEp(Labels.withId(STATE), STATE, SPSensor.STATE)
             ))
             .build();
   }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
new file mode 100644
index 0000000..3bdc3c7
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+
+import com.google.common.math.Stats;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.Statement;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StateLabeler implements EventProcessor<StateLabelerParameters> {
+
+  private static Logger LOG;
+  private String sensorListValueProperty;
+  private String stateProperty;
+  private String stateFilter;
+  private String selectedOperation;
+  private List<Statement> statements;
+
+  @Override
+  public void onInvocation(StateLabelerParameters stateBufferParameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    LOG = stateBufferParameters.getGraph().getLogger(StateLabeler.class);
+
+    this.sensorListValueProperty = stateBufferParameters.getSensorListValueProperty();
+    this.stateProperty = stateBufferParameters.getStateProperty();
+    this.stateFilter = stateBufferParameters.getStateFilter();
+    this.selectedOperation = stateBufferParameters.getSelectedOperation();
+
+    this.statements = new ArrayList<>();
+
+    for (String s : stateBufferParameters.getStatementsStrings()) {
+      Statement statement = Statement.getStatement(s);
+      if (statement == null) {
+        throw new SpRuntimeException("Statement: " + s + " is not correctly formatted");
+      }
+      this.statements.add(statement);
+    }
+    Collections.reverse(this.statements);
+
+  }
+
+  @Override
+  public void onEvent(Event inputEvent, SpOutputCollector out) {
+
+    List<Double> values = inputEvent.getFieldBySelector(this.sensorListValueProperty).getAsList().parseAsSimpleType(Double.class);
+    List<String> states = inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
+
+    if (states.contains(this.stateFilter) || this.stateFilter.equals("*"))  {
+      double calculatedValue;
+
+      if (StateLabelerController.MAXIMUM.equals(this.selectedOperation)) {
+        calculatedValue = Stats.of(values).max();
+      } else if (StateLabelerController.MINIMUM.equals(this.selectedOperation)) {
+        calculatedValue = Stats.of(values).min();
+      } else {
+        calculatedValue = Stats.of(values).mean();
+      }
+
+      String label = getLabel(calculatedValue);
+      if (label != null) {
+        inputEvent.addField(StateLabelerController.LABEL, label);
+        out.collect(inputEvent);
+      } else {
+        LOG.info("No condition of statements was fulfilled, add a default case (*) to the statements");
+      }
+
+    }
+  }
+
+  @Override
+  public void onDetach() {
+  }
+
+  private String getLabel(double calculatedValue) {
+    for (Statement statement : this.statements) {
+      if (condition(statement, calculatedValue)) {
+        return statement.getLabel();
+      }
+    }
+    return null;
+  }
+
+  private boolean condition(Statement statement, double calculatedValue) {
+    if (">".equals(statement.getOperator())) {
+      return calculatedValue > statement.getValue();
+    } else if ("<".equals(statement.getOperator())) {
+      return calculatedValue < statement.getValue();
+    } else if ("=".equals(statement.getOperator())) {
+      return calculatedValue == statement.getValue();
+    } else {
+      return true;
+    }
+
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
new file mode 100644
index 0000000..f367e61
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SPSensor;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.List;
+
+public class StateLabelerController extends StandaloneEventProcessingDeclarer<StateLabelerParameters> {
+
+  public static final String STATE_FILTER_ID = "stateFilterId";
+  public static final String STATE_FIELD_ID = "stateFieldId";
+  public static final String OPERATIONS_ID = "operationsId";
+  public static final String SENSOR_VALUE_ID = "sensorValueId";
+  public static final String LABEL_COLLECTION_ID = "labelCollectionId";
+  public static final String LABEL_STRING_ID = "labelStringId";
+
+  public static final String LABEL = "label";
+
+  public static final String MINIMUM = "minimum";
+  public static final String MAXIMUM = "maximum";
+  public static final String AVERAGE = "average";
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler")
+            .withLocales(Locales.EN)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                    .requiredPropertyWithUnaryMapping(
+                            EpRequirements.listRequirement(),
+                            Labels.withId(SENSOR_VALUE_ID),
+                            PropertyScope.NONE)
+                    .requiredPropertyWithUnaryMapping(
+                            EpRequirements.domainPropertyReqList(SPSensor.STATE),
+                            Labels.withId(STATE_FIELD_ID),
+                            PropertyScope.NONE)
+                    .build())
+            .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
+            .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
+                    Options.from(MINIMUM, MAXIMUM, AVERAGE))
+            .requiredParameterAsCollection(
+                    Labels.withId(LABEL_COLLECTION_ID),
+                      StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID)))
+
+//            StaticProperties.collection(Labels.withId(PLC_NODES),
+//                StaticProperties.stringFreeTextProperty(Labels.withId(PLC_NODE_RUNTIME_NAME)),
+//                StaticProperties.stringFreeTextProperty(Labels.withId(PLC_NODE_NAME)),
+//                StaticProperties.singleValueSelection(Labels.withId(PLC_NODE_TYPE),
+//                        Options.from("Bool",  "Byte", "Int", "Word", "Real"))))
+
+            .outputStrategy(OutputStrategies.append(
+                    EpProperties.stringEp(Labels.withId(LABEL), LABEL, SPSensor.STATE)
+            ))
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<StateLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+    String sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
+    String stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
+    String stateFilter = extractor.singleValueParameter(STATE_FILTER_ID, String.class);
+    String selectedOperation = extractor.selectedSingleValue(OPERATIONS_ID, String.class);
+
+    List<String> statementStrings = extractor.singleValueParameterFromCollection(LABEL_COLLECTION_ID, String.class);
+
+    StateLabelerParameters params = new StateLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
+
+    return new ConfiguredEventProcessor<>(params, StateLabeler::new);
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
new file mode 100644
index 0000000..885a722
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+import java.util.List;
+
+public class StateLabelerParameters extends EventProcessorBindingParams {
+
+    private String sensorListValueProperty;
+    private String stateProperty;
+    private String stateFilter;
+    private String selectedOperation;
+    private List<String> statementsStrings;
+
+    public StateLabelerParameters(DataProcessorInvocation graph,
+                                  String sensorListValueProperty,
+                                  String stateProperty,
+                                  String stateFilter,
+                                  String selectedOperation,
+                                  List<String> statementsStrings) {
+        super(graph);
+        this.sensorListValueProperty = sensorListValueProperty;
+        this.stateProperty = stateProperty;
+        this.stateFilter = stateFilter;
+        this.selectedOperation = selectedOperation;
+        this.statementsStrings = statementsStrings;
+    }
+
+    public String getSensorListValueProperty() {
+        return sensorListValueProperty;
+    }
+
+    public void setSensorListValueProperty(String sensorListValueProperty) {
+        this.sensorListValueProperty = sensorListValueProperty;
+    }
+
+    public String getStateFilter() {
+        return stateFilter;
+    }
+
+    public void setStateFilter(String stateFilter) {
+        this.stateFilter = stateFilter;
+    }
+
+    public String getSelectedOperation() {
+        return selectedOperation;
+    }
+
+    public void setSelectedOperation(String selectedOperation) {
+        this.selectedOperation = selectedOperation;
+    }
+
+    public List<String> getStatementsStrings() {
+        return statementsStrings;
+    }
+
+    public void setStatementsStrings(List<String> statementsStrings) {
+        this.statementsStrings = statementsStrings;
+    }
+
+    public String getStateProperty() {
+        return stateProperty;
+    }
+
+    public void setStateProperty(String stateProperty) {
+        this.stateProperty = stateProperty;
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
new file mode 100644
index 0000000..42ae687
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
@@ -0,0 +1,104 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model;
+
+public class Statement {
+    private String operator;
+    private double value;
+    private String label;
+
+    public Statement() {
+    }
+
+    /**
+     * This method checks if the user input is correct. When not null is returned
+     * @param s
+     * @return
+     */
+    public static Statement getStatement(String s) {
+        Statement result = new Statement();
+
+        String[] parts  = s.split(";");
+        // default case
+        if (parts.length == 2) {
+            if (parts[0].equals("*")) {
+                result.setOperator(parts[0]);
+                result.setLabel(parts[1]);
+                return result;
+            } else {
+                return null;
+            }
+        }
+
+        // all other valid cases
+        if (parts.length ==  3) {
+
+            if (parts[0].equals(">") || parts[0].equals("<") || parts[0].equals("=")) {
+                result.setOperator(parts[0]);
+            } else {
+                return null;
+            }
+
+            if (isNumeric(parts[1].replaceAll("-", ""))) {
+                result.setValue(Double.parseDouble(parts[1]));
+            } else {
+                return null;
+            }
+
+            result.setLabel(parts[2]);
+
+            return result;
+        } else {
+            return null;
+        }
+    }
+
+    private static boolean isNumeric(final String str) {
+
+        // null or empty
+        if (str == null || str.length() == 0) {
+            return false;
+        }
+
+        return str.chars().allMatch(Character::isDigit);
+
+    }
+
+    public String getOperator() {
+        return operator;
+    }
+
+    public void setOperator(String operator) {
+        this.operator = operator;
+    }
+
+    public double getValue() {
+        return value;
+    }
+
+    public void setValue(double value) {
+        this.value = value;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/documentation.md
index 6328e29..f08fca7 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/documentation.md
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/documentation.md
@@ -26,7 +26,7 @@
 
 ## Description
 
-With this processors boolean values can be converted to a state field representing the current state of the system.
+Convert boolean fields to a state string representing the current state of the system.
 This processor requires one or multiple boolean values in the data stream.
 For each of the selected values which are true, the runtime name is added to the states field.
 ***
@@ -36,6 +36,8 @@ For each of the selected values which are true, the runtime name is added to the
 ### Boolean Fields
 Boolean fields that are converted to the state when true
 
+### Default State
+When all boolean values are false, a default  state can be defined
 ***
 
 ## Configuration
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/icon.png
index bed5d6b..1541c54 100644
Binary files a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/icon.png and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/strings.en
index 82ee552..b5ebeeb 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/strings.en
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state/strings.en
@@ -16,8 +16,8 @@ state-name-id.description=Enter the name of the state
 state-mapping-id.title=State Mapping
 state-mapping-id.description=When boolean variable is true, the state is set to active
 
-state-result-field.title=Result Field
-state-result-field.description=Test
+default-state-id.title=Default State
+default-state-id.description=Define a default state, when all values are false
 
-boolean_state_field.title=Result Field
-boolean_state_field.description=Test
+current_state.title=Current State
+boolean_state_field.description=
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
index 0f648de..ef2099b 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Boolean Timer
+## State Buffer
 
 <p align="center"> 
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,25 +26,23 @@
 
 ## Description
 
-This processor measures how long a boolean value does not change. Once the value is changes the event with the measured time is emitted.
-
+Buffers values of a sensor, while state does not change.
+Select a state field in the event. Events are buffered as long as state field does not change. When it changes result event is emitted.
 
 ***
 
 ## Required input
 
-A boolean value is required in the data stream.
-
-### Field
+Define the state and sensor value field
 
-The boolean field which is monitored for state changes.
-
-***
+### Timestamp
+A mapping  property for a timestamp field
 
-## Configuration
+### State
+Select the field representing the state 
 
-### Timer value
-Define whether it should be measured how long the value is true or how long the value is false.
+### Sensor value to cache
+Select the field with the numerical values to buffer
 
 ## Output
-Appends a field with the time how long the value did not change. Is emitted on the change of the boolean value. Runtime name: measured_time 
+Emits a new event on state change, with the fields `timestamp`, `state`, and a list containing all `sensor values`.
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png
index fc5e913..6ebf5ea 100644
Binary files a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
new file mode 100644
index 0000000..ffe7545
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
@@ -0,0 +1,63 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## State Labeler
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Apply a rule to a time-series recorded during a state of a machine. (E.g. when minimum value is lower then 10, add label `not ok` else add label `ok`)
+
+
+***
+
+## Required input
+
+Requires a list with sensor values and a field defining the state
+
+### Sensor values
+
+An array representing sensor values recorded during the state.
+
+### State field
+
+A field representing the state when the sensor values where recorded.
+
+***
+
+## Configuration
+
+### Select a specific state
+When you are interested in the values of a specific state add it here. All other states will be ignored. To get results of all states enter `*`
+
+### Operation
+Operation that will be performed on the sensor values (calculate `maximim`, or `average`, or `minimum`) 
+
+### Condition
+Define a rule which label to add. Example: `<;5;nok` means when the calculated value is smaller then 5 add label ok.
+The default label can be defined with `*;nok`.
+The first rule that is true defines the label. Rules are applied in the same order as defined here.
+
+
+## Output
+Appends a new field  with the label defined in the Condition Configuration
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png
new file mode 100644
index 0000000..b323e75
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en
new file mode 100644
index 0000000..5b77578
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en
@@ -0,0 +1,23 @@
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.title=State Labeler
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.description=Adds a label based on a  user defined rule to a sensor time-series
+
+stateFilterId.title=Select a specific state
+stateFilterId.description=Add a filter to define which states to evaluate. Add '*' to select all states.
+
+stateFieldId.title=State field
+stateFieldId.description=Select the field containing the state of the event
+
+operationsId.title=Operation
+operationsId.description=Define the operation that should be applied on sensor values
+
+sensorValueId.title=Sensor values
+sensorValueId.description=The array containing the sensor values to evaluate
+
+labelCollectionId.title=Condition
+labelCollectionId.description=Add a condition with the following scheme (Order is important) '<;5;ok', '<;10;ok' or '*;nok' as a default label
+
+label.title=Label
+label.description=User defined label for data
+
+labelStringId.title=Condition
+labelStringId.description=Add a condition with the following scheme (Order is important) '<;5;ok', '<;10;ok' or '*;nok' as a default label


[incubator-streampipes-extensions] 02/02: Merge branch 'STREAMPIPES-149' into dev

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 8c9a90ea03af676e862195c2cfc0f7bbc44e8c94
Merge: 85b24e1 04f2068
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Jun 3 11:05:53 2020 +0200

    Merge branch 'STREAMPIPES-149' into dev

 .../transformation/jvm/TransformationJvmInit.java  |   4 +
 .../booloperator/state/BooleanToState.java         |   7 +-
 .../state/BooleanToStateController.java            |  10 +-
 .../state/BooleanToStateParameters.java            |  12 ++-
 .../jvm/processor/state/buffer/StateBuffer.java    |  94 ++++++++++++++++
 .../state/buffer/StateBufferController.java        |  87 +++++++++++++++
 .../buffer/StateBufferParameters.java}             |  38 +++++--
 .../jvm/processor/state/labeler/StateLabeler.java  | 120 +++++++++++++++++++++
 .../state/labeler/StateLabelerController.java      |  99 +++++++++++++++++
 .../state/labeler/StateLabelerParameters.java      |  87 +++++++++++++++
 .../processor/state/labeler/model/Statement.java   | 104 ++++++++++++++++++
 .../documentation.md                               |   4 +-
 .../icon.png                                       | Bin 13037 -> 12518 bytes
 .../strings.en                                     |   8 +-
 .../documentation.md                               |  23 ++--
 .../icon.png                                       | Bin 0 -> 9466 bytes
 .../strings.en                                     |  18 ++++
 .../documentation.md                               |  63 +++++++++++
 .../icon.png                                       | Bin 0 -> 9118 bytes
 .../strings.en                                     |  23 ++++
 20 files changed, 771 insertions(+), 30 deletions(-)