You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 14:51:12 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-501] Add throughput monitoring processor

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b9134d  [STREAMPIPES-501] Add throughput monitoring processor
2b9134d is described below

commit 2b9134d344bc92ab4599b7b376c36960adb21c3a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 15:51:02 2022 +0100

    [STREAMPIPES-501] Add throughput monitoring processor
---
 .../processors/filters/jvm/FiltersJvmInit.java     |  2 +
 .../throughputmon/ThroughputMonitorProcessor.java  | 97 ++++++++++++++++++++++
 .../documentation.md                               | 50 +++++++++++
 .../strings.en                                     | 21 +++++
 4 files changed, 170 insertions(+)

diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index 001446b..25af677 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -35,6 +35,7 @@ import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitPr
 import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
+import org.apache.streampipes.processors.filters.jvm.processor.throughputmon.ThroughputMonitorProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
@@ -57,6 +58,7 @@ public class FiltersJvmInit extends StandaloneModelSubmitter {
                     new TextFilterProcessor(),
                     new NumericalFilterProcessor(),
                     new ThresholdDetectionProcessor(),
+                    new ThroughputMonitorProcessor(),
                     new ProjectionProcessor(),
                     new MergeByEnrichProcessor(),
                     new MergeByTimeProcessor(),
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessor.java
new file mode 100644
index 0000000..63d436d
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessor.java
@@ -0,0 +1,97 @@
+package org.apache.streampipes.processors.filters.jvm.processor.throughputmon;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+public class ThroughputMonitorProcessor extends StreamPipesDataProcessor {
+
+  private final static String BATCH_WINDOW_KEY = "batch-window-key";
+
+  private final static String TIMESTAMP_FIELD = "timestamp";
+  private final static String START_TIME_FIELD = "starttime";
+  private final static String END_TIME_FIELD = "endtime";
+  private final static String DURATION_FIELD = "duration";
+  private final static String EVENT_COUNT_FIELD = "eventcount";
+  private final static String THROUGHPUT_FIELD = "throughput";
+
+  private int batchSize;
+
+  private StopWatch stopWatch;
+  private long eventCount = 0;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.throughputmon")
+            .category(DataProcessorType.FILTER)
+            .withAssets(Assets.DOCUMENTATION)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty()).build())
+            .outputStrategy(OutputStrategies.fixed(
+                    EpProperties.timestampProperty(TIMESTAMP_FIELD),
+                    EpProperties.longEp(Labels.withId(START_TIME_FIELD), START_TIME_FIELD, SO.DateTime),
+                    EpProperties.longEp(Labels.withId(END_TIME_FIELD), END_TIME_FIELD, SO.DateTime),
+                    EpProperties.longEp(Labels.withId(DURATION_FIELD), DURATION_FIELD, SO.Number),
+                    EpProperties.longEp(Labels.withId(EVENT_COUNT_FIELD), EVENT_COUNT_FIELD, SO.Number),
+                    EpProperties.doubleEp(Labels.withId(THROUGHPUT_FIELD), THROUGHPUT_FIELD, SO.Number)))
+            .requiredIntegerParameter(Labels.withId(BATCH_WINDOW_KEY))
+            .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    this.batchSize = parameters.extractor().singleValueParameter(BATCH_WINDOW_KEY, Integer.class);
+    this.stopWatch = new StopWatch();
+    this.restartTimer();
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    this.eventCount++;
+    if (this.eventCount == this.batchSize) {
+      this.stopWatch.stop();
+      Event outEvent = buildEvent();
+      collector.collect(outEvent);
+      this.eventCount = 0;
+      this.restartTimer();
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.stopWatch.stop();
+  }
+
+  private Event buildEvent() {
+    Event event = new Event();
+    event.addField(TIMESTAMP_FIELD, System.currentTimeMillis());
+    event.addField(START_TIME_FIELD, this.stopWatch.getStartTime());
+    event.addField(END_TIME_FIELD, this.stopWatch.getStopTime());
+    event.addField(DURATION_FIELD, this.stopWatch.getTime());
+    event.addField(EVENT_COUNT_FIELD, this.eventCount);
+    event.addField(THROUGHPUT_FIELD, (this.eventCount / (((double) this.stopWatch.getTime()) / 1000)));
+
+    return event;
+  }
+
+  private void restartTimer() {
+    this.stopWatch.reset();
+    this.eventCount = 0;
+    this.stopWatch.start();
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/documentation.md b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/documentation.md
new file mode 100644
index 0000000..bf71467
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/documentation.md
@@ -0,0 +1,50 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Throughput monitoring
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+The Throughput Monitoring processor computes throughput statistics.
+
+***
+
+## Required Input
+The processor works with any input event.
+
+***
+
+## Configuration
+
+### Batch Window Size
+Specifies the number of events that should be used for calculating throughput statistics.
+
+
+## Output
+The processor outputs a new event containing:
+* The current timestamp (timestamp)
+* The start time of the batch window (starttime)
+* The end time of the batch window (endtime)
+* The duration of the batch window in milliseconds (duration)
+* The number of events collected in the window, which should be equal to the batch size (eventcount)
+* The throughput in events per second (throughput)
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/strings.en b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/strings.en
new file mode 100644
index 0000000..785975e
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.throughputmon/strings.en
@@ -0,0 +1,21 @@
+org.apache.streampipes.processors.filters.jvm.throughputmon.title=Throughput Monitor
+org.apache.streampipes.processors.filters.jvm.throughputmon.description=Monitors and forwards event throughput statistics
+
+batch-window-key.title=Batch Window Size
+batch-window-key.description=The number of events used for calculating the statistics
+
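+starttime.title=Start Time
+starttime.description=The start time of the batch window
+
+endtime.title=End Time
+endtime.description=The end time of the batch window
+
+duration.title=Duration
+duration.description=The duration of the batch window in milliseconds
+
+eventcount.title=Event Count
+eventcount.description=The number of events collected in the batch window
+
+throughput.title=Throughput
+throughput.description=The throughput in events per second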
+