You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/06/25 13:32:35 UTC
[incubator-streampipes] 02/02: add docker stats resource collector
for evaluation tests
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit c820ff3d91a94f496e1240cac561cac3c30c711b
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Jun 25 15:32:15 2021 +0200
add docker stats resource collector for evaluation tests
---
.../logging/evaluation/EvaluationLogger.java | 13 +--
.../node/controller/config/EnvConfigParam.java | 3 +-
.../node/controller/config/NodeConfiguration.java | 17 +++-
.../container/DockerExtensionsContainer.java | 5 +-
.../management/NodeControllerSubmitter.java | 5 ++
.../offloading/OffloadingPolicyManager.java | 8 +-
.../orchestrator/docker/utils/DockerUtils.java | 34 ++++++++
.../statscollector/DockerStatsCollector.java | 94 ++++++++++++++++++++++
.../statscollector/DockerStatsUtils.java | 67 +++++++++++++++
streampipes-performance-tests/pom.xml | 1 -
10 files changed, 225 insertions(+), 22 deletions(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 3317c7a..d3a214e 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -21,17 +21,10 @@ import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
public class EvaluationLogger {
-
private static EvaluationLogger instance = null;
private final MQTT mqtt;
private final BlockingConnection connection;
@@ -42,12 +35,10 @@ public class EvaluationLogger {
}
private EvaluationLogger(){
- String logging_host = System.getenv("SP_LOGGING_MQTT_HOST");
- int logging_port = Integer.parseInt(System.getenv("SP_LOGGING_MQTT_PORT"));
-
+ String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
mqtt = new MQTT();
try {
- mqtt.setHost(logging_host, logging_port);
+ mqtt.setHost(loggingUrl);
} catch (URISyntaxException e) {
e.printStackTrace();
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
index 2c709de..bae3ced 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
@@ -50,7 +50,8 @@ public enum EnvConfigParam {
CONSUL_LOCATION("CONSUL_LOCATION", "consul"),
SUPPORTED_PIPELINE_ELEMENTS("SP_SUPPORTED_PIPELINE_ELEMENTS", ""),
AUTO_OFFLOADING_STRATEGY("SP_AUTO_OFFLOADING_STRATEGY", "default"),
- NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes");
+ NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes"),
+ LOGGING_MQTT_URL("SP_LOGGING_MQTT_URL", "tcp://localhost:1883");
private final String environmentKey;
private final String defaultValue;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 44b1291..364fd18 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -32,7 +32,7 @@ import java.util.stream.Collectors;
public final class NodeConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguration.class.getCanonicalName());
- private static final String[] VALID_URL_SCHEMES = new String[]{"http","https"};
+ private static final String[] VALID_URL_SCHEMES = new String[]{"http","https", "tcp"};
private static String nodeApiKey;
private static String nodeHost;
@@ -60,6 +60,7 @@ public final class NodeConfiguration {
private static String consulHost;
private static OffloadingStrategyType autoOffloadingStrategy;
private static String nodeStoragePath;
+ private static String loggingMqttUrl;
private static HashMap<String, String> configMap;
@@ -291,6 +292,14 @@ public final class NodeConfiguration {
NodeConfiguration.nodeStoragePath = nodeStoragePath;
}
+ /**
+  * Returns the MQTT URL configured for evaluation logging.
+  *
+  * @return the value last stored via {@link #setLoggingMqttUrl(String)}; {@code null} if it was never set
+  */
+ public static String getLoggingMqttUrl() {
+ return loggingMqttUrl;
+ }
+
+ /**
+  * Stores the MQTT URL to be used for evaluation logging.
+  *
+  * @param loggingMqttUrl URL to store; it is assigned as-is, no validation happens in this method
+  */
+ public static void setLoggingMqttUrl(String loggingMqttUrl) {
+ NodeConfiguration.loggingMqttUrl = loggingMqttUrl;
+ }
+
public static HashMap<String, String> getConfigMap() {
return configMap;
}
@@ -505,6 +514,12 @@ public final class NodeConfiguration {
configMap.put(envKey, value);
setNodeStoragePath(value);
break;
+ case LOGGING_MQTT_URL:
+ if (isValidUrl(value)) {
+ configMap.put(envKey, value);
+ setLoggingMqttUrl(value);
+ }
+ break;
default:
throw new IllegalArgumentException("Invalid environment config param: " + configParam);
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
index b0c6cbd..6a041ac 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
@@ -42,12 +42,9 @@ public class DockerExtensionsContainer extends AbstractStreamPipesDockerContaine
.addNodeEnvs(generateStreamPipesNodeEnvs())
.add("SP_HOST", NodeConfiguration.getNodeHost())
.add("SP_PORT", "8090")
- .add("SP_LOGGING_FILE_PATH", "/var/log/streampipes/eval")
+ .add("SP_LOGGING_MQTT_URL", NodeConfiguration.getLoggingMqttUrl())
.add("TZ", "Europe/Berlin")
.build())
- .withVolumes(ContainerVolumesBuilder.create()
- .add("streampipes-eval", "/var/log/streampipes/eval", false)
- .build())
.withLabels(ContainerLabels.with(SP_SVC_EXTENSIONS_ID, retrieveNodeType(), ContainerType.EXTENSIONS))
.build();
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
index 741f96c..26b19d3 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.node.controller.management.node.NodeManager;
import org.apache.streampipes.node.controller.management.orchestrator.DockerEngineManager;
import org.apache.streampipes.node.controller.management.orchestrator.docker.DockerContainerDeclarerSingleton;
import org.apache.streampipes.node.controller.management.resource.ResourceManager;
+import org.apache.streampipes.node.controller.management.statscollector.DockerStatsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -79,6 +80,10 @@ public abstract class NodeControllerSubmitter {
LOG.info("Start janitor manager");
JanitorManager.getInstance().run();
+
+ // TODO: remove after evaluation tests
+ LOG.info("Start docker stats collector");
+ DockerStatsCollector.getInstance().run();
}
} else throw new SpRuntimeException("Could not register node controller at StreamPipes node management");
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 197dfe2..e705d3b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -47,7 +47,7 @@ public class OffloadingPolicyManager {
private final List<InvocableStreamPipesEntity> unsuccessfullyTriedEntities = new ArrayList<>();
private static OffloadingPolicyManager instance;
private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
- private static EvaluationLogger logger = EvaluationLogger.getInstance();
+ //private static EvaluationLogger logger = EvaluationLogger.getInstance();
public static OffloadingPolicyManager getInstance(){
if(instance == null){
@@ -71,7 +71,7 @@ public class OffloadingPolicyManager {
// account
//TODO: Remove Logger after debugging
Object[] line = {System.currentTimeMillis() ,"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
- logger.logMQTT("Offloading", line);
+ EvaluationLogger.getInstance().logMQTT("Offloading", line);
triggerOffloading(violatedPolicies.get(0));
}
//Blacklist of entities is cleared when no policies were violated.
@@ -81,7 +81,7 @@ public class OffloadingPolicyManager {
private void triggerOffloading(OffloadingStrategy strategy){
InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
Object[] line = {System.currentTimeMillis() ,"entity to offload selected"};
- logger.logMQTT("Offloading", line);
+ EvaluationLogger.getInstance().logMQTT("Offloading", line);
if(offloadEntity != null){
Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
@@ -89,7 +89,7 @@ public class OffloadingPolicyManager {
String pipelineName = offloadEntity.getCorrespondingPipeline();
Object[] line_done = {System.currentTimeMillis() ,"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
- logger.logMQTT("Offloading", line_done);
+ EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
if(resp.isSuccess()){
LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
index 387ad7c..abe22f7 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.node.controller.management.orchestrator.docker.utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.spotify.docker.client.DefaultDockerClient;
@@ -29,7 +31,9 @@ import com.spotify.docker.client.shaded.com.google.common.collect.Lists;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.node.container.ContainerLabel;
import org.apache.streampipes.model.node.container.DockerContainer;
+import org.apache.streampipes.node.controller.management.orchestrator.DockerEngineManager;
import org.apache.streampipes.node.controller.management.orchestrator.docker.model.DockerInfo;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +43,10 @@ import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
@@ -65,6 +72,33 @@ public class DockerUtils {
return instance;
}
+ /**
+  * Collects one stats sample for every running StreamPipes container.
+  * <p>
+  * The containers are queried concurrently via {@link CompletableFuture#supplyAsync} and the
+  * individual results are merged after all queries have finished. A container whose stats cannot
+  * be retrieved is logged and left out of the result, so a single failing container no longer
+  * breaks the whole collection.
+  *
+  * @return map from container name (with "/" removed) to its stats; empty if nothing could be
+  *         collected, never {@code null}
+  */
+ public Map<String, ContainerStats> collectStats() {
+   List<Container> containers = getRunningStreamPipesContainer();
+   List<CompletableFuture<Map<String, ContainerStats>>> futures = new ArrayList<>();
+
+   for (Container container : containers) {
+     CompletableFuture<Map<String, ContainerStats>> statsFuture =
+         CompletableFuture.supplyAsync(() -> {
+           try {
+             String containerName = container.names().get(0).replace("/", "");
+             return Collections.singletonMap(containerName, docker.stats(container.id()));
+           } catch (InterruptedException e) {
+             // restore the flag so the executing thread can still observe the interruption
+             Thread.currentThread().interrupt();
+             LOG.warn("Interrupted while collecting stats for container {}", container.id(), e);
+           } catch (DockerException e) {
+             LOG.warn("Could not collect stats for container {}", container.id(), e);
+           }
+           // an empty map (instead of null) keeps the merge below free of null checks
+           return Collections.<String, ContainerStats>emptyMap();
+         });
+     futures.add(statsFuture);
+   }
+
+   return futures.stream()
+       .map(CompletableFuture::join)
+       .flatMap(map -> map.entrySet().stream())
+       // keep the first sample if two containers resolve to the same name rather than throwing
+       .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first));
+ }
+
private void init() {
LOG.info("Initialize Docker client");
try {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
new file mode 100644
index 0000000..ccad01c
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.statscollector;
+
+import com.spotify.docker.client.messages.ContainerStats;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.management.orchestrator.docker.utils.DockerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class DockerStatsCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DockerStatsCollector.class.getCanonicalName());
+ private static final String LOGGING_TOPIC = "container/stats/" + NodeConfiguration.getNodeHost();
+
+ private static DockerStatsCollector instance = null;
+
+ private DockerStatsCollector() {}
+
+ public static DockerStatsCollector getInstance() {
+ if (instance == null) {
+ synchronized (DockerStatsCollector.class) {
+ if (instance == null)
+ instance = new DockerStatsCollector();
+ }
+ }
+ return instance;
+ }
+
+ public void run() {
+ LOG.debug("Create Docker stats scheduler");
+ ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService.scheduleAtFixedRate(collect, 0, 1, TimeUnit.SECONDS);
+
+ Object[] header = new Object[]{
+ "timestamp",
+ "containerName",
+ "cpuPercent",
+ "memPercent",
+ "memUsageInBytes",
+ "memUsageHumanReadable",
+ "memTotalInBytes",
+ "memTotalHumanReadable"};
+
+ EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header);
+ }
+
+ private final Runnable collect = () -> {
+ LOG.debug("Collect Docker stats");
+ Map<String, ContainerStats> stats = DockerUtils.getInstance().collectStats();
+ long timestamp = System.currentTimeMillis();
+
+ stats.forEach((containerName, containerStats) -> {
+
+ float cpuPercent = DockerStatsUtils.getCpuPercent(containerStats);
+ double memUsageInBytes = DockerStatsUtils.getMemUsageInBytes(containerStats);
+ double memTotal = containerStats.memoryStats().limit();
+ double memPercent = (memUsageInBytes / memTotal) * 100.0;
+
+ Object[] collectedStats = new Object[]{
+ timestamp,
+ containerName,
+ cpuPercent,
+ memPercent,
+ memUsageInBytes,
+ DockerStatsUtils.humanReadableByteCountBin((long) memUsageInBytes),
+ memTotal,
+ DockerStatsUtils.humanReadableByteCountBin((long) memTotal)};
+
+ EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, collectedStats);
+ });
+ };
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
new file mode 100644
index 0000000..b4c7be9
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.statscollector;
+
+import com.spotify.docker.client.messages.ContainerStats;
+
+import java.text.CharacterIterator;
+import java.text.StringCharacterIterator;
+
+/**
+ * Static helpers that derive CPU and memory figures from Docker {@link ContainerStats} and format
+ * byte counts for display.
+ */
+public class DockerStatsUtils {
+
+  /**
+   * Returns the memory usage reported in the given stats.
+   *
+   * @param containerStats stats sample of one container
+   * @return {@code memoryStats().usage()} as a double
+   */
+  public static double getMemUsageInBytes(ContainerStats containerStats) {
+    return containerStats.memoryStats().usage().doubleValue();
+  }
+
+  /**
+   * Computes the CPU usage in percent from the current and previous CPU counters of a sample.
+   * <p>
+   * The counter deltas are formed in {@code long} arithmetic before any floating-point
+   * conversion. Converting the raw counters to {@code float} first (24-bit mantissa) would round
+   * large counter values so coarsely that the difference between two samples becomes meaningless.
+   *
+   * @param containerStats stats sample of one container
+   * @return CPU usage in percent, {@code 0} if either delta is not positive
+   */
+  public static float getCpuPercent(ContainerStats containerStats) {
+    long cpuDelta = containerStats.cpuStats().cpuUsage().totalUsage().longValue()
+        - containerStats.precpuStats().cpuUsage().totalUsage().longValue();
+    long systemDelta = containerStats.cpuStats().systemCpuUsage().longValue()
+        - containerStats.precpuStats().systemCpuUsage().longValue();
+
+    // NOTE(review): the per-CPU list is dereferenced unconditionally in the original; it is
+    // presumably absent on some hosts (e.g. cgroup v2) - confirm. The fallback uses the processor
+    // count visible to this JVM, which may differ from the host's CPU count.
+    int numCores = containerStats.cpuStats().cpuUsage().percpuUsage() == null
+        || containerStats.cpuStats().cpuUsage().percpuUsage().isEmpty()
+        ? Runtime.getRuntime().availableProcessors()
+        : containerStats.cpuStats().cpuUsage().percpuUsage().size();
+
+    return cpuPercentFromDeltas(cpuDelta, systemDelta, numCores);
+  }
+
+  /**
+   * Computes the CPU usage in percent from raw counter values.
+   *
+   * @param cpuCurrentTotal current total CPU usage of the container
+   * @param cpuPreviousTotal previous total CPU usage of the container
+   * @param cpuSystemCurrentUsage current system CPU usage
+   * @param cpuSystemPreviousUsage previous system CPU usage
+   * @param numCores number of cores the ratio is scaled by
+   * @return {@code (cpuDelta / systemDelta) * numCores * 100}, or {@code 0} if either delta is
+   *         not positive
+   */
+  public static float calculateCpuUsageInPercent(float cpuCurrentTotal, float cpuPreviousTotal,
+                                                 float cpuSystemCurrentUsage,
+                                                 float cpuSystemPreviousUsage, int numCores) {
+    float cpuDelta = cpuCurrentTotal - cpuPreviousTotal;
+    float systemDelta = cpuSystemCurrentUsage - cpuSystemPreviousUsage;
+    return cpuPercentFromDeltas(cpuDelta, systemDelta, numCores);
+  }
+
+  // Shared formula; takes already-computed deltas so callers decide how to subtract precisely.
+  private static float cpuPercentFromDeltas(double cpuDelta, double systemDelta, int numCores) {
+    if (cpuDelta > 0.0 && systemDelta > 0.0) {
+      return (float) ((cpuDelta / systemDelta) * numCores * 100.0);
+    }
+    return 0.0f;
+  }
+
+  /**
+   * Formats a byte count with binary (1024-based) unit prefixes, e.g. {@code 1536} as 1.5 KiB.
+   * Values whose magnitude is below 1024 are printed unscaled with the unit {@code B}.
+   *
+   * @param bytes byte count, may be negative
+   * @return formatted value with one decimal place and a unit from B, KiB, MiB, GiB, TiB, PiB, EiB
+   */
+  public static String humanReadableByteCountBin(long bytes) {
+    // Math.abs(Long.MIN_VALUE) overflows, hence the explicit substitution
+    long absB = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(bytes);
+    if (absB < 1024) {
+      return bytes + " B";
+    }
+    long value = absB;
+    CharacterIterator ci = new StringCharacterIterator("KMGTPE");
+    // shift down by one unit (2^10) per step and advance the unit prefix alongside
+    for (int i = 40; i >= 0 && absB > 0xfffccccccccccccL >> i; i -= 10) {
+      value >>= 10;
+      ci.next();
+    }
+    value *= Long.signum(bytes);
+    return String.format("%.1f %ciB", value / 1024.0, ci.current());
+  }
+}
diff --git a/streampipes-performance-tests/pom.xml b/streampipes-performance-tests/pom.xml
index 2f5887f..e8c5acd 100644
--- a/streampipes-performance-tests/pom.xml
+++ b/streampipes-performance-tests/pom.xml
@@ -53,7 +53,6 @@
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-client</artifactId>
<version>0.68.0-SNAPSHOT</version>
- <scope>compile</scope>
</dependency>
<!-- External dependencies -->