You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/25 15:29:12 UTC

[incubator-streampipes] branch edge-extensions updated: improved tests and logging

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 9127a91  improved tests and logging
     new f634610  Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
9127a91 is described below

commit 9127a91b1ed6d551cd86fb80c5270488e3fbbdb1
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Jun 25 15:17:58 2021 +0200

    improved tests and logging
---
 .../logging/evaluation/EvaluationLogger.java       | 22 +++++++++++-----------
 .../EdgeExtensionsGenericPerformanceTest.java      | 10 +++-------
 .../performance/performancetest/GenericTest.java   | 18 +++++++++++++-----
 .../performance/performancetest/Test.java          |  4 +++-
 4 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 3317c7a..ddf62f6 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -21,14 +21,8 @@ import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.QoS;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 
 public class EvaluationLogger {
 
@@ -43,7 +37,10 @@ public class EvaluationLogger {
 
     private EvaluationLogger(){
         String logging_host = System.getenv("SP_LOGGING_MQTT_HOST");
-        int logging_port = Integer.parseInt(System.getenv("SP_LOGGING_MQTT_PORT"));
+        String port = System.getenv("SP_LOGGING_MQTT_PORT");
+        if (port == null)
+            port = "1883";
+        int logging_port = Integer.parseInt(port);
 
         mqtt = new MQTT();
         try {
@@ -63,10 +60,13 @@ public class EvaluationLogger {
         String message = "";
         for(Object element:elements)
             message += element + ",";
-        try {
-            connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
-        } catch (Exception e) {
-            e.printStackTrace();
+        if (message.length() > 0) {
+            message = message.substring(0, message.length()-1);
+            try {
+                connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
         }
     }
 
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
index 9c38672..d76d7b6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
@@ -26,16 +26,12 @@ public class EdgeExtensionsGenericPerformanceTest {
     public static void main(String ... args){
 
         int numberOfRuns = Integer.parseInt(System.getenv("TEST_RUNS"));
-        String testType = System.getenv("TEST_TYPE");
-
-        EvaluationLogger logger = EvaluationLogger.getInstance();
-
         Test test = TestFactory.getFromEnv();
 
         for(int i = 1; i <= numberOfRuns; i++){
-            Object[] line = {System.currentTimeMillis(), String.format("Run %d started", i), i};
-            logger.logMQTT(testType, line);
-            test.execute();
+            if (i==numberOfRuns)
+                test.setStopPipeline(true);
+            test.execute(i);
         }
     }
 
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 5b11443..ee9c78c 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -30,7 +30,7 @@ import java.util.Optional;
 
 public class GenericTest implements Test{
 
-    private final boolean stopPipeline;
+    private boolean stopPipeline;
     private final boolean shouldBeMigrated;
     private final boolean shouldBeReconfigured;
     private final long timeToSleepBeforeManipulation;
@@ -38,7 +38,7 @@ public class GenericTest implements Test{
     private final StreamPipesClient client;
     private final Pipeline pipeline;
     private final EvaluationLogger evalLogger = EvaluationLogger.getInstance();
-    private float reconfigurableValue = 0;
+    private float reconfigurableValue = 1;
 
     public GenericTest(String pipelineName, boolean stopPipeline, boolean shouldBeMigrated, boolean shouldBeReconfigured,
                        long timeToSleepBeforeManipulation, long timeToSleepAfterManipulation){
@@ -60,7 +60,12 @@ public class GenericTest implements Test{
     }
 
     @Override
-    public void execute() {
+    public void setStopPipeline(boolean stopPipeline){
+        this.stopPipeline = stopPipeline;
+    }
+
+    @Override
+    public void execute(int nrRuns) {
 
         String testType = System.getenv("TEST_TYPE");
 
@@ -70,7 +75,7 @@ public class GenericTest implements Test{
             PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
             long deploymentDuration = System.nanoTime() - beforeStart;
             if(testType.equals("Deployment")){
-                Object[] line = {System.currentTimeMillis() ,"Deployment duration", deploymentDuration};
+                Object[] line = {System.currentTimeMillis() ,"Deployment duration", nrRuns, deploymentDuration, deploymentDuration/1000000000.0};
                 evalLogger.logMQTT(testType, line);
             }
             if (startMessage.isSuccess()) {
@@ -93,7 +98,7 @@ public class GenericTest implements Test{
             PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
             long migrationDuration = System.nanoTime() - beforeMigration;
             if(testType.equals("Migration")){
-                Object[] line = {System.currentTimeMillis(), "Migration duration", migrationDuration};
+                Object[] line = {System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration};
                 evalLogger.logMQTT(testType, line);
             }
             if (migrationMessage.isSuccess()) {
@@ -111,6 +116,9 @@ public class GenericTest implements Test{
                             sp.setValue(Float.toString(this.reconfigurableValue++));
                         }
                     }));
+            Object[] line = {System.currentTimeMillis(), "Reconfiguration triggered", nrRuns, (this.reconfigurableValue-1)};
+            evalLogger.logMQTT(testType, line);
+            System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
             PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
             if (message.isSuccess()) {
                 System.out.println(message.getTitle());
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java
index a2c2309..0a5c4a4 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java
@@ -19,6 +19,8 @@ package org.apache.streampipes.performance.performancetest;
 
 public interface Test {
 
-    void execute();
+    void execute(int nrRuns);
+
+    void setStopPipeline(boolean stopPipeline);
 
 }