You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/05/29 21:21:48 UTC

[incubator-streampipes-extensions] 02/02: First simple version of state buffer

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-149
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit c913f4d25b2f494d28f8f04fac0db0b3d9383bfa
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri May 29 23:21:15 2020 +0200

    First simple version of state buffer
---
 .../transformation/jvm/TransformationJvmInit.java  |  2 ++
 .../state/BooleanToStateController.java            |  1 +
 .../jvm/processor/state/buffer/StateBuffer.java    | 36 +++++++++++++++++++---
 3 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index f0590b7..40b84dc 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -35,6 +35,7 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.StateBufferController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -63,6 +64,7 @@ public class TransformationJvmInit extends StandaloneModelSubmitter {
             .add(new TransformToBooleanController())
             .add(new StringTimerController())
             .add(new BooleanToStateController())
+            .add(new StateBufferController())
             .add(new StringCounterController());
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java
index 2fd42e5..acfe071 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateController.java
@@ -63,6 +63,7 @@ public class BooleanToStateController extends StandaloneEventProcessingDeclarer<
 //                            StaticProperties.mappingPropertyUnary(Labels.withId(STATE_MAPPING_ID), PropertyScope.NONE)))
             .outputStrategy(OutputStrategies.append(
                     EpProperties.listStringEp(Labels.withId(STATE_RESULT_FIELD_ID), RESULT_RUNTIME_NAME, SPSensor.STATE)
+//                    EpProperties.stringEp(Labels.withId(STATE_RESULT_FIELD_ID), RESULT_RUNTIME_NAME, SPSensor.STATE)
             ))
             .build();
   }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
index 3849eed..c0789d6 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
@@ -20,12 +20,16 @@ package org.apache.streampipes.processors.transformation.jvm.processor.state.buf
 
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.AbstractField;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.vocabulary.SPSensor;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 public class StateBuffer implements EventProcessor<StateBufferParameters> {
 
@@ -34,6 +38,7 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
   private String stateProperty;
   private String sensorValueProperty;
 
+  private Map<String, List> stateBuffer;
 
   @Override
   public void onInvocation(StateBufferParameters stateBufferParameters,
@@ -44,13 +49,36 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
     this.timeProperty = stateBufferParameters.getTimeProperty();
     this.stateProperty = stateBufferParameters.getStateProperty();
     this.sensorValueProperty = stateBufferParameters.getSensorValueProperty();
+    this.stateBuffer = new HashMap<>();
   }
 
   @Override
   public void onEvent(Event inputEvent, SpOutputCollector out) {
-      // TODO
 
-    out.collect(inputEvent);
+    long timestamp = inputEvent.getFieldBySelector(this.timeProperty).getAsPrimitive().getAsLong();
+    List<String> states = inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
+//    List<String> states = Arrays.asList(inputEvent.getFieldBySelector(this.stateProperty).getAsPrimitive().getAsString());
+    double value = inputEvent.getFieldBySelector(this.sensorValueProperty).getAsPrimitive().getAsDouble();
+
+    // add value to state buffer
+    for (String state : states) {
+      if (stateBuffer.containsKey(state)) {
+        stateBuffer.get(state).add(value);
+      } else {
+        stateBuffer.put(state, Arrays.asList(value));
+      }
+    }
+
+    // emit event if state is not in event anymore
+    for (String key : stateBuffer.keySet()) {
+      if (!states.contains(key)) {
+          Event resultEvent  = new Event();
+          resultEvent.addField(StateBufferController.VALUES, stateBuffer.get(key));
+          resultEvent.addField(StateBufferController.STATE, key);
+          out.collect(resultEvent);
+      }
+    }
+
   }
 
   @Override