You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/25 15:29:07 UTC

[incubator-streampipes-extensions] branch edge-extensions updated: added sink for latency evaluation; removed latency processor

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 75cc3b8  added sink for latency evaluation; removed latency processor
75cc3b8 is described below

commit 75cc3b8f8a0224da0a39cbe1fc5602c5ead7b60f
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Jun 25 15:20:32 2021 +0200

    added sink for latency evaluation; removed latency processor
---
 .../extensions/all/jvm/AllExtensionsInit.java      |   5 +-
 .../pe/jvm/AllPipelineElementsInit.java            |   5 +-
 .../processor/latencymeasure/LatencyMeasure.java   |  47 ----------------
 .../latencymeasure/LatencyMeasureController.java   |  62 ---------------------
 .../latencymeasure/LatencyMeasureParameters.java   |  35 ------------
 .../icon.png                                       | Bin 5118 -> 0 bytes
 .../strings.en                                     |   2 -
 .../jvm/processor/dummy/DummyController.java       |   4 +-
 .../jvm/logger/LatencyMqttSinkController.java      |  61 ++++++++++++++++++++
 .../documentation.md                               |  16 +++---
 .../icon.png                                       | Bin 0 -> 12981 bytes
 .../strings.en                                     |   9 +++
 12 files changed, 86 insertions(+), 160 deletions(-)

diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index 6f6b3c2..b1e8bae 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -50,7 +50,6 @@ import org.apache.streampipes.connect.protocol.stream.pulsar.PulsarProtocol;
 import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.extensions.all.jvm.config.AllExtensionsConfig;
-import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
 import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
 import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
 import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
@@ -125,6 +124,7 @@ import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlControlle
 import org.apache.streampipes.sinks.databases.jvm.redis.RedisController;
 import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
 import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
+import org.apache.streampipes.sinks.internal.jvm.logger.LatencyMqttSinkController;
 import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
 import org.apache.streampipes.sinks.notifications.jvm.email.EmailController;
 import org.apache.streampipes.sinks.notifications.jvm.onesignal.OneSignalController;
@@ -140,7 +140,6 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
                 .add(new SizeMeasureController())
                 .add(new JSEvalController())
                 //TODO: Remove after testing
-                .add(new LatencyMeasureController())
                 .add(new CPUBurnerController())
                 // streampipes-processors-filters-jvm
                 .add(new NumericalFilterController())
@@ -223,6 +222,8 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
                 .add(new NotificationController())
                 .add(new DataLakeController())
                 .add(new DashboardController())
+                //TODO: remove after testing
+                .add(new LatencyMqttSinkController())
                 // streampipes-sinks-notifications-jvm
                 .add(new EmailController())
                 .add(new TelegramController())
diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index b349f04..a0f50b3 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -29,7 +29,6 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.pe.jvm.config.AllPipelineElementsConfig;
 import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
 import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
-import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
 import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
 import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
 import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
@@ -108,6 +107,7 @@ import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlControlle
 import org.apache.streampipes.sinks.databases.jvm.redis.RedisController;
 import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
 import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
+import org.apache.streampipes.sinks.internal.jvm.logger.LatencyMqttSinkController;
 import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
 import org.apache.streampipes.sinks.notifications.jvm.email.EmailController;
 import org.apache.streampipes.sinks.notifications.jvm.onesignal.OneSignalController;
@@ -124,7 +124,6 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new SizeMeasureController())
             .add(new JSEvalController())
             //TODO: Remove after testing
-            .add(new LatencyMeasureController())
             .add(new CPUBurnerController())
             // streampipes-processors-filters-jvm
             .add(new NumericalFilterController())
@@ -211,6 +210,8 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new NotificationController())
             .add(new DataLakeController())
             .add(new DashboardController())
+            // TODO: delete this one after testing
+            .add(new LatencyMqttSinkController())
             // streampipes-sinks-notifications-jvm
             .add(new EmailController())
             .add(new TelegramController())
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasure.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasure.java
deleted file mode 100644
index 0a37add..0000000
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasure.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class LatencyMeasure implements EventProcessor<LatencyMeasureParameters> {
-
-    private String timestampField;
-
-    @Override
-    public void onInvocation(LatencyMeasureParameters parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-        this.timestampField = parameters.getTimestampField();
-    }
-
-    @Override
-    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
-        long timestamp = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
-        long latency = System.currentTimeMillis() - timestamp;
-        event.addField(LatencyMeasureController.EVENT_LATENCY, latency);
-        collector.collect(event);
-    }
-
-    @Override
-    public void onDetach() throws SpRuntimeException {
-
-    }
-}
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureController.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureController.java
deleted file mode 100644
index 44f2349..0000000
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureController.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class LatencyMeasureController extends StandaloneEventProcessingDeclarer<LatencyMeasureParameters> {
-
-    final static String EVENT_LATENCY = "eventLatency";
-    private static final String TIMESTAMP = "timestamp";
-
-    @Override
-    public DataProcessorDescription declareModel() {
-        return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.latencymeasure")
-                .category(DataProcessorType.ENRICH)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
-                        EpRequirements.timestampReq(),
-                        Labels.withId(TIMESTAMP),
-                        PropertyScope.NONE).build())
-                .outputStrategy(OutputStrategies.append(EpProperties.doubleEp(
-                        Labels.withId(EVENT_LATENCY),
-                        EVENT_LATENCY,
-                        "https://schema.org/processingTime")))
-                .build();
-    }
-
-    @Override
-    public ConfiguredEventProcessor<LatencyMeasureParameters>
-    onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-        String timestampField = extractor.mappingPropertyValue(TIMESTAMP);
-        LatencyMeasureParameters staticParam = new LatencyMeasureParameters(graph, timestampField);
-
-        return new ConfiguredEventProcessor<>(staticParam, LatencyMeasure::new);
-    }
-}
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureParameters.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureParameters.java
deleted file mode 100644
index b92c3b2..0000000
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/latencymeasure/LatencyMeasureParameters.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class LatencyMeasureParameters extends EventProcessorBindingParams {
-
-    private String timestampField;
-
-    public LatencyMeasureParameters(DataProcessorInvocation graph, String timestampField) {
-        super(graph);
-        this.timestampField = timestampField;
-    }
-
-    public String getTimestampField(){
-        return this.timestampField;
-    }
-}
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/icon.png b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/icon.png
deleted file mode 100644
index f39405d..0000000
Binary files a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/icon.png and /dev/null differ
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/strings.en b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/strings.en
deleted file mode 100644
index 7b2383b..0000000
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/strings.en
+++ /dev/null
@@ -1,2 +0,0 @@
-org.apache.streampipes.processors.enricher.jvm.latencymeasure.title=Latency Measure
-org.apache.streampipes.processors.enricher.jvm.latencymeasure.description=Measures the Latency of an Event.
\ No newline at end of file
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java
index cab379d..250745d 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/dummy/DummyController.java
@@ -74,6 +74,8 @@ public class DummyController extends StreamPipesReconfigurableProcessor {
     // transform event
     event.addField("appended-reconfigurable", reconfigurableValue);
     collector.collect(event);
+    Object[] obs = {System.currentTimeMillis(), "raw event", "", reconfigurableValue};
+    EvaluationLogger.getInstance().logMQTT("Reconfiguration", obs);
   }
 
   @Override
