You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ma...@apache.org on 2023/07/13 16:10:08 UTC

[camel-karavan] 04/07: DataGrid in bashi works #817

This is an automated email from the ASF dual-hosted git repository.

marat pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-karavan.git

commit 7f195bd8875b46562b393053b5f8d591f8a60259
Author: Marat Gubaidullin <ma...@gmail.com>
AuthorDate: Wed Jul 12 14:58:51 2023 -0400

    DataGrid in bashi works #817
---
 karavan-cloud/karavan-bashi/.java-version          |  1 +
 .../camel/karavan/bashi/ConductorService.java      | 58 ++++++++++----
 .../apache/camel/karavan/bashi/KaravanBashi.java   |  2 +-
 .../karavan/bashi/docker/DockerEventListener.java  | 39 +++++----
 .../camel/karavan/bashi/docker/DockerService.java  | 92 ++++++++++++++++++----
 .../src/main/resources/application.properties      |  2 +-
 6 files changed, 146 insertions(+), 48 deletions(-)

diff --git a/karavan-cloud/karavan-bashi/.java-version b/karavan-cloud/karavan-bashi/.java-version
new file mode 100644
index 00000000..b4de3947
--- /dev/null
+++ b/karavan-cloud/karavan-bashi/.java-version
@@ -0,0 +1 @@
+11
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java
index 71e2872b..7f4f0147 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java
@@ -1,13 +1,13 @@
 package org.apache.camel.karavan.bashi;
 
+import com.github.dockerjava.api.model.Container;
 import com.github.dockerjava.api.model.HealthCheck;
+import com.github.dockerjava.api.model.Statistics;
 import io.quarkus.vertx.ConsumeEvent;
 import io.vertx.core.json.JsonObject;
 import org.apache.camel.karavan.bashi.docker.DockerService;
 import org.apache.camel.karavan.datagrid.DatagridService;
-import org.apache.camel.karavan.datagrid.model.CommandName;
-import org.apache.camel.karavan.datagrid.model.DevModeCommand;
-import org.apache.camel.karavan.datagrid.model.Project;
+import org.apache.camel.karavan.datagrid.model.*;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.jboss.logging.Logger;
 
