You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/05/29 21:21:47 UTC
[incubator-streampipes-extensions] 01/02: Working on state buffer
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-149
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit bc1752e8ef0c0e4d9790cddbe011a544482a73b1
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri May 29 09:10:04 2020 +0200
Working on state buffer
---
.../jvm/processor/state/buffer/StateBuffer.java | 59 ++++++++++++++
.../state/buffer/StateBufferController.java | 87 +++++++++++++++++++++
.../state/buffer/StateBufferParameters.java | 61 +++++++++++++++
.../documentation.md | 50 ++++++++++++
.../icon.png | Bin 0 -> 15788 bytes
.../strings.en | 18 +++++
6 files changed, 275 insertions(+)
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
new file mode 100644
index 0000000..3849eed
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBuffer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StateBuffer implements EventProcessor<StateBufferParameters> {
+
+ private static Logger LOG;
+ private String timeProperty;
+ private String stateProperty;
+ private String sensorValueProperty;
+
+
+ @Override
+ public void onInvocation(StateBufferParameters stateBufferParameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
+ LOG = stateBufferParameters.getGraph().getLogger(StateBuffer.class);
+
+ this.timeProperty = stateBufferParameters.getTimeProperty();
+ this.stateProperty = stateBufferParameters.getStateProperty();
+ this.sensorValueProperty = stateBufferParameters.getSensorValueProperty();
+ }
+
+ @Override
+ public void onEvent(Event inputEvent, SpOutputCollector out) {
+ // TODO
+
+ out.collect(inputEvent);
+ }
+
+ @Override
+ public void onDetach() {
+ }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
new file mode 100644
index 0000000..57cc747
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferController.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
+
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.vocabulary.SPSensor;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.List;
+
+public class StateBufferController extends StandaloneEventProcessingDeclarer<StateBufferParameters> {
+
+ public static final String TIMESTAMP_FIELD_ID = "timestampId";
+ public static final String STATE_FIELD_ID = "stateId";
+ public static final String SENSOR_VALUE_FIELD_ID = "sensorValueFieldId";
+
+ public static final String VALUES = "values";
+ public static final String STATE = "state";
+
+ public static final String RESULT_RUNTIME_NAME = "current-state";
+ public static final String RESULT_STATE_FIELD_ID = "result-state";
+
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.buffer")
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_FIELD_ID),
+ PropertyScope.HEADER_PROPERTY)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.domainPropertyReq(SPSensor.STATE),
+ Labels.withId(STATE_FIELD_ID),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(SENSOR_VALUE_FIELD_ID),
+ PropertyScope.MEASUREMENT_PROPERTY)
+ .build()
+ )
+ .outputStrategy(OutputStrategies.fixed(
+ EpProperties.listDoubleEp(Labels.withId(VALUES), RESULT_RUNTIME_NAME, SO.Number),
+ EpProperties.stringEp(Labels.withId(STATE), RESULT_STATE_FIELD_ID, SPSensor.STATE)
+ ))
+ .build();
+ }
+
+ @Override
+ public ConfiguredEventProcessor<StateBufferParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+ String timeProperty = extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID);
+ String stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
+ String sensorValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_FIELD_ID);
+
+ StateBufferParameters params = new StateBufferParameters(graph, timeProperty, stateProperty, sensorValueProperty);
+
+ return new ConfiguredEventProcessor<>(params, StateBuffer::new);
+ }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferParameters.java
new file mode 100644
index 0000000..72a6bba
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferParameters.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+import java.util.List;
+
+public class StateBufferParameters extends EventProcessorBindingParams {
+ private String timeProperty;
+ private String stateProperty;
+ private String sensorValueProperty;
+
+ public StateBufferParameters(DataProcessorInvocation graph, String timeProperty, String stateProperty, String sensorValueProperty) {
+ super(graph);
+ this.timeProperty = timeProperty;
+ this.stateProperty = stateProperty;
+ this.sensorValueProperty = sensorValueProperty;
+ }
+
+ public String getTimeProperty() {
+ return timeProperty;
+ }
+
+ public void setTimeProperty(String timeProperty) {
+ this.timeProperty = timeProperty;
+ }
+
+ public String getStateProperty() {
+ return stateProperty;
+ }
+
+ public void setStateProperty(String stateProperty) {
+ this.stateProperty = stateProperty;
+ }
+
+ public String getSensorValueProperty() {
+ return sensorValueProperty;
+ }
+
+ public void setSensorValueProperty(String sensorValueProperty) {
+ this.sensorValueProperty = sensorValueProperty;
+ }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
new file mode 100644
index 0000000..0f648de
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/documentation.md
@@ -0,0 +1,50 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Boolean Timer
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+This processor measures how long a boolean value does not change. Once the value is changes the event with the measured time is emitted.
+
+
+***
+
+## Required input
+
+A boolean value is required in the data stream.
+
+### Field
+
+The boolean field which is monitored for state changes.
+
+***
+
+## Configuration
+
+### Timer value
+Define whether it should be measured how long the value is true or how long the value is false.
+
+## Output
+Appends a field with the time how long the value did not change. Is emitted on the change of the boolean value. Runtime name: measured_time
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png
new file mode 100644
index 0000000..e56351b
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en
new file mode 100644
index 0000000..a987448
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.buffer/strings.en
@@ -0,0 +1,18 @@
+org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.title=State Buffer
+org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.description=Buffers a sensor values during a state
+
+timestampId.title=Timestamp
+timestampId.description=Field with the timestamp of event
+
+stateId.title=State
+stateId.description=Field containing the state
+
+sensorValueFieldId.title=Sensor Value to Cache
+sensorValueFieldId.description=Select the sensor value that should be cached, while the state is active
+
+values.title=values
+values.description=
+
+state.title=state
+state.description=
+