@@ -84,7 +86,7 @@ public class DummyController extends StreamPipesReconfigurableProcessor {
   @Override
   public void onReconfigurationEvent(Event event) throws SpRuntimeException {
     Object[] obs = {System.currentTimeMillis(), "processor reconfigured", nrRuns++, event.getFieldByRuntimeName("i-am-reconfigurable").getAsPrimitive().getAsDouble()};
-    EvaluationLogger.getInstance().logMQTT("Reconfiguration", obs);
     reconfigurableValue = event.getFieldByRuntimeName("i-am-reconfigurable").getAsPrimitive().getAsDouble();
+    EvaluationLogger.getInstance().logMQTT("Reconfiguration", obs);
   }
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/logger/LatencyMqttSinkController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/logger/LatencyMqttSinkController.java
new file mode 100644
index 0000000..b59d19e
--- /dev/null
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/logger/LatencyMqttSinkController.java
@@ -0,0 +1,61 @@
+package org.apache.streampipes.sinks.internal.jvm.logger;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+public class LatencyMqttSinkController extends StreamPipesDataSink {
+
+    private EvaluationLogger logger;
+    private String topic;
+    private String timestampField;
+
+    private static final String LOGGER_TOPIC = "logger-topic-name";
+    private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
+
+    @Override
+    public DataSinkDescription declareModel() {
+        return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.logger")
+                .withLocales(Locales.EN)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .category(DataSinkType.FORWARD)
+                .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+                        EpRequirements.timestampReq(),
+                        Labels.withId(TIMESTAMP_MAPPING_KEY),
+                        PropertyScope.NONE).build())
+                .requiredTextParameter(Labels.withId(LOGGER_TOPIC))
+                .build();
+    }
+
+    @Override
+    public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+        logger = EvaluationLogger.getInstance();
+        topic = parameters.extractor().singleValueParameter(LOGGER_TOPIC, String.class);
+        timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+    }
+
+    @Override
+    public void onEvent(Event event) throws SpRuntimeException {
+        long timestamp = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
+        long latency = System.currentTimeMillis() - timestamp;
+        Object[] obs = {System.currentTimeMillis(), "event received", "", latency};
+        logger.logMQTT(topic, obs);
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+
+    }
+}
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/documentation.md b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/documentation.md
similarity index 76%
rename from streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/documentation.md
rename to streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/documentation.md
index d5d63d0..1d13dda 100644
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.latencymeasure/documentation.md
+++ b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Size Measure
+## MQTT logger
 
 <p align="center"> 
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,18 +26,16 @@
 
 ## Description
 
-Measures the latency of an incoming event and appends this number to the event by serializing it.
-
-***
-
-## Required input
-The latency measure processor requires a timestamp.
+Calculates the latency of an event (current time minus the event's timestamp) and sends it to the MQTT logger.
 
 ***
 
 ## Configuration
 
--
+### Topic name
+
+The name of the topic the latency should be written to.
 
 ## Output
-The latency measure processor appends the latency of the event as a double. The rest of the event stays the same.
\ No newline at end of file
+
+(not applicable for data sinks)
diff --git a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/icon.png b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/icon.png
new file mode 100644
index 0000000..1bbe0d5
Binary files /dev/null and b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/icon.png differ
diff --git a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/strings.en b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/strings.en
new file mode 100644
index 0000000..9ca53f7
--- /dev/null
+++ b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.logger/strings.en
@@ -0,0 +1,9 @@
+org.apache.streampipes.sinks.internal.jvm.logger.title=Latency MQTT Logger
+org.apache.streampipes.sinks.internal.jvm.logger.description=Calculates the latency of each event and sends it to the MQTT logger.
+
+
+timestamp_mapping.title=Timestamp Field
+timestamp_mapping.description=The field that contains the event timestamp
+
+logger-topic-name.title=Topic name
+logger-topic-name.description=The name of the topic events should be sent to.