@@ -46,6 +46,9 @@ public class ConductorService {
     @ConfigProperty(name = "infinispan.password")
     String infinispanPassword;
 
+    @ConfigProperty(name = "karavan.environment")
+    String environment;
+
     @Inject
     DockerService dockerService;
 
@@ -56,6 +59,7 @@ public class ConductorService {
 
     public static final String ADDRESS_INFINISPAN_START = "ADDRESS_INFINISPAN_START";
     public static final String ADDRESS_INFINISPAN_HEALTH = "ADDRESS_DATAGRID_HEALTH";
+    public static final String ADDRESS_CONTAINER_STATS = "ADDRESS_CONTAINER_STATS";
 
     @ConsumeEvent(value = ADDRESS_INFINISPAN_START, blocking = true, ordered = true)
     void startInfinispan(String data) throws InterruptedException {
@@ -73,11 +77,13 @@ public class ConductorService {
     }
 
     @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true)
-    void startDatagridService(String infinispanHealth){
-        datagridService.start();
+    void startServices(String infinispanHealth){
+        if (infinispanHealth.equals("healthy")) {
+            datagridService.start();
+        }
     }
 
-//    @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true)
+    @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true)
     void startKaravan(String infinispanHealth) throws InterruptedException {
         if (infinispanHealth.equals("healthy")) {
             LOGGER.info("Karavan is starting...");
@@ -98,20 +104,42 @@ public class ConductorService {
 
     @ConsumeEvent(value = DatagridService.ADDRESS_DEVMODE_COMMAND, blocking = true, ordered = true)
     void receiveCommand(JsonObject message) throws InterruptedException {
-        System.out.println("receiveCommand " + message);
+        LOGGER.info("DevMode Command: " + message);
         DevModeCommand command = message.mapTo(DevModeCommand.class);
-        String runnerName = command.getProjectId() + "-" + DEVMODE_SUFFIX;
+        String containerName = command.getProjectId() + "-" + DEVMODE_SUFFIX;
+        Project p = datagridService.getProject(command.getProjectId());
         if (Objects.equals(command.getCommandName(), CommandName.RUN)) {
-            Project p = datagridService.getProject(command.getProjectId());
-            LOGGER.infof("Runner starting for %s", p.getProjectId());
-            dockerService.createContainer(runnerName, runnerImage,
+            LOGGER.infof("DevMode starting for %s", p.getProjectId());
+            Container container = dockerService.createContainer(containerName, runnerImage,
                     List.of(), "", false, new HealthCheck(), Map.of("type", "runner")
             );
-            dockerService.startContainer(runnerName);
-            LOGGER.infof("Runner started for %s", p.getProjectId());
+            dockerService.startContainer(containerName);
+            LOGGER.infof("DevMode started for %s", p.getProjectId());
+
+            // update DevModeStatus
+            DevModeStatus dms = datagridService.getDevModeStatus(p.getProjectId());
+            dms.setContainerName(containerName);
+            dms.setContainerId(container.getId());
+            datagridService.saveDevModeStatus(dms);
         } else if (Objects.equals(command.getCommandName(), CommandName.DELETE)){
-            dockerService.stopContainer(runnerName);
-            dockerService.deleteContainer(runnerName);
+            dockerService.stopContainer(containerName);
+            dockerService.deleteContainer(containerName);
+            datagridService.deleteDevModeStatus(p.getName());
+        }
+    }
+
+    @ConsumeEvent(value = ADDRESS_CONTAINER_STATS, blocking = true, ordered = true)
+    public void saveStats(JsonObject data) {
+        String projectId = data.getString("projectId");
+        String memory = data.getString("memory");
+        String cpu = data.getString("cpu");
+        if (datagridService.isReady()) {
+            PodStatus podStatus = datagridService.getDevModePodStatuses(projectId, environment);
+            if (podStatus != null) {
+                podStatus.setCpuInfo(cpu);
+                podStatus.setMemoryInfo(memory);
+                datagridService.savePodStatus(podStatus);
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java
index 7b3ed5d7..f0a8a630 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java
@@ -23,8 +23,8 @@ public class KaravanBashi {
 
     void onStart(@Observes StartupEvent ev) throws InterruptedException {
         LOGGER.info("Karavan Bashi is starting...");
-        dockerService.checkContainersStatus();
         dockerService.createNetwork();
+        dockerService.checkDataGridHealth();
         dockerService.startListeners();
         eventBus.publish(ConductorService.ADDRESS_INFINISPAN_START, "");
     }
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java
index 64acd143..be068895 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java
@@ -8,6 +8,7 @@ import io.vertx.core.eventbus.EventBus;
 import org.apache.camel.karavan.bashi.ConductorService;
 import org.apache.camel.karavan.bashi.Constants;
 import org.apache.camel.karavan.datagrid.DatagridService;
+import org.apache.camel.karavan.datagrid.model.PodStatus;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.jboss.logging.Logger;
 
@@ -15,6 +16,7 @@ import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Objects;
 
@@ -43,22 +45,31 @@ public class DockerEventListener implements ResultCallback<Event> {
     @Override
     public void onNext(Event event) {
 //        LOGGER.info(event.getType() + " : " + event.getStatus());
-        if (Objects.equals(event.getType(), EventType.CONTAINER)) {
-            Container container = dockerService.getContainer(event.getId());
-            String status = event.getStatus();
-            if (container.getNames()[0].equals("/infinispan") && status.startsWith("health_status:")) {
-                String health = status.replace("health_status: ", "");
-                LOGGER.infof("Container %s health status: %s", container.getNames()[0], health);
-                eventBus.publish(ConductorService.ADDRESS_INFINISPAN_HEALTH, health);
-            } else if (container.getNames()[0].endsWith(Constants.DEVMODE_SUFFIX)) {
-                if (Arrays.asList("stop", "die", "kill", "pause", "destroy").contains(event.getStatus())) {
-                    String name = container.getNames()[0].replace("/", "");
-                    String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
-                    datagridService.deletePodStatus(projectId, environment, name);
-                } else if (Arrays.asList("start", "unpause").contains(event.getStatus())) {
-
+        try {
+            if (Objects.equals(event.getType(), EventType.CONTAINER)) {
+                Container container = dockerService.getContainer(event.getId());
+                String status = event.getStatus();
+                if (container.getNames()[0].equals("/infinispan") && status.startsWith("health_status:")) {
+                    String health = status.replace("health_status: ", "");
+                    LOGGER.infof("Container %s health status: %s", container.getNames()[0], health);
+                    eventBus.publish(ConductorService.ADDRESS_INFINISPAN_HEALTH, health);
+                } else if (container.getNames()[0].endsWith(Constants.DEVMODE_SUFFIX)) {
+                    if (Arrays.asList("stop", "die", "kill", "pause", "destroy").contains(event.getStatus())) {
+                        String name = container.getNames()[0].replace("/", "");
+                        String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
+                        LOGGER.info("Deleted PodStatus for " + projectId);
+                        datagridService.deletePodStatus(projectId, environment, name);
+                    } else if (Arrays.asList("start", "unpause").contains(event.getStatus())) {
+                        String name = container.getNames()[0].replace("/", "");
+                        String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
+                        PodStatus ps = new PodStatus(name, true, null, projectId, environment, true, Instant.ofEpochSecond(container.getCreated()).toString());
+                        LOGGER.info("Saved PodStatus for " + projectId);
+                        datagridService.savePodStatus(ps);
+                    }
                 }
             }
+        } catch (Exception exception) {
+            LOGGER.error(exception.getMessage());
         }
     }
 
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java
index 44fa6824..80558b5f 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java
@@ -12,25 +12,34 @@ import com.github.dockerjava.core.DockerClientImpl;
 import com.github.dockerjava.core.InvocationBuilder;
 import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
 import com.github.dockerjava.transport.DockerHttpClient;
+import io.quarkus.scheduler.Scheduled;
+import io.smallrye.mutiny.tuples.Tuple2;
 import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.json.JsonObject;
+import org.apache.camel.karavan.bashi.Constants;
 import org.jboss.logging.Logger;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.camel.karavan.bashi.ConductorService.ADDRESS_CONTAINER_STATS;
 import static org.apache.camel.karavan.bashi.ConductorService.ADDRESS_INFINISPAN_HEALTH;
+import static org.apache.camel.karavan.bashi.Constants.DATAGRID_CONTAINER_NAME;
 import static org.apache.camel.karavan.bashi.Constants.NETWORK_NAME;
 
 @ApplicationScoped
 public class DockerService {
 
     private static final Logger LOGGER = Logger.getLogger(DockerService.class.getName());
+    private static final DecimalFormat formatCpu = new DecimalFormat("0.00");
+    private static final DecimalFormat formatMiB = new DecimalFormat("0.0");
+    private static final DecimalFormat formatGiB = new DecimalFormat("0.00");
+    private static final Map<String, Tuple2<Long, Long>> previousStats = new ConcurrentHashMap<>();
 
     @Inject
     DockerEventListener dockerEventListener;
@@ -38,6 +47,57 @@ public class DockerService {
     @Inject
     EventBus eventBus;
 
+    @Scheduled(every = "{karavan.container-stats-interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    void collectContainersStats() {
+        System.out.println("collectContainersStats ");
+        getDockerClient().listContainersCmd().exec().forEach(container -> {
+            Statistics stats = getContainerStats(container.getId());
+
+            String name = container.getNames()[0].replace("/", "");
+            String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
+            String memoryUsage = formatMemory(stats.getMemoryStats().getUsage());
+            String memoryLimit = formatMemory(stats.getMemoryStats().getLimit());
+            JsonObject data = JsonObject.of(
+                    "projectId", projectId,
+                    "memory", memoryUsage + " / " + memoryLimit,
+                    "cpu", formatCpu(name, stats)
+            );
+            eventBus.publish(ADDRESS_CONTAINER_STATS, data);
+        });
+    }
+
+    private String formatMemory(Long memory) {
+        if (memory < (1073741824)) {
+            return formatMiB.format(memory.doubleValue() / 1048576) + "MiB";
+        } else {
+            return formatGiB.format(memory.doubleValue() / 1073741824) + "GiB";
+        }
+    }
+
+    private String formatCpu(String containerName, Statistics stats) {
+        double cpuUsage = 0;
+        long previousCpu = previousStats.containsKey(containerName) ? previousStats.get(containerName).getItem1() : -1;
+        long previousSystem = previousStats.containsKey(containerName) ? previousStats.get(containerName).getItem2() : -1;
+
+        CpuStatsConfig cpuStats = stats.getCpuStats();
+        if (cpuStats != null) {
+            CpuUsageConfig cpuUsageConfig = cpuStats.getCpuUsage();
+            long systemUsage = cpuStats.getSystemCpuUsage();
+            long totalUsage = cpuUsageConfig.getTotalUsage();
+
+            if (previousCpu != -1 && previousSystem != -1) {
+                float cpuDelta = totalUsage - previousCpu;
+                float systemDelta = systemUsage - previousSystem;
+
+                if (cpuDelta > 0 && systemDelta > 0) {
+                    cpuUsage = cpuDelta / systemDelta * cpuStats.getOnlineCpus() * 100;
+                }
+            }
+            previousStats.put(containerName, Tuple2.of(totalUsage, systemUsage));
+        }
+        return formatCpu.format(cpuUsage) + "%";
+    }
+
     public void startListeners() {
         getDockerClient().eventsCmd().exec(dockerEventListener);
     }
@@ -53,12 +113,12 @@ public class DockerService {
         }
     }
 
-    public void checkContainersStatus() {
-        getDockerClient().listContainersCmd().withShowAll(true).exec().stream()
+    public void checkDataGridHealth() {
+        getDockerClient().listContainersCmd().exec().stream()
                 .filter(c -> c.getState().equals("running"))
                 .forEach(c -> {
                     HealthState hs = getDockerClient().inspectContainerCmd(c.getId()).exec().getState().getHealth();
-                    if (c.getNames()[0].equals("/infinispan")) {
+                    if (c.getNames()[0].equals("/" + DATAGRID_CONTAINER_NAME)) {
                         eventBus.publish(ADDRESS_INFINISPAN_HEALTH, hs.getStatus());
                     }
                 });
@@ -69,14 +129,9 @@ public class DockerService {
         return containers.get(0);
     }
 
-    public List<Container> getRunnerContainer() {
-        return getDockerClient().listContainersCmd()
-                .withShowAll(true).withLabelFilter(Map.of("type", "runner")).exec();
-    }
-
-    public Statistics getContainerStats(String id) {
+    public Statistics getContainerStats(String containerId) {
         InvocationBuilder.AsyncResultCallback<Statistics> callback = new InvocationBuilder.AsyncResultCallback<>();
-        getDockerClient().statsCmd(id).withContainerId(id).exec(callback);
+        getDockerClient().statsCmd(containerId).withContainerId(containerId).withNoStream(true).exec(callback);
         Statistics stats = null;
         try {
             stats = callback.awaitResult();
@@ -87,7 +142,7 @@ public class DockerService {
         return stats;
     }
 
-    public void createContainer(String name, String image, List<String> env, String ports,
+    public Container createContainer(String name, String image, List<String> env, String ports,
                                 boolean exposedPort, HealthCheck healthCheck, Map<String, String> labels) throws InterruptedException {
         List<Container> containers = getDockerClient().listContainersCmd().withShowAll(true).withNameFilter(List.of(name)).exec();
         if (containers.size() == 0) {
@@ -95,7 +150,7 @@ public class DockerService {
 
             List<ExposedPort> exposedPorts = getPortsFromString(ports).values().stream().map(i -> ExposedPort.tcp(i)).collect(Collectors.toList());
 
-            CreateContainerResponse container = getDockerClient().createContainerCmd(image)
+            CreateContainerResponse response = getDockerClient().createContainerCmd(image)
                     .withName(name)
                     .withLabels(labels)
                     .withEnv(env)
@@ -104,9 +159,12 @@ public class DockerService {
                     .withHostConfig(getHostConfig(ports, exposedPort))
                     .withHealthcheck(healthCheck)
                     .exec();
-            LOGGER.info("Container created: " + container.getId());
+            LOGGER.info("Container created: " + response.getId());
+            return getDockerClient().listContainersCmd().withShowAll(true)
+                    .withIdFilter(Collections.singleton(response.getId())).exec().get(0);
         } else {
             LOGGER.info("Container already exists: " + containers.get(0).getId());
+            return containers.get(0);
         }
     }
 
diff --git a/karavan-cloud/karavan-bashi/src/main/resources/application.properties b/karavan-cloud/karavan-bashi/src/main/resources/application.properties
index 1ed3add7..534ad21d 100644
--- a/karavan-cloud/karavan-bashi/src/main/resources/application.properties
+++ b/karavan-cloud/karavan-bashi/src/main/resources/application.properties
@@ -11,7 +11,7 @@ karavan.port=8080:8080
 karavan.environment=dev
 karavan.default-runtime=quarkus
 karavan.runtimes=quarkus,spring-boot
-karavan.devmode-status-interval=2s
+karavan.container-stats-interval=5s
 
 # Git repository Configuration
 karavan.git-repository=${GIT_REPOSITORY}