You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/02 15:32:34 UTC
[incubator-streampipes-extensions] 02/02: Finish state buffer
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-149
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 7f7123adbaaa41688e06c16b0ec2e3d7373d04e0
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Jun 2 17:32:10 2020 +0200
Finish state buffer
---
.../jvm/processor/state/buffer/StateBuffer.java | 13 ++++++++++---
.../state/buffer/StateBufferController.java | 5 ++++-
.../icon.png | Bin 15788 -> 13154 bytes
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
index c0789d6..b398289 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
@@ -57,7 +57,6 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
long timestamp = inputEvent.getFieldBySelector(this.timeProperty).getAsPrimitive().getAsLong();
List<String> states = inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
-// List<String> states = Arrays.asList(inputEvent.getFieldBySelector(this.stateProperty).getAsPrimitive().getAsString());
double value = inputEvent.getFieldBySelector(this.sensorValueProperty).getAsPrimitive().getAsDouble();
// add value to state buffer
@@ -65,20 +64,28 @@ public class StateBuffer implements EventProcessor<StateBufferParameters> {
if (stateBuffer.containsKey(state)) {
stateBuffer.get(state).add(value);
} else {
- stateBuffer.put(state, Arrays.asList(value));
+ List tmp = new ArrayList();
+ tmp.add(value);
+ stateBuffer.put(state, tmp);
}
}
// emit event if state is not in event anymore
+ List<String> keysToRemove = new ArrayList<>();
for (String key : stateBuffer.keySet()) {
if (!states.contains(key)) {
Event resultEvent = new Event();
resultEvent.addField(StateBufferController.VALUES, stateBuffer.get(key));
resultEvent.addField(StateBufferController.STATE, key);
- out.collect(resultEvent);
+ resultEvent.addField(StateBufferController.TIMESTAMP, timestamp);
+ out.collect(resultEvent);
+ keysToRemove.add(key);
}
}
+ for (String s : keysToRemove) {
+ stateBuffer.remove(s);
+ }
}
@Override
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
index 57cc747..bca42ee 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
@@ -41,6 +41,8 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
public static final String VALUES = "values";
public static final String STATE = "state";
+ public static final String TIMESTAMP = "timestamp";
+
public static final String RESULT_RUNTIME_NAME = "current-state";
public static final String RESULT_STATE_FIELD_ID = "result-state";
@@ -57,7 +59,7 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
Labels.withId(TIMESTAMP_FIELD_ID),
PropertyScope.HEADER_PROPERTY)
.requiredPropertyWithUnaryMapping(
- EpRequirements.domainPropertyReq(SPSensor.STATE),
+ EpRequirements.domainPropertyReqList(SPSensor.STATE),
Labels.withId(STATE_FIELD_ID),
PropertyScope.NONE)
.requiredPropertyWithUnaryMapping(
@@ -67,6 +69,7 @@ public class StateBufferController extends StandaloneEventProcessingDeclarer<Sta
.build()
)
.outputStrategy(OutputStrategies.fixed(
+ EpProperties.timestampProperty(TIMESTAMP),
EpProperties.listDoubleEp(Labels.withId(VALUES), RESULT_RUNTIME_NAME, SO.Number),
EpProperties.stringEp(Labels.withId(STATE), RESULT_STATE_FIELD_ID, SPSensor.STATE)
))
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png
index e56351b..fc5e913 100644
Binary files a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png differ