You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ma...@apache.org on 2023/07/13 16:10:08 UTC
[camel-karavan] 04/07: DataGrid in bashi works #817
This is an automated email from the ASF dual-hosted git repository.
marat pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 7f195bd8875b46562b393053b5f8d591f8a60259
Author: Marat Gubaidullin <ma...@gmail.com>
AuthorDate: Wed Jul 12 14:58:51 2023 -0400
DataGrid in bashi works #817
---
karavan-cloud/karavan-bashi/.java-version | 1 +
.../camel/karavan/bashi/ConductorService.java | 58 ++++++++++----
.../apache/camel/karavan/bashi/KaravanBashi.java | 2 +-
.../karavan/bashi/docker/DockerEventListener.java | 39 +++++----
.../camel/karavan/bashi/docker/DockerService.java | 92 ++++++++++++++++++----
.../src/main/resources/application.properties | 2 +-
6 files changed, 146 insertions(+), 48 deletions(-)
diff --git a/karavan-cloud/karavan-bashi/.java-version b/karavan-cloud/karavan-bashi/.java-version
new file mode 100644
index 00000000..b4de3947
--- /dev/null
+++ b/karavan-cloud/karavan-bashi/.java-version
@@ -0,0 +1 @@
+11
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java
index 71e2872b..7f4f0147 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/ConductorService.java
@@ -1,13 +1,13 @@
package org.apache.camel.karavan.bashi;
+import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.api.model.HealthCheck;
+import com.github.dockerjava.api.model.Statistics;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.json.JsonObject;
import org.apache.camel.karavan.bashi.docker.DockerService;
import org.apache.camel.karavan.datagrid.DatagridService;
-import org.apache.camel.karavan.datagrid.model.CommandName;
-import org.apache.camel.karavan.datagrid.model.DevModeCommand;
-import org.apache.camel.karavan.datagrid.model.Project;
+import org.apache.camel.karavan.datagrid.model.*;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
@@ -46,6 +46,9 @@ public class ConductorService {
@ConfigProperty(name = "infinispan.password")
String infinispanPassword;
+ @ConfigProperty(name = "karavan.environment")
+ String environment;
+
@Inject
DockerService dockerService;
@@ -56,6 +59,7 @@ public class ConductorService {
public static final String ADDRESS_INFINISPAN_START = "ADDRESS_INFINISPAN_START";
public static final String ADDRESS_INFINISPAN_HEALTH = "ADDRESS_DATAGRID_HEALTH";
+ public static final String ADDRESS_CONTAINER_STATS = "ADDRESS_CONTAINER_STATS";
@ConsumeEvent(value = ADDRESS_INFINISPAN_START, blocking = true, ordered = true)
void startInfinispan(String data) throws InterruptedException {
@@ -73,11 +77,13 @@ public class ConductorService {
}
@ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true)
- void startDatagridService(String infinispanHealth){
- datagridService.start();
+ void startServices(String infinispanHealth){
+ if (infinispanHealth.equals("healthy")) {
+ datagridService.start();
+ }
}
-// @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true)
+ @ConsumeEvent(value = ADDRESS_INFINISPAN_HEALTH, blocking = true, ordered = true)
void startKaravan(String infinispanHealth) throws InterruptedException {
if (infinispanHealth.equals("healthy")) {
LOGGER.info("Karavan is starting...");
@@ -98,20 +104,42 @@ public class ConductorService {
@ConsumeEvent(value = DatagridService.ADDRESS_DEVMODE_COMMAND, blocking = true, ordered = true)
void receiveCommand(JsonObject message) throws InterruptedException {
- System.out.println("receiveCommand " + message);
+ LOGGER.info("DevMode Command: " + message);
DevModeCommand command = message.mapTo(DevModeCommand.class);
- String runnerName = command.getProjectId() + "-" + DEVMODE_SUFFIX;
+ String containerName = command.getProjectId() + "-" + DEVMODE_SUFFIX;
+ Project p = datagridService.getProject(command.getProjectId());
if (Objects.equals(command.getCommandName(), CommandName.RUN)) {
- Project p = datagridService.getProject(command.getProjectId());
- LOGGER.infof("Runner starting for %s", p.getProjectId());
- dockerService.createContainer(runnerName, runnerImage,
+ LOGGER.infof("DevMode starting for %s", p.getProjectId());
+ Container container = dockerService.createContainer(containerName, runnerImage,
List.of(), "", false, new HealthCheck(), Map.of("type", "runner")
);
- dockerService.startContainer(runnerName);
- LOGGER.infof("Runner started for %s", p.getProjectId());
+ dockerService.startContainer(containerName);
+ LOGGER.infof("DevMode started for %s", p.getProjectId());
+
+ // update DevModeStatus
+ DevModeStatus dms = datagridService.getDevModeStatus(p.getProjectId());
+ dms.setContainerName(containerName);
+ dms.setContainerId(container.getId());
+ datagridService.saveDevModeStatus(dms);
} else if (Objects.equals(command.getCommandName(), CommandName.DELETE)){
- dockerService.stopContainer(runnerName);
- dockerService.deleteContainer(runnerName);
+ dockerService.stopContainer(containerName);
+ dockerService.deleteContainer(containerName);
+ datagridService.deleteDevModeStatus(p.getName());
+ }
+ }
+
+ @ConsumeEvent(value = ADDRESS_CONTAINER_STATS, blocking = true, ordered = true)
+ public void saveStats(JsonObject data) {
+ String projectId = data.getString("projectId");
+ String memory = data.getString("memory");
+ String cpu = data.getString("cpu");
+ if (datagridService.isReady()) {
+ PodStatus podStatus = datagridService.getDevModePodStatuses(projectId, environment);
+ if (podStatus != null) {
+ podStatus.setCpuInfo(cpu);
+ podStatus.setMemoryInfo(memory);
+ datagridService.savePodStatus(podStatus);
+ }
}
}
}
\ No newline at end of file
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java
index 7b3ed5d7..f0a8a630 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/KaravanBashi.java
@@ -23,8 +23,8 @@ public class KaravanBashi {
void onStart(@Observes StartupEvent ev) throws InterruptedException {
LOGGER.info("Karavan Bashi is starting...");
- dockerService.checkContainersStatus();
dockerService.createNetwork();
+ dockerService.checkDataGridHealth();
dockerService.startListeners();
eventBus.publish(ConductorService.ADDRESS_INFINISPAN_START, "");
}
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java
index 64acd143..be068895 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerEventListener.java
@@ -8,6 +8,7 @@ import io.vertx.core.eventbus.EventBus;
import org.apache.camel.karavan.bashi.ConductorService;
import org.apache.camel.karavan.bashi.Constants;
import org.apache.camel.karavan.datagrid.DatagridService;
+import org.apache.camel.karavan.datagrid.model.PodStatus;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
@@ -15,6 +16,7 @@ import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Objects;
@@ -43,22 +45,31 @@ public class DockerEventListener implements ResultCallback<Event> {
@Override
public void onNext(Event event) {
// LOGGER.info(event.getType() + " : " + event.getStatus());
- if (Objects.equals(event.getType(), EventType.CONTAINER)) {
- Container container = dockerService.getContainer(event.getId());
- String status = event.getStatus();
- if (container.getNames()[0].equals("/infinispan") && status.startsWith("health_status:")) {
- String health = status.replace("health_status: ", "");
- LOGGER.infof("Container %s health status: %s", container.getNames()[0], health);
- eventBus.publish(ConductorService.ADDRESS_INFINISPAN_HEALTH, health);
- } else if (container.getNames()[0].endsWith(Constants.DEVMODE_SUFFIX)) {
- if (Arrays.asList("stop", "die", "kill", "pause", "destroy").contains(event.getStatus())) {
- String name = container.getNames()[0].replace("/", "");
- String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
- datagridService.deletePodStatus(projectId, environment, name);
- } else if (Arrays.asList("start", "unpause").contains(event.getStatus())) {
-
+ try {
+ if (Objects.equals(event.getType(), EventType.CONTAINER)) {
+ Container container = dockerService.getContainer(event.getId());
+ String status = event.getStatus();
+ if (container.getNames()[0].equals("/infinispan") && status.startsWith("health_status:")) {
+ String health = status.replace("health_status: ", "");
+ LOGGER.infof("Container %s health status: %s", container.getNames()[0], health);
+ eventBus.publish(ConductorService.ADDRESS_INFINISPAN_HEALTH, health);
+ } else if (container.getNames()[0].endsWith(Constants.DEVMODE_SUFFIX)) {
+ if (Arrays.asList("stop", "die", "kill", "pause", "destroy").contains(event.getStatus())) {
+ String name = container.getNames()[0].replace("/", "");
+ String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
+ LOGGER.info("Deleted PodStatus for " + projectId);
+ datagridService.deletePodStatus(projectId, environment, name);
+ } else if (Arrays.asList("start", "unpause").contains(event.getStatus())) {
+ String name = container.getNames()[0].replace("/", "");
+ String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
+ PodStatus ps = new PodStatus(name, true, null, projectId, environment, true, Instant.ofEpochSecond(container.getCreated()).toString());
+ LOGGER.info("Saved PodStatus for " + projectId);
+ datagridService.savePodStatus(ps);
+ }
}
}
+ } catch (Exception exception) {
+ LOGGER.error(exception.getMessage());
}
}
diff --git a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java
index 44fa6824..80558b5f 100644
--- a/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java
+++ b/karavan-cloud/karavan-bashi/src/main/java/org/apache/camel/karavan/bashi/docker/DockerService.java
@@ -12,25 +12,34 @@ import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.core.InvocationBuilder;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.github.dockerjava.transport.DockerHttpClient;
+import io.quarkus.scheduler.Scheduled;
+import io.smallrye.mutiny.tuples.Tuple2;
import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.json.JsonObject;
+import org.apache.camel.karavan.bashi.Constants;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static org.apache.camel.karavan.bashi.ConductorService.ADDRESS_CONTAINER_STATS;
import static org.apache.camel.karavan.bashi.ConductorService.ADDRESS_INFINISPAN_HEALTH;
+import static org.apache.camel.karavan.bashi.Constants.DATAGRID_CONTAINER_NAME;
import static org.apache.camel.karavan.bashi.Constants.NETWORK_NAME;
@ApplicationScoped
public class DockerService {
private static final Logger LOGGER = Logger.getLogger(DockerService.class.getName());
+ private static final DecimalFormat formatCpu = new DecimalFormat("0.00");
+ private static final DecimalFormat formatMiB = new DecimalFormat("0.0");
+ private static final DecimalFormat formatGiB = new DecimalFormat("0.00");
+ private static final Map<String, Tuple2<Long, Long>> previousStats = new ConcurrentHashMap<>();
@Inject
DockerEventListener dockerEventListener;
@@ -38,6 +47,57 @@ public class DockerService {
@Inject
EventBus eventBus;
+ @Scheduled(every = "{karavan.container-stats-interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+ void collectContainersStats() {
+ System.out.println("collectContainersStats ");
+ getDockerClient().listContainersCmd().exec().forEach(container -> {
+ Statistics stats = getContainerStats(container.getId());
+
+ String name = container.getNames()[0].replace("/", "");
+ String projectId = name.replace("-" + Constants.DEVMODE_SUFFIX, "");
+ String memoryUsage = formatMemory(stats.getMemoryStats().getUsage());
+ String memoryLimit = formatMemory(stats.getMemoryStats().getLimit());
+ JsonObject data = JsonObject.of(
+ "projectId", projectId,
+ "memory", memoryUsage + " / " + memoryLimit,
+ "cpu", formatCpu(name, stats)
+ );
+ eventBus.publish(ADDRESS_CONTAINER_STATS, data);
+ });
+ }
+
+ private String formatMemory(Long memory) {
+ if (memory < (1073741824)) {
+ return formatMiB.format(memory.doubleValue() / 1048576) + "MiB";
+ } else {
+ return formatGiB.format(memory.doubleValue() / 1073741824) + "GiB";
+ }
+ }
+
+ private String formatCpu(String containerName, Statistics stats) {
+ double cpuUsage = 0;
+ long previousCpu = previousStats.containsKey(containerName) ? previousStats.get(containerName).getItem1() : -1;
+ long previousSystem = previousStats.containsKey(containerName) ? previousStats.get(containerName).getItem2() : -1;
+
+ CpuStatsConfig cpuStats = stats.getCpuStats();
+ if (cpuStats != null) {
+ CpuUsageConfig cpuUsageConfig = cpuStats.getCpuUsage();
+ long systemUsage = cpuStats.getSystemCpuUsage();
+ long totalUsage = cpuUsageConfig.getTotalUsage();
+
+ if (previousCpu != -1 && previousSystem != -1) {
+ float cpuDelta = totalUsage - previousCpu;
+ float systemDelta = systemUsage - previousSystem;
+
+ if (cpuDelta > 0 && systemDelta > 0) {
+ cpuUsage = cpuDelta / systemDelta * cpuStats.getOnlineCpus() * 100;
+ }
+ }
+ previousStats.put(containerName, Tuple2.of(totalUsage, systemUsage));
+ }
+ return formatCpu.format(cpuUsage) + "%";
+ }
+
public void startListeners() {
getDockerClient().eventsCmd().exec(dockerEventListener);
}
@@ -53,12 +113,12 @@ public class DockerService {
}
}
- public void checkContainersStatus() {
- getDockerClient().listContainersCmd().withShowAll(true).exec().stream()
+ public void checkDataGridHealth() {
+ getDockerClient().listContainersCmd().exec().stream()
.filter(c -> c.getState().equals("running"))
.forEach(c -> {
HealthState hs = getDockerClient().inspectContainerCmd(c.getId()).exec().getState().getHealth();
- if (c.getNames()[0].equals("/infinispan")) {
+ if (c.getNames()[0].equals("/" + DATAGRID_CONTAINER_NAME)) {
eventBus.publish(ADDRESS_INFINISPAN_HEALTH, hs.getStatus());
}
});
@@ -69,14 +129,9 @@ public class DockerService {
return containers.get(0);
}
- public List<Container> getRunnerContainer() {
- return getDockerClient().listContainersCmd()
- .withShowAll(true).withLabelFilter(Map.of("type", "runner")).exec();
- }
-
- public Statistics getContainerStats(String id) {
+ public Statistics getContainerStats(String containerId) {
InvocationBuilder.AsyncResultCallback<Statistics> callback = new InvocationBuilder.AsyncResultCallback<>();
- getDockerClient().statsCmd(id).withContainerId(id).exec(callback);
+ getDockerClient().statsCmd(containerId).withContainerId(containerId).withNoStream(true).exec(callback);
Statistics stats = null;
try {
stats = callback.awaitResult();
@@ -87,7 +142,7 @@ public class DockerService {
return stats;
}
- public void createContainer(String name, String image, List<String> env, String ports,
+ public Container createContainer(String name, String image, List<String> env, String ports,
boolean exposedPort, HealthCheck healthCheck, Map<String, String> labels) throws InterruptedException {
List<Container> containers = getDockerClient().listContainersCmd().withShowAll(true).withNameFilter(List.of(name)).exec();
if (containers.size() == 0) {
@@ -95,7 +150,7 @@ public class DockerService {
List<ExposedPort> exposedPorts = getPortsFromString(ports).values().stream().map(i -> ExposedPort.tcp(i)).collect(Collectors.toList());
- CreateContainerResponse container = getDockerClient().createContainerCmd(image)
+ CreateContainerResponse response = getDockerClient().createContainerCmd(image)
.withName(name)
.withLabels(labels)
.withEnv(env)
@@ -104,9 +159,12 @@ public class DockerService {
.withHostConfig(getHostConfig(ports, exposedPort))
.withHealthcheck(healthCheck)
.exec();
- LOGGER.info("Container created: " + container.getId());
+ LOGGER.info("Container created: " + response.getId());
+ return getDockerClient().listContainersCmd().withShowAll(true)
+ .withIdFilter(Collections.singleton(response.getId())).exec().get(0);
} else {
LOGGER.info("Container already exists: " + containers.get(0).getId());
+ return containers.get(0);
}
}
diff --git a/karavan-cloud/karavan-bashi/src/main/resources/application.properties b/karavan-cloud/karavan-bashi/src/main/resources/application.properties
index 1ed3add7..534ad21d 100644
--- a/karavan-cloud/karavan-bashi/src/main/resources/application.properties
+++ b/karavan-cloud/karavan-bashi/src/main/resources/application.properties
@@ -11,7 +11,7 @@ karavan.port=8080:8080
karavan.environment=dev
karavan.default-runtime=quarkus
karavan.runtimes=quarkus,spring-boot
-karavan.devmode-status-interval=2s
+karavan.container-stats-interval=5s
# Git repository Configuration
karavan.git-repository=${GIT_REPOSITORY}