You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/11 09:36:36 UTC
[incubator-streampipes-extensions] branch edge-extensions updated:
Added CPU Burner Processor
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new c6af8c4 Added CPU Burner Processor
c6af8c4 is described below
commit c6af8c45482a8dda1518af2eec1e8159e1eba23c
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Jun 11 11:36:10 2021 +0200
Added CPU Burner Processor
---
.../extensions/all/jvm/AllExtensionsInit.java | 2 +
.../pe/jvm/AllPipelineElementsInit.java | 2 +
.../processors/filters/jvm/FiltersJvmInit.java | 2 +
.../processor/cpuburner/CPUBurnerController.java | 71 +++++++++++++
.../jvm/processor/cpuburner/util/CPUBurner.java | 115 +++++++++++++++++++++
.../documentation.md | 43 ++++++++
.../icon.png | Bin 0 -> 10290 bytes
.../strings.en | 11 ++
8 files changed, 246 insertions(+)
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index cd1bffd..6f6b3c2 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -51,6 +51,7 @@ import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.extensions.all.jvm.config.AllExtensionsConfig;
import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
@@ -140,6 +141,7 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
.add(new JSEvalController())
//TODO: Remove after testing
.add(new LatencyMeasureController())
+ .add(new CPUBurnerController())
// streampipes-processors-filters-jvm
.add(new NumericalFilterController())
.add(new ThresholdDetectionController())
diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index c17da90..b349f04 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -32,6 +32,7 @@ import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalCon
import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichController;
import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
@@ -124,6 +125,7 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
.add(new JSEvalController())
//TODO: Remove after testing
.add(new LatencyMeasureController())
+ .add(new CPUBurnerController())
// streampipes-processors-filters-jvm
.add(new NumericalFilterController())
.add(new ThresholdDetectionController())
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index a5810c1..00fd323 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichController;
import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
@@ -55,6 +56,7 @@ public class FiltersJvmInit extends StandaloneModelSubmitter {
.add(new NumericalTextFilterController())
.add(new RateLimitController())
//TODO: delete after testing
+ .add(new CPUBurnerController())
.add(new DummyController());
DeclarersSingleton.getInstance().registerDataFormats(
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/CPUBurnerController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/CPUBurnerController.java
new file mode 100644
index 0000000..b929545
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/CPUBurnerController.java
@@ -0,0 +1,71 @@
+package org.apache.streampipes.processors.filters.jvm.processor.cpuburner;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.util.CPUBurner;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesReconfigurableProcessor;
+
+public class CPUBurnerController extends StreamPipesReconfigurableProcessor {
+
+ private static double load;
+ private CPUBurner cpuBurner;
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.cpuburner")
+ .category(DataProcessorType.FILTER)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder.any())
+ .requiredIntegerParameter(Labels.withId("ramp-up-duration"))
+ .requiredIntegerParameter(Labels.withId("ramp-up-delay"))
+ .requiredReconfigurableFloatParameter(Labels.withId("load"))
+ .outputStrategy(OutputStrategies.keep())
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ load = parameters.extractor().singleValueParameter("load", Double.class);
+ long rampUpDuration = parameters.extractor().singleValueParameter("ramp-up-duration", Integer.class);
+ long rampUpDelay = parameters.extractor().singleValueParameter("ramp-up-delay", Integer.class);
+ cpuBurner = new CPUBurner(Runtime.getRuntime().availableProcessors(), load, rampUpDuration, rampUpDelay);
+ if(load>0) cpuBurner.startBurner();
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+ // do nothing with event
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ cpuBurner.stopBurners();
+ }
+
+ @Override
+ public void onReconfigurationEvent(Event event) throws SpRuntimeException {
+ load = event.getFieldByRuntimeName("load").getAsPrimitive().getAsDouble();
+ if (cpuBurner.isRunning()){
+ if (load == 0)
+ cpuBurner.stopBurners();
+ else
+ cpuBurner.updateLoad(load);
+ }else {
+ cpuBurner.startBurner(load);
+ }
+ }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/util/CPUBurner.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/util/CPUBurner.java
new file mode 100644
index 0000000..aff7909
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/util/CPUBurner.java
@@ -0,0 +1,115 @@
+package org.apache.streampipes.processors.filters.jvm.processor.cpuburner.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class CPUBurner {
+ private double load;
+ private boolean isRunning;
+ private final long rampUpDuration;
+ private final long rampUpDelay;
+ private final List<BusyThread> threads;
+
+ public CPUBurner(int numCores, double load, long rampUpDuration, long rampUpDelay) {
+ this.load = load;
+ this.rampUpDuration = rampUpDuration;
+ this.rampUpDelay = rampUpDelay;
+ this.isRunning = false;
+ threads = new ArrayList<>();
+ for (int i = 1; i<=numCores; i++){
+ threads.add(new BusyThread("CPUBurner " + i, 0));
+ }
+ }
+
+ public boolean isRunning(){
+ return this.isRunning;
+ }
+
+ public void startBurner(double desiredLoad){
+ this.load = desiredLoad;
+ startBurner();
+ }
+
+ public void startBurner(){
+ CompletableFuture.runAsync(()->{
+ this.threads.forEach(Thread::start);
+ double defaultLoad = this.load;
+ this.load = 0;
+ this.isRunning = true;
+ updateLoad(defaultLoad);
+ });
+ }
+
+ public void updateLoad(double desiredLoad){
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + this.rampUpDuration;
+ while (System.currentTimeMillis() < endTime){
+ double interpolatedLoad = ((System.currentTimeMillis() - startTime)/(double)this.rampUpDuration)*(desiredLoad-this.load) + this.load;
+ this.threads.forEach(t -> t.setLoad(interpolatedLoad));
+ try {
+ Thread.sleep(this.rampUpDelay);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ this.threads.forEach(t -> t.setLoad(desiredLoad));
+ this.load = desiredLoad;
+ }
+
+ public void stopBurners(){
+ this.load = 0;
+ this.isRunning = false;
+ this.threads.forEach(BusyThread::stopThread);
+ }
+
+
+
+ private static class BusyThread extends Thread {
+ //Modified version of: https://gist.github.com/SriramKeerthi/0f1513a62b3b09fecaeb
+ private volatile double load;
+ private volatile boolean running;
+
+ /**
+ * Constructor which creates the thread
+ *
+ * @param name Name of this thread
+ * @param load Load % that this thread should generate
+ */
+ public BusyThread(String name, double load) {
+ super(name);
+ this.load = load;
+ }
+
+ public void stopThread(){
+ //System.out.println("Stopped thread " + this.getName());
+ this.running = false;
+ }
+
+ public void setLoad(double load){
+ //System.out.println("Updated thread " + this.getName() +" with load " + load);
+ this.load = load;
+ }
+
+ /**
+ * Generates the load when run
+ */
+ @Override
+ public void run() {
+ //System.out.println("Started thread " + this.getName());
+ this.running = true;
+ try {
+ // Loop until stopped
+ while (running) {
+ // Every 100ms, sleep for the percentage of unladen time
+ if (System.currentTimeMillis() % 100 == 0) {
+ Thread.sleep((long) Math.floor((1 - load) * 100));
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/documentation.md b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/documentation.md
new file mode 100644
index 0000000..586823d
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/documentation.md
@@ -0,0 +1,43 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## CPU Burner
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Burns a specified amount of CPU.
+
+***
+
+## Required input
+The CPU Burner processor does not have any specific input requirements.
+
+***
+
+## Configuration
+
+(no further configuration required)
+
+## Output
+The compose processor has a configurable output that can be selected by the user at pipeline modeling time.
\ No newline at end of file
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/icon.png b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/icon.png
new file mode 100644
index 0000000..458f434
Binary files /dev/null and b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/icon.png differ
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/strings.en b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/strings.en
new file mode 100644
index 0000000..4a833c8
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/strings.en
@@ -0,0 +1,11 @@
+org.apache.streampipes.processors.filters.jvm.cpuburner.title=CPU Burner
+org.apache.streampipes.processors.filters.jvm.cpuburner.description=Simulates load on CPU
+
+load.title=load
+load.description=relative amount of CPU that should be burned
+
+ramp-up-duration.title=ramp up duration (ms)
+ramp-up-duration.description=Time over which CPU load should be ramped up
+
+ramp-up-delay.title=ramp up delay (ms)
+ramp-up-delay.description=time between ramp up steps
\ No newline at end of file