You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/06/25 13:32:35 UTC

[incubator-streampipes] 02/02: add docker stats resource collector for evaluation tests

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c820ff3d91a94f496e1240cac561cac3c30c711b
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Jun 25 15:32:15 2021 +0200

    add docker stats resource collector for evaluation tests
---
 .../logging/evaluation/EvaluationLogger.java       | 13 +--
 .../node/controller/config/EnvConfigParam.java     |  3 +-
 .../node/controller/config/NodeConfiguration.java  | 17 +++-
 .../container/DockerExtensionsContainer.java       |  5 +-
 .../management/NodeControllerSubmitter.java        |  5 ++
 .../offloading/OffloadingPolicyManager.java        |  8 +-
 .../orchestrator/docker/utils/DockerUtils.java     | 34 ++++++++
 .../statscollector/DockerStatsCollector.java       | 94 ++++++++++++++++++++++
 .../statscollector/DockerStatsUtils.java           | 67 +++++++++++++++
 streampipes-performance-tests/pom.xml              |  1 -
 10 files changed, 225 insertions(+), 22 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 3317c7a..d3a214e 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -21,17 +21,10 @@ import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.QoS;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 
 public class EvaluationLogger {
-
     private static EvaluationLogger instance = null;
     private final MQTT mqtt;
     private final BlockingConnection connection;
@@ -42,12 +35,10 @@ public class EvaluationLogger {
     }
 
     private EvaluationLogger(){
-        String logging_host = System.getenv("SP_LOGGING_MQTT_HOST");
-        int logging_port = Integer.parseInt(System.getenv("SP_LOGGING_MQTT_PORT"));
-
+        String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
         mqtt = new MQTT();
         try {
-            mqtt.setHost(logging_host, logging_port);
+            mqtt.setHost(loggingUrl);
         } catch (URISyntaxException e) {
             e.printStackTrace();
         }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
index 2c709de..bae3ced 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
@@ -50,7 +50,8 @@ public enum EnvConfigParam {
     CONSUL_LOCATION("CONSUL_LOCATION", "consul"),
     SUPPORTED_PIPELINE_ELEMENTS("SP_SUPPORTED_PIPELINE_ELEMENTS", ""),
     AUTO_OFFLOADING_STRATEGY("SP_AUTO_OFFLOADING_STRATEGY", "default"),
-    NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes");
+    NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes"),
+    LOGGING_MQTT_URL("SP_LOGGING_MQTT_URL", "tcp://localhost:1883");
 
     private final String environmentKey;
     private final String defaultValue;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 44b1291..364fd18 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -32,7 +32,7 @@ import java.util.stream.Collectors;
 public final class NodeConfiguration {
     private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguration.class.getCanonicalName());
 
-    private static final String[] VALID_URL_SCHEMES = new String[]{"http","https"};
+    private static final String[] VALID_URL_SCHEMES = new String[]{"http","https", "tcp"};
 
     private static String nodeApiKey;
     private static String nodeHost;
@@ -60,6 +60,7 @@ public final class NodeConfiguration {
     private static String consulHost;
     private static OffloadingStrategyType autoOffloadingStrategy;
     private static String nodeStoragePath;
+    private static String loggingMqttUrl;
 
     private static HashMap<String, String> configMap;
 
@@ -291,6 +292,14 @@ public final class NodeConfiguration {
         NodeConfiguration.nodeStoragePath = nodeStoragePath;
     }
 
+    /**
+     * Returns the MQTT URL configured for evaluation logging.
+     *
+     * @return the value last passed to {@link #setLoggingMqttUrl(String)}, or {@code null} if it
+     *         has not been set yet
+     */
+    public static String getLoggingMqttUrl() {
+        return loggingMqttUrl;
+    }
+
+    /**
+     * Stores the MQTT URL to use for evaluation logging.
+     *
+     * @param loggingMqttUrl the URL to store; no validation is performed here
+     */
+    public static void setLoggingMqttUrl(String loggingMqttUrl) {
+        NodeConfiguration.loggingMqttUrl = loggingMqttUrl;
+    }
+
     public static HashMap<String, String> getConfigMap() {
         return configMap;
     }
@@ -505,6 +514,12 @@ public final class NodeConfiguration {
                     configMap.put(envKey, value);
                     setNodeStoragePath(value);
                     break;
+                case LOGGING_MQTT_URL:
+                    if (isValidUrl(value)) {
+                        configMap.put(envKey, value);
+                        setLoggingMqttUrl(value);
+                    }
+                    break;
                 default:
                     throw new IllegalArgumentException("Invalid environment config param: " + configParam);
             }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
index b0c6cbd..6a041ac 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
@@ -42,12 +42,9 @@ public class DockerExtensionsContainer extends AbstractStreamPipesDockerContaine
                         .addNodeEnvs(generateStreamPipesNodeEnvs())
                         .add("SP_HOST", NodeConfiguration.getNodeHost())
                         .add("SP_PORT", "8090")
-                        .add("SP_LOGGING_FILE_PATH", "/var/log/streampipes/eval")
+                        .add("SP_LOGGING_MQTT_URL", NodeConfiguration.getLoggingMqttUrl())
                         .add("TZ", "Europe/Berlin")
                         .build())
-                .withVolumes(ContainerVolumesBuilder.create()
-                        .add("streampipes-eval", "/var/log/streampipes/eval", false)
-                        .build())
                 .withLabels(ContainerLabels.with(SP_SVC_EXTENSIONS_ID, retrieveNodeType(), ContainerType.EXTENSIONS))
                 .build();
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
index 741f96c..26b19d3 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.node.controller.management.node.NodeManager;
 import org.apache.streampipes.node.controller.management.orchestrator.DockerEngineManager;
 import org.apache.streampipes.node.controller.management.orchestrator.docker.DockerContainerDeclarerSingleton;
 import org.apache.streampipes.node.controller.management.resource.ResourceManager;
+import org.apache.streampipes.node.controller.management.statscollector.DockerStatsCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
@@ -79,6 +80,10 @@ public abstract class NodeControllerSubmitter {
 
                 LOG.info("Start janitor manager");
                 JanitorManager.getInstance().run();
+
+                // TODO: remove after evaluation tests
+                LOG.info("Start docker stats collector");
+                DockerStatsCollector.getInstance().run();
             }
         } else throw new SpRuntimeException("Could not register node controller at StreamPipes node management");
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 197dfe2..e705d3b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -47,7 +47,7 @@ public class OffloadingPolicyManager {
     private final List<InvocableStreamPipesEntity> unsuccessfullyTriedEntities = new ArrayList<>();
     private static OffloadingPolicyManager instance;
     private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
-    private static EvaluationLogger logger = EvaluationLogger.getInstance();
+    //private static EvaluationLogger logger = EvaluationLogger.getInstance();
 
     public static OffloadingPolicyManager getInstance(){
         if(instance == null){
@@ -71,7 +71,7 @@ public class OffloadingPolicyManager {
             // account
             //TODO: Remove Logger after debugging
             Object[] line = {System.currentTimeMillis() ,"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
-            logger.logMQTT("Offloading", line);
+            EvaluationLogger.getInstance().logMQTT("Offloading", line);
             triggerOffloading(violatedPolicies.get(0));
         }
         //Blacklist of entities is cleared when no policies were violated.
@@ -81,7 +81,7 @@ public class OffloadingPolicyManager {
     private void triggerOffloading(OffloadingStrategy strategy){
         InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
         Object[] line = {System.currentTimeMillis() ,"entity to offload selected"};
-        logger.logMQTT("Offloading", line);
+        EvaluationLogger.getInstance().logMQTT("Offloading", line);
         if(offloadEntity != null){
             Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
 
@@ -89,7 +89,7 @@ public class OffloadingPolicyManager {
             String pipelineName = offloadEntity.getCorrespondingPipeline();
 
             Object[] line_done = {System.currentTimeMillis() ,"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
-            logger.logMQTT("Offloading", line_done);
+            EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
 
             if(resp.isSuccess()){
                 LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
index 387ad7c..abe22f7 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.node.controller.management.orchestrator.docker.utils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.spotify.docker.client.DefaultDockerClient;
@@ -29,7 +31,9 @@ import com.spotify.docker.client.shaded.com.google.common.collect.Lists;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.node.container.ContainerLabel;
 import org.apache.streampipes.model.node.container.DockerContainer;
+import org.apache.streampipes.node.controller.management.orchestrator.DockerEngineManager;
 import org.apache.streampipes.node.controller.management.orchestrator.docker.model.DockerInfo;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +43,10 @@ import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DockerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
@@ -65,6 +72,33 @@ public class DockerUtils {
         return instance;
     }
 
+    /**
+     * Collects the stats reported by the Docker client for every running StreamPipes container.
+     *
+     * <p>One asynchronous task per container is submitted before any result is awaited, so the
+     * individual stats requests can overlap. A container whose stats cannot be read is logged and
+     * left out of the result instead of failing the whole collection.
+     *
+     * @return mutable map from container name (first name, with {@code "/"} removed) to its
+     *         stats; empty if nothing could be collected, never {@code null}
+     */
+    public Map<String, ContainerStats> collectStats() {
+        List<CompletableFuture<Map<String, ContainerStats>>> futures = new ArrayList<>();
+        // Start all requests first; joining inside this loop would serialize them.
+        for (Container container : getRunningStreamPipesContainer()) {
+            futures.add(CompletableFuture.supplyAsync(() -> collectStatsOf(container)));
+        }
+
+        Map<String, ContainerStats> allStats = new HashMap<>();
+        for (CompletableFuture<Map<String, ContainerStats>> future : futures) {
+            // collectStatsOf never throws and never returns null, so join() is safe here.
+            allStats.putAll(future.join());
+        }
+        return allStats;
+    }
+
+    /**
+     * Reads the stats of a single container.
+     *
+     * @param container the container to query
+     * @return a single-entry map (name to stats), or an empty map if the stats could not be read
+     */
+    private Map<String, ContainerStats> collectStatsOf(Container container) {
+        try {
+            String containerName = container.names().get(0).replace("/", "");
+            return Collections.singletonMap(containerName, docker.stats(container.id()));
+        } catch (InterruptedException e) {
+            // Restore the flag so the owning thread pool can observe the interruption.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while collecting stats for container {}", container.id(), e);
+        } catch (DockerException | RuntimeException e) {
+            LOG.warn("Could not collect stats for container {}", container.id(), e);
+        }
+        return Collections.emptyMap();
+    }
+
     private void init() {
         LOG.info("Initialize Docker client");
         try {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
new file mode 100644
index 0000000..ccad01c
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.statscollector;
+
+import com.spotify.docker.client.messages.ContainerStats;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.management.orchestrator.docker.utils.DockerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodically collects stats of the running StreamPipes containers via {@link DockerUtils} and
+ * publishes one row per container through the {@link EvaluationLogger}.
+ */
+public class DockerStatsCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DockerStatsCollector.class.getCanonicalName());
+    private static final String LOGGING_TOPIC = "container/stats/" + NodeConfiguration.getNodeHost();
+
+    // volatile is required for the unsynchronized first check in getInstance() to be safe.
+    private static volatile DockerStatsCollector instance = null;
+
+    private DockerStatsCollector() {}
+
+    /**
+     * Returns the process-wide collector, creating it on first use (double-checked locking).
+     *
+     * @return the singleton instance
+     */
+    public static DockerStatsCollector getInstance() {
+        if (instance == null) {
+            synchronized (DockerStatsCollector.class) {
+                if (instance == null) {
+                    instance = new DockerStatsCollector();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Publishes the column header once and then schedules a stats collection every second on a
+     * dedicated single-threaded scheduler.
+     *
+     * <p>Every call creates a new scheduler, so this is meant to be called once.
+     */
+    public void run() {
+        Object[] header = new Object[]{
+                "timestamp",
+                "containerName",
+                "cpuPercent",
+                "memPercent",
+                "memUsageInBytes",
+                "memUsageHumanReadable",
+                "memTotalInBytes",
+                "memTotalHumanReadable"};
+
+        // Publish the header before scheduling: the first collection runs with zero delay and
+        // could otherwise emit data rows ahead of the header.
+        EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header);
+
+        LOG.debug("Create Docker stats scheduler");
+        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        scheduledExecutorService.scheduleAtFixedRate(collect, 0, 1, TimeUnit.SECONDS);
+    }
+
+    // The periodic task must never let an exception escape: scheduleAtFixedRate suppresses all
+    // subsequent executions once a run throws, which would stop collection without any notice.
+    private final Runnable collect = () -> {
+        try {
+            LOG.debug("Collect Docker stats");
+            Map<String, ContainerStats> stats = DockerUtils.getInstance().collectStats();
+            long timestamp = System.currentTimeMillis();
+            stats.forEach((containerName, containerStats) -> publish(timestamp, containerName, containerStats));
+        } catch (RuntimeException e) {
+            LOG.warn("Could not collect Docker stats, retrying on next run", e);
+        }
+    };
+
+    /**
+     * Derives the CPU and memory figures for one container and publishes them as one row.
+     * Failures are logged per container so that one bad sample does not drop the others.
+     *
+     * @param timestamp      collection time in epoch milliseconds, shared by all rows of one run
+     * @param containerName  name of the container the stats belong to
+     * @param containerStats stats as returned by the Docker client
+     */
+    private void publish(long timestamp, String containerName, ContainerStats containerStats) {
+        try {
+            float cpuPercent = DockerStatsUtils.getCpuPercent(containerStats);
+            double memUsageInBytes = DockerStatsUtils.getMemUsageInBytes(containerStats);
+            double memTotal = containerStats.memoryStats().limit();
+            // Guard against a zero limit, which would otherwise yield NaN or Infinity.
+            double memPercent = memTotal > 0 ? (memUsageInBytes / memTotal) * 100.0 : 0.0;
+
+            Object[] collectedStats = new Object[]{
+                    timestamp,
+                    containerName,
+                    cpuPercent,
+                    memPercent,
+                    memUsageInBytes,
+                    DockerStatsUtils.humanReadableByteCountBin((long) memUsageInBytes),
+                    memTotal,
+                    DockerStatsUtils.humanReadableByteCountBin((long) memTotal)};
+
+            EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, collectedStats);
+        } catch (RuntimeException e) {
+            LOG.warn("Could not publish stats for container {}", containerName, e);
+        }
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
new file mode 100644
index 0000000..b4c7be9
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.statscollector;
+
+import com.spotify.docker.client.messages.ContainerStats;
+
+import java.text.CharacterIterator;
+import java.text.StringCharacterIterator;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Static helpers that derive CPU and memory figures from Docker {@link ContainerStats} and format
+ * byte counts for display.
+ */
+public class DockerStatsUtils {
+
+    /**
+     * Returns the memory usage reported in the given stats.
+     *
+     * @param containerStats stats as returned by the Docker client
+     * @return memory usage in bytes
+     */
+    public static double getMemUsageInBytes(ContainerStats containerStats) {
+        return containerStats.memoryStats().usage().doubleValue();
+    }
+
+    /**
+     * Computes the CPU usage in percent from the current and previous CPU counters of a sample.
+     *
+     * <p>The counter deltas are taken in {@code long} arithmetic. The counters are cumulative and
+     * can grow far beyond the 24 bits of precision a {@code float} offers, so narrowing them
+     * before subtracting can round away the entire delta.
+     *
+     * @param containerStats stats as returned by the Docker client
+     * @return CPU usage in percent, where 100 corresponds to one fully used core; {@code 0} if
+     *         either delta is not positive
+     */
+    public static float getCpuPercent(ContainerStats containerStats) {
+        long cpuDelta = containerStats.cpuStats().cpuUsage().totalUsage()
+                - containerStats.precpuStats().cpuUsage().totalUsage();
+        long systemDelta = orZero(containerStats.cpuStats().systemCpuUsage())
+                - orZero(containerStats.precpuStats().systemCpuUsage());
+        return calculateCpuUsageInPercent(cpuDelta, systemDelta, countCores(containerStats));
+    }
+
+    /**
+     * Computes the CPU usage in percent from raw counter values.
+     *
+     * <p>Kept for existing callers. Prefer
+     * {@link #calculateCpuUsageInPercent(long, long, int)}, which does not lose precision on
+     * large counters.
+     *
+     * @param cpuCurrentTotal        current cumulative container CPU usage
+     * @param cpuPreviousTotal       previous cumulative container CPU usage
+     * @param cpuSystemCurrentUsage  current cumulative system CPU usage
+     * @param cpuSystemPreviousUsage previous cumulative system CPU usage
+     * @param numCores               number of cores the result is scaled by
+     * @return CPU usage in percent; {@code 0} if either delta is not positive
+     */
+    public static float calculateCpuUsageInPercent(float cpuCurrentTotal, float cpuPreviousTotal,
+                                              float cpuSystemCurrentUsage,
+                                             float cpuSystemPreviousUsage, int numCores) {
+        float cpuPercent = 0.0f;
+        float cpuDelta = cpuCurrentTotal - cpuPreviousTotal;
+        float systemDelta = cpuSystemCurrentUsage - cpuSystemPreviousUsage;
+
+        if (cpuDelta > 0.0 && systemDelta > 0.0) {
+            cpuPercent = (cpuDelta / systemDelta) * (numCores * 100F);
+        }
+        return cpuPercent;
+    }
+
+    /**
+     * Computes the CPU usage in percent from already computed counter deltas.
+     *
+     * @param cpuDelta    container CPU usage consumed between two samples
+     * @param systemDelta system CPU usage consumed between the same two samples
+     * @param numCores    number of cores the result is scaled by
+     * @return CPU usage in percent; {@code 0} if either delta is not positive
+     */
+    public static float calculateCpuUsageInPercent(long cpuDelta, long systemDelta, int numCores) {
+        if (cpuDelta <= 0 || systemDelta <= 0) {
+            return 0.0f;
+        }
+        return (float) (((double) cpuDelta / (double) systemDelta) * numCores * 100.0);
+    }
+
+    /**
+     * Formats a byte count with binary (1024-based) unit prefixes and one decimal place,
+     * e.g. {@code 1536 -> "1.5 KiB"}. Values below 1024 are printed as plain bytes.
+     *
+     * @param bytes the byte count, may be negative
+     * @return the formatted value, always using {@code '.'} as decimal separator
+     */
+    public static String humanReadableByteCountBin(long bytes) {
+        // Math.abs(Long.MIN_VALUE) overflows and stays negative, hence the special case.
+        long absB = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(bytes);
+        if (absB < 1024) {
+            return bytes + " B";
+        }
+        long value = absB;
+        CharacterIterator ci = new StringCharacterIterator("KMGTPE");
+        // Step to the next unit while the value would round to >= 1024.0 in the current one.
+        for (int i = 40; i >= 0 && absB > 0xfffccccccccccccL >> i; i -= 10) {
+            value >>= 10;
+            ci.next();
+        }
+        value *= Long.signum(bytes);
+        // Locale.ROOT keeps the output identical on every host regardless of the default locale.
+        return String.format(Locale.ROOT, "%.1f %ciB", value / 1024.0, ci.current());
+    }
+
+    /** Treats a missing counter as zero so that a partial sample does not throw. */
+    private static long orZero(Long counter) {
+        return counter == null ? 0L : counter;
+    }
+
+    /**
+     * Returns the number of per-CPU usage entries in the sample, or the JVM's processor count if
+     * the list is missing or empty.
+     */
+    private static int countCores(ContainerStats containerStats) {
+        List<?> perCpuUsage = containerStats.cpuStats().cpuUsage().percpuUsage();
+        if (perCpuUsage != null && !perCpuUsage.isEmpty()) {
+            return perCpuUsage.size();
+        }
+        // NOTE(review): the Docker Engine API omits percpu_usage on cgroup v2 hosts. The JVM's
+        // processor count matches the host only while this process is not CPU-restricted - confirm.
+        return Runtime.getRuntime().availableProcessors();
+    }
+}
diff --git a/streampipes-performance-tests/pom.xml b/streampipes-performance-tests/pom.xml
index 2f5887f..e8c5acd 100644
--- a/streampipes-performance-tests/pom.xml
+++ b/streampipes-performance-tests/pom.xml
@@ -53,7 +53,6 @@
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-client</artifactId>
             <version>0.68.0-SNAPSHOT</version>
-            <scope>compile</scope>
         </dependency>
 
         <!-- External dependencies -->