You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 14:51:12 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-501] Add throughput monitoring processor
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b9134d [STREAMPIPES-501] Add throughput monitoring processor
2b9134d is described below
commit 2b9134d344bc92ab4599b7b376c36960adb21c3a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 15:51:02 2022 +0100
[STREAMPIPES-501] Add throughput monitoring processor
---
.../processors/filters/jvm/FiltersJvmInit.java | 2 +
.../throughputmon/ThroughputMonitorProcessor.java | 97 ++++++++++++++++++++++
.../documentation.md | 50 +++++++++++
.../strings.en | 21 +++++
4 files changed, 170 insertions(+)
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index 001446b..25af677 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -35,6 +35,7 @@ import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitPr
import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
+import org.apache.streampipes.processors.filters.jvm.processor.throughputmon.ThroughputMonitorProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
@@ -57,6 +58,7 @@ public class FiltersJvmInit extends StandaloneModelSubmitter {
new TextFilterProcessor(),
new NumericalFilterProcessor(),
new ThresholdDetectionProcessor(),
+ new ThroughputMonitorProcessor(),
new ProjectionProcessor(),
new MergeByEnrichProcessor(),
new MergeByTimeProcessor(),
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessor.java
new file mode 100644
index 0000000..63d436d
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessor.java
@@ -0,0 +1,97 @@
+package org.apache.streampipes.processors.filters.jvm.processor.throughputmon;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+public class ThroughputMonitorProcessor extends StreamPipesDataProcessor {
+
+ private final static String BATCH_WINDOW_KEY = "batch-window-key";
+
+ private final static String TIMESTAMP_FIELD = "timestamp";
+ private final static String START_TIME_FIELD = "starttime";
+ private final static String END_TIME_FIELD = "endtime";
+ private final static String DURATION_FIELD = "duration";
+ private final static String EVENT_COUNT_FIELD = "eventcount";
+ private final static String THROUGHPUT_FIELD = "throughput";
+
+ private int batchSize;
+
+ private StopWatch stopWatch;
+ private long eventCount = 0;
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.throughputmon")
+ .category(DataProcessorType.FILTER)
+ .withAssets(Assets.DOCUMENTATION)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty()).build())
+ .outputStrategy(OutputStrategies.fixed(
+ EpProperties.timestampProperty(TIMESTAMP_FIELD),
+ EpProperties.longEp(Labels.withId(START_TIME_FIELD), START_TIME_FIELD, SO.DateTime),
+ EpProperties.longEp(Labels.withId(END_TIME_FIELD), END_TIME_FIELD, SO.DateTime),
+ EpProperties.longEp(Labels.withId(DURATION_FIELD), DURATION_FIELD, SO.Number),
+ EpProperties.longEp(Labels.withId(EVENT_COUNT_FIELD), EVENT_COUNT_FIELD, SO.Number),
+ EpProperties.doubleEp(Labels.withId(THROUGHPUT_FIELD), THROUGHPUT_FIELD, SO.Number)))
+ .requiredIntegerParameter(Labels.withId(BATCH_WINDOW_KEY))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ this.batchSize = parameters.extractor().singleValueParameter(BATCH_WINDOW_KEY, Integer.class);
+ this.stopWatch = new StopWatch();
+ this.restartTimer();
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+ this.eventCount++;
+ if (this.eventCount == this.batchSize) {
+ this.stopWatch.stop();
+ Event outEvent = buildEvent();
+ collector.collect(outEvent);
+ this.eventCount = 0;
+ this.restartTimer();
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ this.stopWatch.stop();
+ }
+
+ private Event buildEvent() {
+ Event event = new Event();
+ event.addField(TIMESTAMP_FIELD, System.currentTimeMillis());
+ event.addField(START_TIME_FIELD, this.stopWatch.getStartTime());
+ event.addField(END_TIME_FIELD, this.stopWatch.getStopTime());
+ event.addField(DURATION_FIELD, this.stopWatch.getTime());
+ event.addField(EVENT_COUNT_FIELD, this.eventCount);
+ event.addField(THROUGHPUT_FIELD, (this.eventCount / (((double) this.stopWatch.getTime()) / 1000)));
+
+ return event;
+ }
+
+ private void restartTimer() {
+ this.stopWatch.reset();
+ this.eventCount = 0;
+ this.stopWatch.start();
+ }
+}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/documentation.md b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/documentation.md
new file mode 100644
index 0000000..bf71467
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/documentation.md
@@ -0,0 +1,50 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Throughput monitoring
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
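+The Throughput Monitoring processor counts incoming events in batches of a configurable size and, for each completed batch, emits an event with throughput statistics such as the batch duration and the number of events per second.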
+
+***
+
+## Required Input
+The processor works with any input event.
+
+***
+
+## Configuration
+
+### Batch Window Size
+Specifies the number of events that should be used for calculating throughput statistics.
+
+
+## Output
+The processor outputs a new event containing:
+* The current timestamp (timestamp)
+* The start time of the batch window in milliseconds since epoch (starttime)
+* The end time of the batch window in milliseconds since epoch (endtime)
+* The duration of the batch window in milliseconds (duration)
+* The number of events collected in the window, which equals the configured batch size (eventcount)
+* The throughput in events per second (throughput)
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/strings.en b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/strings.en
new file mode 100644
index 0000000..785975e
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/strings.en
@@ -0,0 +1,21 @@
+org.apache.streampipes.processors.filters.jvm.throughputmon.title=Throughput Monitor
+org.apache.streampipes.processors.filters.jvm.throughputmon.description=Monitors and forwards event throughput statistics
+
+batch-window-key.title=Batch Window Size
+batch-window-key.description=The number of events used for calculating the statistics
+
+# The keys below must match the ids passed to Labels.withId(...) in ThroughputMonitorProcessor,
+# which are all lowercase.
+starttime.title=Start Time
+starttime.description=Start of the batch window in milliseconds since epoch
+
+endtime.title=End Time
+endtime.description=End of the batch window in milliseconds since epoch
+
+duration.title=Duration
+duration.description=Duration of the batch window in milliseconds
+
+eventcount.title=Event Count
+eventcount.description=Number of events collected in the batch window
+
+throughput.title=Throughput
+throughput.description=Throughput of the batch window in events per second
+