You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/11 09:36:36 UTC

[incubator-streampipes-extensions] branch edge-extensions updated: Added CPU Burner Processor

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new c6af8c4  Added CPU Burner Processor
c6af8c4 is described below

commit c6af8c45482a8dda1518af2eec1e8159e1eba23c
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Jun 11 11:36:10 2021 +0200

    Added CPU Burner Processor
---
 .../extensions/all/jvm/AllExtensionsInit.java      |   2 +
 .../pe/jvm/AllPipelineElementsInit.java            |   2 +
 .../processors/filters/jvm/FiltersJvmInit.java     |   2 +
 .../processor/cpuburner/CPUBurnerController.java   |  71 +++++++++++++
 .../jvm/processor/cpuburner/util/CPUBurner.java    | 115 +++++++++++++++++++++
 .../documentation.md                               |  43 ++++++++
 .../icon.png                                       | Bin 0 -> 10290 bytes
 .../strings.en                                     |  11 ++
 8 files changed, 246 insertions(+)

diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index cd1bffd..6f6b3c2 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -51,6 +51,7 @@ import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.extensions.all.jvm.config.AllExtensionsConfig;
 import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
 import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
 import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
 import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
@@ -140,6 +141,7 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
                 .add(new JSEvalController())
                 //TODO: Remove after testing
                 .add(new LatencyMeasureController())
+                .add(new CPUBurnerController())
                 // streampipes-processors-filters-jvm
                 .add(new NumericalFilterController())
                 .add(new ThresholdDetectionController())
diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index c17da90..b349f04 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -32,6 +32,7 @@ import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalCon
 import org.apache.streampipes.processors.enricher.jvm.processor.latencymeasure.LatencyMeasureController;
 import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
 import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
 import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
 import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichController;
 import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
@@ -124,6 +125,7 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new JSEvalController())
             //TODO: Remove after testing
             .add(new LatencyMeasureController())
+            .add(new CPUBurnerController())
             // streampipes-processors-filters-jvm
             .add(new NumericalFilterController())
             .add(new ThresholdDetectionController())
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index a5810c1..00fd323 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
 import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.CPUBurnerController;
 import org.apache.streampipes.processors.filters.jvm.processor.dummy.DummyController;
 import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichController;
 import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
@@ -55,6 +56,7 @@ public class FiltersJvmInit extends StandaloneModelSubmitter {
             .add(new NumericalTextFilterController())
             .add(new RateLimitController())
             //TODO: delete after testing
+            .add(new CPUBurnerController())
             .add(new DummyController());
 
     DeclarersSingleton.getInstance().registerDataFormats(
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/CPUBurnerController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/CPUBurnerController.java
new file mode 100644
index 0000000..b929545
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/CPUBurnerController.java
@@ -0,0 +1,71 @@
+package org.apache.streampipes.processors.filters.jvm.processor.cpuburner;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.filters.jvm.processor.cpuburner.util.CPUBurner;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesReconfigurableProcessor;
+
+public class CPUBurnerController extends StreamPipesReconfigurableProcessor {
+    // Processor that forwards every event unchanged while a CPUBurner generates a configurable CPU load.
+    private static double load; // NOTE(review): static, so the value is shared by all controller instances - confirm intended
+    private CPUBurner cpuBurner; // assigned in onInvocation; null until then
+    /** Declares the element: any input stream, two integer ramp-up parameters, a reconfigurable "load", output kept as-is. */
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.cpuburner")
+                .category(DataProcessorType.FILTER)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder.any())
+                .requiredIntegerParameter(Labels.withId("ramp-up-duration"))
+                .requiredIntegerParameter(Labels.withId("ramp-up-delay"))
+                .requiredReconfigurableFloatParameter(Labels.withId("load"))
+                .outputStrategy(OutputStrategies.keep())
+                .build();
+    }
+    /** Reads the three configured values, creates the burner and starts it when the configured load is positive. */
+    @Override
+    public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+                             EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+        load = parameters.extractor().singleValueParameter("load", Double.class);
+        long rampUpDuration = parameters.extractor().singleValueParameter("ramp-up-duration", Integer.class);
+        long rampUpDelay = parameters.extractor().singleValueParameter("ramp-up-delay", Integer.class);
+        cpuBurner = new CPUBurner(Runtime.getRuntime().availableProcessors(), load, rampUpDuration, rampUpDelay); // one burner thread per available processor
+        if(load>0) cpuBurner.startBurner(); // with load <= 0 the burner threads are created but not started
+    }
+    /** Passes the incoming event to the collector without modifying it. */
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+        // the event itself is not inspected; CPU load is generated independently of the event flow
+        collector.collect(event);
+    }
+    /** Stops the burner threads when the processor is detached. */
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        cpuBurner.stopBurners(); // NOTE(review): NullPointerException if onInvocation never assigned cpuBurner
+    }
+    /** Applies the "load" value of a reconfiguration event: stops, re-targets or starts the burner. */
+    @Override
+    public void onReconfigurationEvent(Event event) throws SpRuntimeException {
+        load = event.getFieldByRuntimeName("load").getAsPrimitive().getAsDouble();
+        if (cpuBurner.isRunning()){
+            if (load == 0)
+                cpuBurner.stopBurners();
+            else
+                cpuBurner.updateLoad(load); // runs on the calling thread; CPUBurner.updateLoad sleeps in a loop until rampUpDuration has elapsed
+        }else {
+            cpuBurner.startBurner(load); // NOTE(review): also reached with load == 0, and after stopBurners() it calls Thread.start on threads that were already started
+        }
+    }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/util/CPUBurner.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/util/CPUBurner.java
new file mode 100644
index 0000000..aff7909
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/cpuburner/util/CPUBurner.java
@@ -0,0 +1,115 @@
+package org.apache.streampipes.processors.filters.jvm.processor.cpuburner.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class CPUBurner {
+    private double load; // load most recently stored or applied; used as the starting point of the next ramp
+    private boolean isRunning; // NOTE(review): not volatile, although it is written inside the runAsync task and read via isRunning()
+    private final long rampUpDuration; // length of a ramp; compared against System.currentTimeMillis(), i.e. milliseconds
+    private final long rampUpDelay; // pause between two ramp steps; passed to Thread.sleep, i.e. milliseconds
+    private final List<BusyThread> threads; // one BusyThread per requested core, created in the constructor
+    /** Creates numCores burner threads with an initial load of 0; no thread is started until startBurner is called. */
+    public CPUBurner(int numCores, double load, long rampUpDuration, long rampUpDelay) {
+        this.load = load;
+        this.rampUpDuration = rampUpDuration;
+        this.rampUpDelay = rampUpDelay;
+        this.isRunning = false;
+        threads = new ArrayList<>();
+        for (int i  = 1; i<=numCores; i++){
+            threads.add(new BusyThread("CPUBurner " + i, 0));
+        }
+    }
+    /** Returns the running flag, which the startBurner task sets asynchronously and stopBurners clears. */
+    public boolean isRunning(){
+        return this.isRunning;
+    }
+    /** Stores desiredLoad as the target load and then behaves like startBurner(). */
+    public void startBurner(double desiredLoad){
+        this.load = desiredLoad;
+        startBurner();
+    }
+    /** Starts all burner threads in an asynchronous task and ramps them from 0 up to the stored load. */
+    public void startBurner(){
+        CompletableFuture.runAsync(()->{ // no executor is passed, so CompletableFuture's default executor runs this; the returned future is discarded
+            this.threads.forEach(Thread::start); // NOTE(review): Thread.start throws IllegalThreadStateException for a thread that was started before
+            double defaultLoad = this.load; // remember the target before resetting the field
+            this.load = 0; // the ramp in updateLoad starts from the value of this.load, so begin at zero
+            this.isRunning = true;
+            updateLoad(defaultLoad);
+        });
+    }
+    /** Ramps all threads linearly from the current load to desiredLoad over rampUpDuration ms; the caller is blocked meanwhile. */
+    public void updateLoad(double desiredLoad){
+        long startTime = System.currentTimeMillis();
+        long endTime = startTime + this.rampUpDuration;
+        while (System.currentTimeMillis() < endTime){
+            double interpolatedLoad = ((System.currentTimeMillis() - startTime)/(double)this.rampUpDuration)*(desiredLoad-this.load) + this.load; // elapsed share of the ramp times the load delta, plus the starting load
+            this.threads.forEach(t -> t.setLoad(interpolatedLoad));
+            try {
+                Thread.sleep(this.rampUpDelay);
+            } catch (InterruptedException e) {
+                e.printStackTrace(); // NOTE(review): the interrupt status is not restored and the ramp loop keeps running
+            }
+        }
+        this.threads.forEach(t -> t.setLoad(desiredLoad)); // finish exactly on the target value
+        this.load = desiredLoad;
+    }
+    /** Resets the load, clears the running flag and asks every burner thread to leave its loop; the Thread objects are kept. */
+    public void stopBurners(){
+        this.load = 0;
+        this.isRunning = false;
+        this.threads.forEach(BusyThread::stopThread);
+    }
+
+
+    /** Thread that alternates between spinning and sleeping according to its load value. */
+    private static class BusyThread extends Thread {
+        //Modified version of: https://gist.github.com/SriramKeerthi/0f1513a62b3b09fecaeb
+        private volatile double load; // written through setLoad, read by run()
+        private volatile boolean running; // loop condition of run(); cleared by stopThread
+
+        /**
+         * Constructor which creates the thread
+         *
+         * @param name     Name of this thread
+         * @param load     Initial load as a fraction: run() sleeps (1 - load) * 100 ms per cycle
+         */
+        public BusyThread(String name, double load) {
+            super(name);
+            this.load = load;
+        }
+
+        public void stopThread(){
+            // run() polls this flag; NOTE(review): run() sets it back to true if it only begins after this call
+            this.running = false;
+        }
+
+        public void setLoad(double load){
+            // the new value is used the next time run() computes its sleep duration
+            this.load = load;
+        }
+
+        /**
+         * Generates the load when run: spins until stopped, sleeping for the idle share at each 100 ms mark
+         */
+        @Override
+        public void run() {
+            // mark the thread as running before entering the loop
+            this.running = true;
+            try {
+                // Loop until stopped
+                while (running) {
+                    // Spin until the current millisecond count is a multiple of 100, then sleep for the idle share of 100 ms
+                    if (System.currentTimeMillis() % 100 == 0) {
+                        Thread.sleep((long) Math.floor((1 - load) * 100)); // NOTE(review): load > 1 makes the argument negative, which Thread.sleep rejects with IllegalArgumentException
+                    }
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace(); // the catch is outside the loop, so an interrupt ends run(); interrupt status is not restored
+            }
+        }
+    }
+
+}
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/documentation.md b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/documentation.md
new file mode 100644
index 0000000..586823d
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/documentation.md
@@ -0,0 +1,43 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## CPU Burner
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Burns a specified amount of CPU.
+
+***
+
+## Required input
+The CPU Burner processor does not have any specific input requirements.
+
+***
+
+## Configuration
+
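+The processor takes three parameters: load (relative amount of CPU to burn), ramp up duration (ms) and ramp up delay (ms between ramp up steps).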
+
+## Output
+The CPU Burner processor forwards each input event unchanged.
\ No newline at end of file
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/icon.png b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/icon.png
new file mode 100644
index 0000000..458f434
Binary files /dev/null and b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/icon.png differ
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/strings.en b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/strings.en
new file mode 100644
index 0000000..4a833c8
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.cpuburner/strings.en
@@ -0,0 +1,11 @@
+org.apache.streampipes.processors.filters.jvm.cpuburner.title=CPU Burner
+org.apache.streampipes.processors.filters.jvm.cpuburner.description=Simulates load on CPU
+
+load.title=load
+load.description=relative amount of CPU that should be burned
+
+ramp-up-duration.title=ramp up duration (ms)
+ramp-up-duration.description=Time over which CPU load should be ramped up
+
+ramp-up-delay.title=ramp up delay (ms)
+ramp-up-delay.description=time between ramp up steps
\ No newline at end of file