You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/02 15:32:34 UTC

[incubator-streampipes-extensions] 02/02: Finish state buffer

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-149
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 7f7123adbaaa41688e06c16b0ec2e3d7373d04e0
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Jun 2 17:32:10 2020 +0200

    Finish state buffer
---
 .../jvm/processor/state/buffer/StateBuffer.java     |  13 ++++++++++---
 .../state/buffer/StateBufferController.java         |   5 ++++-
 .../icon.png                                        | Bin 15788 -> 13154 bytes
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
index c0789d6..b398289 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
@@ -57,7 +57,6 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
 
     long timestamp = inputEvent.getFieldBySelector(this.timeProperty).getAsPrimitive().getAsLong();
     List<String> states = inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
-//    List<String> states = Arrays.asList(inputEvent.getFieldBySelector(this.stateProperty).getAsPrimitive().getAsString());
     double value = inputEvent.getFieldBySelector(this.sensorValueProperty).getAsPrimitive().getAsDouble();
 
     // add value to state buffer
@@ -65,20 +64,28 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
       if (stateBuffer.containsKey(state)) {
         stateBuffer.get(state).add(value);
       } else {
-        stateBuffer.put(state, Arrays.asList(value));
+        List tmp = new ArrayList();
+        tmp.add(value);
+        stateBuffer.put(state, tmp);
       }
     }
 
     // emit event if state is not in event anymore
+    List<String> keysToRemove = new ArrayList<>();
     for (String key : stateBuffer.keySet()) {
       if (!states.contains(key)) {
           Event resultEvent  = new Event();
           resultEvent.addField(StateBufferController.VALUES, stateBuffer.get(key));
           resultEvent.addField(StateBufferController.STATE, key);
-          out.collect(resultEvent);
+          resultEvent.addField(StateBufferController.TIMESTAMP, timestamp);
+        out.collect(resultEvent);
+          keysToRemove.add(key);
       }
     }
 
+    for (String s : keysToRemove) {
+      stateBuffer.remove(s);
+    }
   }
 
   @Override
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
index 57cc747..bca42ee 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
@@ -41,6 +41,8 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
 
   public static final String VALUES = "values";
   public static final String STATE = "state";
+  public static final String TIMESTAMP = "timestamp";
+
 
   public static final String RESULT_RUNTIME_NAME = "current-state";
   public static final String RESULT_STATE_FIELD_ID = "result-state";
@@ -57,7 +59,7 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
                             Labels.withId(TIMESTAMP_FIELD_ID),
                             PropertyScope.HEADER_PROPERTY)
                     .requiredPropertyWithUnaryMapping(
-                            EpRequirements.domainPropertyReq(SPSensor.STATE),
+                            EpRequirements.domainPropertyReqList(SPSensor.STATE),
                             Labels.withId(STATE_FIELD_ID),
                             PropertyScope.NONE)
                     .requiredPropertyWithUnaryMapping(
@@ -67,6 +69,7 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
                     .build()
             )
             .outputStrategy(OutputStrategies.fixed(
+                    EpProperties.timestampProperty(TIMESTAMP),
                     EpProperties.listDoubleEp(Labels.withId(VALUES), RESULT_RUNTIME_NAME, SO.Number),
                     EpProperties.stringEp(Labels.withId(STATE), RESULT_STATE_FIELD_ID, SPSensor.STATE)
             ))
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png
index e56351b..fc5e913 100644
Binary files a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png differ