You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/05/29 21:21:47 UTC

[incubator-streampipes-extensions] 01/02: Working on state buffer

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-149
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit bc1752e8ef0c0e4d9790cddbe011a544482a73b1
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri May 29 09:10:04 2020 +0200

    Working on state buffer
---
 .../jvm/processor/state/buffer/StateBuffer.java    |  59 ++++++++++++++
 .../state/buffer/StateBufferController.java        |  87 +++++++++++++++++++++
 .../state/buffer/StateBufferParameters.java        |  61 +++++++++++++++
 .../documentation.md                               |  50 ++++++++++++
 .../icon.png                                       | Bin 0 -> 15788 bytes
 .../strings.en                                     |  18 +++++
 6 files changed, 275 insertions(+)

diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
new file mode 100644
index 0000000..3849eed
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Event processor that, for now, forwards every incoming event unchanged; the buffering logic is still a TODO. */
+public class StateBuffer implements EventProcessor<StateBufferParameters> {
+  // Per-instance on purpose: a static field would be reassigned by every onInvocation call and shared by all instances.
+  private Logger log;
+  private String timeProperty;
+  private String stateProperty;
+  private String sensorValueProperty;
+
+  /** Fetches a logger from the invocation graph and copies the three configured field names. */
+  @Override
+  public void onInvocation(StateBufferParameters stateBufferParameters,
+                           SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
+    this.log = stateBufferParameters.getGraph().getLogger(StateBuffer.class);
+    this.timeProperty = stateBufferParameters.getTimeProperty();
+    this.stateProperty = stateBufferParameters.getStateProperty();
+    this.sensorValueProperty = stateBufferParameters.getSensorValueProperty();
+  }
+
+  /** Emits the input event as-is on the given collector. */
+  @Override
+  public void onEvent(Event inputEvent, SpOutputCollector out) {
+    // TODO: the stored field names are not used yet; events are only passed through.
+    out.collect(inputEvent);
+  }
+
+  /** Intentionally empty: nothing is done on detach. */
+  @Override
+  public void onDetach() {
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
new file mode 100644
index 0000000..57cc747
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
+
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.vocabulary.SPSensor;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.List;
+
+/** Declares the State Buffer processing element and binds its parameters on invocation. */
+public class StateBufferController extends StandaloneEventProcessingDeclarer<StateBufferParameters> {
+
+  // Label ids of the three mapping properties requested from the input stream.
+  public static final String TIMESTAMP_FIELD_ID = "timestampId";
+  public static final String STATE_FIELD_ID = "stateId";
+  public static final String SENSOR_VALUE_FIELD_ID = "sensorValueFieldId";
+
+  // Label ids of the two output properties.
+  public static final String VALUES = "values";
+  public static final String STATE = "state";
+
+  // Second argument of the two output properties (presumably their runtime names - TODO confirm).
+  public static final String RESULT_RUNTIME_NAME = "current-state";
+  public static final String RESULT_STATE_FIELD_ID = "result-state";
+
+  /**
+   * Requires one stream with a timestamp, an SPSensor.STATE and a number property (each as a unary
+   * mapping) and declares a fixed output built from listDoubleEp and stringEp.
+   */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder
+            .create("org.apache.streampipes.processors.transformation.jvm.processor.state.buffer")
+            .withLocales(Locales.EN)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                    .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
+                            Labels.withId(TIMESTAMP_FIELD_ID), PropertyScope.HEADER_PROPERTY)
+                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(SPSensor.STATE),
+                            Labels.withId(STATE_FIELD_ID), PropertyScope.NONE)
+                    .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                            Labels.withId(SENSOR_VALUE_FIELD_ID), PropertyScope.MEASUREMENT_PROPERTY)
+                    .build())
+            .outputStrategy(OutputStrategies.fixed(
+                    EpProperties.listDoubleEp(Labels.withId(VALUES), RESULT_RUNTIME_NAME, SO.Number),
+                    EpProperties.stringEp(Labels.withId(STATE), RESULT_STATE_FIELD_ID, SPSensor.STATE)))
+            .build();
+  }
+
+  /** Packs the graph and the three mapped field names into parameters for a new StateBuffer. */
+  @Override
+  public ConfiguredEventProcessor<StateBufferParameters> onInvocation(DataProcessorInvocation graph,
+                                                                      ProcessingElementParameterExtractor extractor) {
+    StateBufferParameters bindingParams = new StateBufferParameters(
+            graph,
+            extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID),
+            extractor.mappingPropertyValue(STATE_FIELD_ID),
+            extractor.mappingPropertyValue(SENSOR_VALUE_FIELD_ID));
+    return new ConfiguredEventProcessor<>(bindingParams, StateBuffer::new);
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferParameters.java
new file mode 100644
index 0000000..72a6bba
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferParameters.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+import java.util.List;
+
+/** Binding parameters of the State Buffer: the invocation graph plus three configured field names. */
+public class StateBufferParameters extends EventProcessorBindingParams {
+    private String timeProperty;
+    private String stateProperty;
+    private String sensorValueProperty;
+
+    /** Passes the graph to the superclass and stores the three strings as given. */
+    public StateBufferParameters(DataProcessorInvocation graph, String timeProperty, String stateProperty, String sensorValueProperty) {
+        super(graph);
+        this.sensorValueProperty = sensorValueProperty;
+        this.stateProperty = stateProperty;
+        this.timeProperty = timeProperty;
+    }
+
+    // Getters: each returns the stored string unchanged.
+    public String getTimeProperty() {
+        return this.timeProperty;
+    }
+    public String getStateProperty() {
+        return this.stateProperty;
+    }
+    public String getSensorValueProperty() {
+        return this.sensorValueProperty;
+    }
+
+    // Setters: each overwrites exactly one field with the given string.
+    public void setTimeProperty(String timeProperty) {
+        this.timeProperty = timeProperty;
+    }
+    public void setStateProperty(String stateProperty) {
+        this.stateProperty = stateProperty;
+    }
+    public void setSensorValueProperty(String sensorValueProperty) {
+        this.sensorValueProperty = sensorValueProperty;
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
new file mode 100644
index 0000000..0f648de
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
@@ -0,0 +1,50 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## State Buffer
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+This processor measures how long a boolean value does not change. Once the value changes, the event with the measured time is emitted.
+
+
+***
+
+## Required input
+
+A boolean value is required in the data stream.
+
+### Field
+
+The boolean field which is monitored for state changes.
+
+***
+
+## Configuration
+
+### Timer value
+Define whether it should be measured how long the value is true or how long the value is false.
+
+## Output
+Appends a field with the time how long the value did not change. Is emitted on the change of the boolean value. Runtime name: measured_time 
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png
new file mode 100644
index 0000000..e56351b
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en
new file mode 100644
index 0000000..a987448
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en
@@ -0,0 +1,18 @@
+org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.title=State Buffer
+org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.description=Buffers sensor values during a state
+
+timestampId.title=Timestamp
+timestampId.description=Field with the timestamp of the event
+
+stateId.title=State
+stateId.description=Field containing the state
+
+sensorValueFieldId.title=Sensor Value to Cache
+sensorValueFieldId.description=Select the sensor value that should be cached, while the state is active
+
+values.title=values
+values.description=
+
+state.title=state
+state.description=
+