You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/22 11:30:41 UTC

[incubator-streampipes] 01/03: introduce new reconfiguration test

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d70a0491fc653d1066014e0e79e4b2b2ff5403c9
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 20 18:40:50 2021 +0200

    introduce new reconfiguration test
---
 .../performance/performancetest/GenericTest.java   | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 892a5c6..c890557 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -70,6 +70,11 @@ public class GenericTest implements Test{
 
         String testType = System.getenv("TEST_TYPE");
         Object[] line = null;
+
+        if (testType.equals("Reconfiguration") && nrRuns == 0){
+            executeOffloading();
+            return;
+        }
         //Start Pipeline
         if (!pipeline.isRunning()) {
             long beforeStart = System.nanoTime();
@@ -169,6 +174,43 @@ public class GenericTest implements Test{
         }
     }
 
+    private void executeOffloading(){
+        if (!pipeline.isRunning()) {
+            PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
+            System.out.println(startMessage.getTitle());
+            if (startMessage.isSuccess()) {
+                pipeline.setRunning(true);
+            }
+        }
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        float[] reconfigurationValues = {0, 0, 0, 0, 0, 1, 2, 3, 4, 5};
+
+        for(int i = 0; i<200; i++){
+
+            float reconfigurationValue = reconfigurationValues[i%10];
+            pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                    .filter(FreeTextStaticProperty.class::isInstance)
+                    .map(FreeTextStaticProperty.class::cast)
+                    .filter(FreeTextStaticProperty::isReconfigurable)
+                    .forEach(sp -> {
+                        if (sp.getInternalName().equals("i-am-reconfigurable")) {
+                            sp.setValue(Float.toString(reconfigurationValue));
+                        }
+                    }));
+
+            PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+            System.out.println(message.getTitle());
+        }
+
+
+    }
+
 
     private Tuple2<String, String> prepareMigration(){
         String nodeFrom = "";