You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/22 11:30:41 UTC
[incubator-streampipes] 01/03: introduce new reconfiguration test
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit d70a0491fc653d1066014e0e79e4b2b2ff5403c9
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 20 18:40:50 2021 +0200
introduce new reconfiguration test
---
.../performance/performancetest/GenericTest.java | 42 ++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 892a5c6..c890557 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -70,6 +70,11 @@ public class GenericTest implements Test{
String testType = System.getenv("TEST_TYPE");
Object[] line = null;
+
+ if (testType.equals("Reconfiguration") && nrRuns == 0){
+ executeOffloading();
+ return;
+ }
//Start Pipeline
if (!pipeline.isRunning()) {
long beforeStart = System.nanoTime();
@@ -169,6 +174,43 @@ public class GenericTest implements Test{
}
}
+ private void executeOffloading(){
+ if (!pipeline.isRunning()) {
+ PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
+ System.out.println(startMessage.getTitle());
+ if (startMessage.isSuccess()) {
+ pipeline.setRunning(true);
+ }
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ float[] reconfigurationValues = {0, 0, 0, 0, 0, 1, 2, 3, 4, 5};
+
+ for(int i = 0; i<200; i++){
+
+ float reconfigurationValue = reconfigurationValues[i%10];
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+ if (sp.getInternalName().equals("i-am-reconfigurable")) {
+ sp.setValue(Float.toString(reconfigurationValue));
+ }
+ }));
+
+ PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+ System.out.println(message.getTitle());
+ }
+
+
+ }
+
private Tuple2<String, String> prepareMigration(){
String nodeFrom = "";