You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/25 15:29:12 UTC
[incubator-streampipes] branch edge-extensions updated: improved tests and logging
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 9127a91 improved tests and logging
new f634610 Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
9127a91 is described below
commit 9127a91b1ed6d551cd86fb80c5270488e3fbbdb1
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Jun 25 15:17:58 2021 +0200
improved tests and logging
---
.../logging/evaluation/EvaluationLogger.java | 22 +++++++++++-----------
.../EdgeExtensionsGenericPerformanceTest.java | 10 +++-------
.../performance/performancetest/GenericTest.java | 18 +++++++++++++-----
.../performance/performancetest/Test.java | 4 +++-
4 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 3317c7a..ddf62f6 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -21,14 +21,8 @@ import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
public class EvaluationLogger {
@@ -43,7 +37,10 @@ public class EvaluationLogger {
private EvaluationLogger(){
String logging_host = System.getenv("SP_LOGGING_MQTT_HOST");
- int logging_port = Integer.parseInt(System.getenv("SP_LOGGING_MQTT_PORT"));
+ String port = System.getenv("SP_LOGGING_MQTT_PORT");
+ if (port == null)
+ port = "1883";
+ int logging_port = Integer.parseInt(port);
mqtt = new MQTT();
try {
@@ -63,10 +60,13 @@ public class EvaluationLogger {
String message = "";
for(Object element:elements)
message += element + ",";
- try {
- connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
- } catch (Exception e) {
- e.printStackTrace();
+ if (message.length() > 0) {
+ message = message.substring(0, message.length()-1);
+ try {
+ connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
index 9c38672..d76d7b6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
@@ -26,16 +26,12 @@ public class EdgeExtensionsGenericPerformanceTest {
public static void main(String ... args){
int numberOfRuns = Integer.parseInt(System.getenv("TEST_RUNS"));
- String testType = System.getenv("TEST_TYPE");
-
- EvaluationLogger logger = EvaluationLogger.getInstance();
-
Test test = TestFactory.getFromEnv();
for(int i = 1; i <= numberOfRuns; i++){
- Object[] line = {System.currentTimeMillis(), String.format("Run %d started", i), i};
- logger.logMQTT(testType, line);
- test.execute();
+ if (i==numberOfRuns)
+ test.setStopPipeline(true);
+ test.execute(i);
}
}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 5b11443..ee9c78c 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -30,7 +30,7 @@ import java.util.Optional;
public class GenericTest implements Test{
- private final boolean stopPipeline;
+ private boolean stopPipeline;
private final boolean shouldBeMigrated;
private final boolean shouldBeReconfigured;
private final long timeToSleepBeforeManipulation;
@@ -38,7 +38,7 @@ public class GenericTest implements Test{
private final StreamPipesClient client;
private final Pipeline pipeline;
private final EvaluationLogger evalLogger = EvaluationLogger.getInstance();
- private float reconfigurableValue = 0;
+ private float reconfigurableValue = 1;
public GenericTest(String pipelineName, boolean stopPipeline, boolean shouldBeMigrated, boolean shouldBeReconfigured,
long timeToSleepBeforeManipulation, long timeToSleepAfterManipulation){
@@ -60,7 +60,12 @@ public class GenericTest implements Test{
}
@Override
- public void execute() {
+ public void setStopPipeline(boolean stopPipeline){
+ this.stopPipeline = stopPipeline;
+ }
+
+ @Override
+ public void execute(int nrRuns) {
String testType = System.getenv("TEST_TYPE");
@@ -70,7 +75,7 @@ public class GenericTest implements Test{
PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
long deploymentDuration = System.nanoTime() - beforeStart;
if(testType.equals("Deployment")){
- Object[] line = {System.currentTimeMillis() ,"Deployment duration", deploymentDuration};
+ Object[] line = {System.currentTimeMillis() ,"Deployment duration", nrRuns, deploymentDuration, deploymentDuration/1000000000.0};
evalLogger.logMQTT(testType, line);
}
if (startMessage.isSuccess()) {
@@ -93,7 +98,7 @@ public class GenericTest implements Test{
PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
long migrationDuration = System.nanoTime() - beforeMigration;
if(testType.equals("Migration")){
- Object[] line = {System.currentTimeMillis(), "Migration duration", migrationDuration};
+ Object[] line = {System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration};
evalLogger.logMQTT(testType, line);
}
if (migrationMessage.isSuccess()) {
@@ -111,6 +116,9 @@ public class GenericTest implements Test{
sp.setValue(Float.toString(this.reconfigurableValue++));
}
}));
+ Object[] line = {System.currentTimeMillis(), "Reconfiguration triggered", nrRuns, (this.reconfigurableValue-1)};
+ evalLogger.logMQTT(testType, line);
+ System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
if (message.isSuccess()) {
System.out.println(message.getTitle());
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java
index a2c2309..0a5c4a4 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/Test.java
@@ -19,6 +19,8 @@ package org.apache.streampipes.performance.performancetest;
public interface Test {
- void execute();
+ void execute(int nrRuns);
+
+ void setStopPipeline(boolean stopPipeline);
}