You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/25 15:29:07 UTC
[incubator-streampipes-extensions] branch edge-extensions updated:
added sink for latency evaluation; removed latency processor
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 75cc3b8 added sink for latency evaluation; removed latency processor
75cc3b8 is described below
commit 75cc3b8f8a0224da0a39cbe1fc5602c5ead7b60f
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Jun 25 15:20:32 2021 +0200
added sink for latency evaluation; removed latency processor
---
.../extensions/all/jvm/AllExtensionsInit.java | 5 +-
.../pe/jvm/AllPipelineElementsInit.java | 5 +-
.../processor/latencymeasure/LatencyMeasure.java | 47 ----------------
.../latencymeasure/LatencyMeasureController.java | 62 ---------------------
.../latencymeasure/LatencyMeasureParameters.java | 35 ------------
.../icon.png | Bin 5118 -> 0 bytes
.../strings.en | 2 -
.../jvm/processor/dummy/DummyController.java | 4 +-
.../jvm/logger/LatencyMqttSinkController.java | 61 ++++++++++++++++++++
.../documentation.md | 16 +++---
.../icon.png | Bin 0 -> 12981 bytes
.../strings.en | 9 +++
12 files changed, 86 insertions(+), 160 deletions(-)
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index 6f6b3c2..b1e8bae 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -50,7 +50,6 @@ import org.apache.streampipes.connect.protocol.stream.pulsar.PulsarProtocol;
import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.extensions.all.jvm.config.AllExtensionsConfig;
-import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
@@ -125,6 +124,7 @@ import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlControlle
import org.apache.streampipes.sinks.databases.jvm.redis.RedisController;
import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
+import org.apache.streampipes.sinks.internal.jvm.logger.LatencyMqttSinkController;
import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
import org.apache.streampipes.sinks.notifications.jvm.email.EmailController;
import org.apache.streampipes.sinks.notifications.jvm.onesignal.OneSignalController;
@@ -140,7 +140,6 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
.add(new SizeMeasureController())
.add(new JSEvalController())
//TODO: Remove after testing
- .add(new LatencyMeasureController())
.add(new CPUBurnerController())
// streampipes-processors-filters-jvm
.add(new NumericalFilterController())
@@ -223,6 +222,8 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
.add(new NotificationController())
.add(new DataLakeController())
.add(new DashboardController())
+ //TODO: remove after testing
+ .add(new LatencyMqttSinkController())
// streampipes-sinks-notifications-jvm
.add(new EmailController())
.add(new TelegramController())
diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index b349f04..a0f50b3 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -29,7 +29,6 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.pe.jvm.config.AllPipelineElementsConfig;
import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
-import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
@@ -108,6 +107,7 @@ import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlControlle
import org.apache.streampipes.sinks.databases.jvm.redis.RedisController;
import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
+import org.apache.streampipes.sinks.internal.jvm.logger.LatencyMqttSinkController;
import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
import org.apache.streampipes.sinks.notifications.jvm.email.EmailController;
import org.apache.streampipes.sinks.notifications.jvm.onesignal.OneSignalController;
@@ -124,7 +124,6 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
.add(new SizeMeasureController())
.add(new JSEvalController())
//TODO: Remove after testing
- .add(new LatencyMeasureController())
.add(new CPUBurnerController())
// streampipes-processors-filters-jvm
.add(new NumericalFilterController())
@@ -211,6 +210,8 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
.add(new NotificationController())
.add(new DataLakeController())
.add(new DashboardController())
+ // TODO: delete this one after testing
+ .add(new LatencyMqttSinkController())
// streampipes-sinks-notifications-jvm
.add(new EmailController())
.add(new TelegramController())
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasure.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasure.java
deleted file mode 100644
index 0a37add..0000000
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasure.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class LatencyMeasure implements EventProcessor<LatencyMeasureParameters> {
-
- private String timestampField;
-
- @Override
- public void onInvocation(LatencyMeasureParameters parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
- this.timestampField = parameters.getTimestampField();
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
- long timestamp = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
- long latency = System.currentTimeMillis() - timestamp;
- event.addField(LatencyMeasureController.EVENT_LATENCY, latency);
- collector.collect(event);
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
-
- }
-}
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureController.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureController.java
deleted file mode 100644
index 44f2349..0000000
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureController.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class LatencyMeasureController extends StandaloneEventProcessingDeclarer<LatencyMeasureParameters> {
-
- final static String EVENT_LATENCY = "eventLatency";
- private static final String TIMESTAMP = "timestamp";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.latencymeasure")
- .category(DataProcessorType.ENRICH)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP),
- PropertyScope.NONE).build())
- .outputStrategy(OutputStrategies.append(EpProperties.doubleEp(
- Labels.withId(EVENT_LATENCY),
- EVENT_LATENCY,
- "https://schema.org/processingTime")))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<LatencyMeasureParameters>
- onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- String timestampField = extractor.mappingPropertyValue(TIMESTAMP);
- LatencyMeasureParameters staticParam = new LatencyMeasureParameters(graph, timestampField);
-
- return new ConfiguredEventProcessor<>(staticParam, LatencyMeasure::new);
- }
-}
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureParameters.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureParameters.java
deleted file mode 100644
index b92c3b2..0000000
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureParameters.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class LatencyMeasureParameters extends EventProcessorBindingParams {
-
- private String timestampField;
-
- public LatencyMeasureParameters(DataProcessorInvocation graph, String timestampField) {
- super(graph);
- this.timestampField = timestampField;
- }
-
- public String getTimestampField(){
- return this.timestampField;
- }
-}
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/icon.png b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/icon.png
deleted file mode 100644
index f39405d..0000000
Binary files a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/icon.png and /dev/null differ
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/strings.en b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/strings.en
deleted file mode 100644
index 7b2383b..0000000
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/strings.en
+++ /dev/null
@@ -1,2 +0,0 @@
-org.apache.streampipes.processors.enricher.jvm.latencymeasure.title=Latency Measure
-org.apache.streampipes.processors.enricher.jvm.latencymeasure.description=Measures the Latency of an Event.
\ No newline at end of file
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java
index cab379d..250745d 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java
@@ -74,6 +74,8 @@ public class DummyController extends StreamPipesReconfigurableProcessor {
// transform event
event.addField("appended-reconfigurable", reconfigurableValue);
collector.collect(event);
+ Object[] obs = {System.currentTimeMillis(), "raw event", "", reconfigurableValue};
+ EvaluationLogger.getInstance().logMQTT("Reconfiguration", obs);
}
@Override
@@ -84,7 +86,7 @@ public class DummyController extends StreamPipesReconfigurableProcessor {
@Override
public void onReconfigurationEvent(Event event) throws SpRuntimeException {
Object[] obs = {System.currentTimeMillis(), "processor reconfigured", nrRuns++, event.getFieldByRuntimeName("i-am-reconfigurable").getAsPrimitive().getAsDouble()};
- EvaluationLogger.getInstance().logMQTT("Reconfiguration", obs);
reconfigurableValue = event.getFieldByRuntimeName("i-am-reconfigurable").getAsPrimitive().getAsDouble();
+ EvaluationLogger.getInstance().logMQTT("Reconfiguration", obs);
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/logger/LatencyMqttSinkController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/logger/LatencyMqttSinkController.java
new file mode 100644
index 0000000..b59d19e
--- /dev/null
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/logger/LatencyMqttSinkController.java
@@ -0,0 +1,61 @@
+package org.apache.streampipes.sinks.internal.jvm.logger;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+public class LatencyMqttSinkController extends StreamPipesDataSink {
+
+ private EvaluationLogger logger;
+ private String topic;
+ private String timestampField;
+
+ private static final String LOGGER_TOPIC = "logger-topic-name";
+ private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
+
+ @Override
+ public DataSinkDescription declareModel() {
+ return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.logger")
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .category(DataSinkType.FORWARD)
+ .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE).build())
+ .requiredTextParameter(Labels.withId(LOGGER_TOPIC))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ logger = EvaluationLogger.getInstance();
+ topic = parameters.extractor().singleValueParameter(LOGGER_TOPIC, String.class);
+ timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+ }
+
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ long timestamp = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
+ long latency = System.currentTimeMillis() - timestamp;
+ Object[] obs = {System.currentTimeMillis(), "event received", "", latency};
+ logger.logMQTT(topic, obs);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+}
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/documentation.md b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/documentation.md
similarity index 76%
rename from streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/documentation.md
rename to streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/documentation.md
index d5d63d0..1d13dda 100644
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/documentation.md
+++ b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/documentation.md
@@ -16,7 +16,7 @@
~
-->
-## Size Measure
+## MQTT logger
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,18 +26,16 @@
## Description
-Measures the latency of an incoming event and appends this number to the event by serializing it.
-
-***
-
-## Required input
-The latency measure processor requires a timestamp.
+Analyzes the latency of an event and sends it to MQTT logger.
***
## Configuration
--
+### Topic name
+
+The name of the topic the latency should be written to.
## Output
-The latency measure processor appends the latency of the event as a double. The rest of the event stays the same.
\ No newline at end of file
+
+(not applicable for data sinks)
diff --git a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/icon.png b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/icon.png
new file mode 100644
index 0000000..1bbe0d5
Binary files /dev/null and b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/icon.png differ
diff --git a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/strings.en b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/strings.en
new file mode 100644
index 0000000..9ca53f7
--- /dev/null
+++ b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/strings.en
@@ -0,0 +1,9 @@
+org.apache.streampipes.sinks.internal.jvm.logger.title=Latency MQTT Logger
+org.apache.streampipes.sinks.internal.jvm.logger.description=Calculates latency and sends events to MQTT Logger.
+
+
+timestamp_mapping.title=Timestamp Field
+timestamp_mapping.description=The value which contains a timestamp
+
+logger-topic-name.title=Topic name
+logger-topic-name.description=The name of the topic events should be send to.