You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/06 09:05:47 UTC
[incubator-streampipes] 01/03: generified evaluation logging
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 613242c95a0329c7cb3b9d5db54de14153e5b0cb
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 6 10:59:37 2021 +0200
generified evaluation logging
---
.../logging/evaluation/EvaluationLogger.java | 30 +++++++++++++++++-----
.../controller/api/InvocableEntityResource.java | 4 +--
.../offloading/OffloadingPolicyManager.java | 6 ++---
.../ThresholdViolationOffloadingPolicy.java | 5 ++++
.../performance/performancetest/GenericTest.java | 2 +-
.../pipeline/PipelineMigrationExecutor.java | 10 ++++----
6 files changed, 40 insertions(+), 17 deletions(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 9790b0e..19ccb4b 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -28,6 +28,7 @@ public class EvaluationLogger {
private static EvaluationLogger instance = null;
private final MQTT mqtt;
private final BlockingConnection connection;
+ private final String deviceId;
public static EvaluationLogger getInstance(){
if(instance==null) instance = new EvaluationLogger();
@@ -36,6 +37,12 @@ public class EvaluationLogger {
private EvaluationLogger(){
String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
+ String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+ if (nodeId != null){
+ this.deviceId = nodeId;
+ }else {
+ this.deviceId = "default";
+ }
mqtt = new MQTT();
try {
mqtt.setHost(loggingUrl);
@@ -51,16 +58,27 @@ public class EvaluationLogger {
}
public void logMQTT(String topic, Object[] elements){
+ String message = System.currentTimeMillis() + "," + this.deviceId + ",";
+ for(Object element:elements)
+ message += element + ",";
+ message = message.substring(0, message.length()-1);
+ publish(topic, message);
+ }
+
+ public void logHeader(String topic, Object[] elements){
String message = "";
for(Object element:elements)
message += element + ",";
- if (message.length() > 0) {
+ if (message.length() > 0)
message = message.substring(0, message.length()-1);
- try {
- connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ publish(topic, message);
+ }
+
+ private void publish(String topic, String message){
+ try {
+ connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index 324efb5..b005ab7 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -84,7 +84,7 @@ public class InvocableEntityResource extends AbstractResource {
//TODO: Remove Logger after debugging
InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
EvaluationLogger logger = EvaluationLogger.getInstance();
- Object[] line = {System.currentTimeMillis() ,"Element detached"};
+ Object[] line = {"Element detached"};
logger.logMQTT("Offloading", line);
Response resp = PipelineElementManager.getInstance().detach(graph, runningInstanceId);
RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -114,7 +114,7 @@ public class InvocableEntityResource extends AbstractResource {
.get(0))
.getValue();
}
- Object[] line = {System.currentTimeMillis() ,"reconfiguration request received", nrRuns++, value};
+ Object[] line = {"reconfiguration request received", nrRuns++, value};
logger.logMQTT("Reconfiguration", line);
InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
return ok(PipelineElementManager.getInstance().reconfigure(graph, reconfigurationEntity));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index e705d3b..73ae24e 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -70,7 +70,7 @@ public class OffloadingPolicyManager {
//Currently uses the first violated policy. Could be extended to take the degree of policy violation into
// account
//TODO: Remove Logger after debugging
- Object[] line = {System.currentTimeMillis() ,"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
+ Object[] line = {"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
EvaluationLogger.getInstance().logMQTT("Offloading", line);
triggerOffloading(violatedPolicies.get(0));
}
@@ -80,7 +80,7 @@ public class OffloadingPolicyManager {
private void triggerOffloading(OffloadingStrategy strategy){
InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
- Object[] line = {System.currentTimeMillis() ,"entity to offload selected"};
+ Object[] line = {"entity to offload selected"};
EvaluationLogger.getInstance().logMQTT("Offloading", line);
if(offloadEntity != null){
Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
@@ -88,7 +88,7 @@ public class OffloadingPolicyManager {
String appId = offloadEntity.getAppId();
String pipelineName = offloadEntity.getCorrespondingPipeline();
- Object[] line_done = {System.currentTimeMillis() ,"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
+ Object[] line_done = {"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
if(resp.isSuccess()){
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index aa30b3e..39965d9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.node.controller.management.offloading.strategies.policies;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +65,8 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
for(T value : this.history){
if(value.compareTo(this.threshold) > 0){
numViolations++;
+ Object[] line = {"policy violation #" + numViolations};
+ EvaluationLogger.getInstance().logMQTT("Offloading", line);
}
}
break;
@@ -71,6 +74,8 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
for(T value : this.history){
if(value.compareTo(this.threshold) < 0){
numViolations++;
+ Object[] line = {"policy violation #" + numViolations};
+ EvaluationLogger.getInstance().logMQTT("Offloading", line);
}
}
break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 6d6ed70..55ce48b 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -117,7 +117,7 @@ public class GenericTest implements Test{
sp.setValue(Float.toString(this.reconfigurableValue++));
}
}));
- line = new Object[]{System.currentTimeMillis(), "Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
+ line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
System.out.println(message.getTitle());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 8b0631d..2de5bc3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -103,7 +103,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long start_target_duration = System.nanoTime() - before_start_target;
- Object[] line_start_target = {System.currentTimeMillis(), "start target element","",start_target_duration,start_target_duration/1000000000.0};
+ Object[] line_start_target = {"start target element","",start_target_duration,start_target_duration/1000000000.0};
logger.logMQTT("Migration", line_start_target);
// Stop relays from origin predecessor
@@ -114,7 +114,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
- Object[] line_stop_relay = {System.currentTimeMillis(), "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
+ Object[] line_stop_relay = {"stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
logger.logMQTT("Migration", line_stop_relay);
// Start relays to target after migration
@@ -125,11 +125,11 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
- Object[] line_start_relay = {System.currentTimeMillis(), "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
+ Object[] line_start_relay = {"start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
logger.logMQTT("Migration", line_start_relay);
long downtime = System.nanoTime() - downtime_beginning;
- Object[] line_downtime = {System.currentTimeMillis() ,"downtime", "", downtime, downtime/1000000000.0};
+ Object[] line_downtime = {"downtime", "", downtime, downtime/1000000000.0};
logger.logMQTT("Migration", line_downtime);
//Stop origin and associated relay
@@ -140,7 +140,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long stop_origin_duration = System.nanoTime() - before_stop_origin;
- Object[] line_stop_origin = {System.currentTimeMillis(), "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
+ Object[] line_stop_origin = {"stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
logger.logMQTT("Migration", line_stop_origin);
List<InvocableStreamPipesEntity> graphs = new ArrayList<>();