You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ma...@apache.org on 2023/05/03 02:33:06 UTC
[camel-karavan] 03/04: SSE backend api for containers #563
This is an automated email from the ASF dual-hosted git repository.
marat pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 9dd129045b449e58c0fab1aa0308e128dc3a009d
Author: Marat Gubaidullin <ma...@gmail.com>
AuthorDate: Tue May 2 21:07:05 2023 -0400
SSE backend api for containers #563
---
.../camel/karavan/api/KubernetesResource.java | 8 +-
.../apache/camel/karavan/api/LogWatchResource.java | 85 +++++++++++++++++++++
.../apache/camel/karavan/model/PipelineRunLog.java | 27 -------
.../camel/karavan/service/KubernetesService.java | 89 ++++++++++------------
.../src/main/webui/src/projects/ProjectLog.tsx | 6 +-
5 files changed, 130 insertions(+), 85 deletions(-)
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java
index 60733747..fee4841c 100644
--- a/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java
+++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java
@@ -29,13 +29,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
+import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Comparator;
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java
new file mode 100644
index 00000000..5fa14161
--- /dev/null
+++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.karavan.api;
+
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import io.smallrye.mutiny.tuples.Tuple2;
+import org.apache.camel.karavan.service.KubernetesService;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.context.ManagedExecutor;
+import org.jboss.logging.Logger;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Path("/api/logwatch")
+public class LogWatchResource {
+
+ private static final Logger LOGGER = Logger.getLogger(LogWatchResource.class.getName());
+ private static final ConcurrentHashMap<String, LogWatch> logWatches = new ConcurrentHashMap<>();
+
+ @Inject
+ KubernetesService kubernetesService;
+
+ @ConfigProperty(name = "karavan.environment")
+ String environment;
+
+
+ @Inject
+ ManagedExecutor managedExecutor;
+
+ @GET
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ @Path("/{type}/{env}/{name}")
+ public void eventSourcing(@PathParam("env") String env,
+ @PathParam("type") String type,
+ @PathParam("name") String name,
+ @Context SseEventSink eventSink,
+ @Context Sse sse
+ ) {
+ managedExecutor.execute(() -> {
+ try (SseEventSink sink = eventSink) {
+ LogWatch logWatch = kubernetesService.getLogWatch(name);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput()));
+ try {
+ for (String line; (line = reader.readLine()) != null && !sink.isClosed(); ) {
+ sink.send(sse.newEvent(line + System.lineSeparator()));
+ }
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage());
+ }
+ if (sink.isClosed()) {
+ logWatch.close();
+ LOGGER.info("LogWatch for " + name + " closed");
+ }
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java b/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java
deleted file mode 100644
index 307a12e5..00000000
--- a/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.camel.karavan.model;
-
-public class PipelineRunLog {
- private String task;
- private String log;
-
- public PipelineRunLog(String task, String log) {
- this.task = task;
- this.log = log;
- }
-
- public String getTask() {
- return task;
- }
-
- public void setTask(String task) {
- this.task = task;
- }
-
- public String getLog() {
- return log;
- }
-
- public void setLog(String log) {
- this.log = log;
- }
-}
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java
index 165e800b..13b4484c 100644
--- a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java
+++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java
@@ -17,11 +17,7 @@
package org.apache.camel.karavan.service;
import io.fabric8.knative.internal.pkg.apis.Condition;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.Secret;
-import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -30,23 +26,14 @@ import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.openshift.api.model.ImageStream;
import io.fabric8.openshift.client.OpenShiftClient;
import io.fabric8.tekton.client.DefaultTektonClient;
-import io.fabric8.tekton.pipeline.v1beta1.ParamBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRef;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRefBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRun;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRunBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRunSpec;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRunSpecBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.TaskRun;
-import io.fabric8.tekton.pipeline.v1beta1.WorkspaceBindingBuilder;
+import io.fabric8.tekton.pipeline.v1beta1.*;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.mutiny.core.eventbus.EventBus;
-import org.apache.camel.karavan.informer.ServiceEventHandler;
-import org.apache.camel.karavan.model.PipelineRunLog;
-import org.apache.camel.karavan.model.Project;
import org.apache.camel.karavan.informer.DeploymentEventHandler;
import org.apache.camel.karavan.informer.PipelineRunEventHandler;
import org.apache.camel.karavan.informer.PodEventHandler;
+import org.apache.camel.karavan.informer.ServiceEventHandler;
+import org.apache.camel.karavan.model.Project;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
@@ -57,13 +44,7 @@ import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Default;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
import java.util.stream.Collectors;
@@ -103,8 +84,7 @@ public class KubernetesService implements HealthCheck{
@ConfigProperty(name = "karavan.environment")
public String environment;
-
- List<SharedIndexInformer> informers = new ArrayList<>(3);
+ List<SharedIndexInformer> informers = new ArrayList<>(4);
@ConsumeEvent(value = START_INFORMERS, blocking = true)
void startInformers(String data) {
@@ -205,34 +185,49 @@ public class KubernetesService implements HealthCheck{
return logText;
}
-
// TODO: implement log watch
- public void startContainerLogWatch(String podName, String namespace) {
- LogWatch logWatch = kubernetesClient().pods().inNamespace(namespace).withName(podName).watchLog();
- InputStream is = logWatch.getOutput();
- Integer i;
- try {
- while ((i = is.available()) != null) {
- eventBus.publish(podName + "-" + namespace, new String(is.readNBytes(i)));
- }
- } catch (IOException e) {
- LOGGER.error(e);
- }
- }
-
- public List<PipelineRunLog> getPipelineRunLog(String pipelineRuneName, String namespace) {
- List<PipelineRunLog> result = new ArrayList<>(1);
+ public LogWatch getLogWatch(String podName) {
+ return kubernetesClient().pods().inNamespace(getNamespace()).withName(podName).watchLog();
+ }
+// public void startContainerLogWatch(String session, String podName) {
+// Tuple2<CompletableFuture<Void>, LogWatch> old = logWatches.get(session);
+// if (old != null) {
+// LOGGER.info("Closing old");
+// old.getItem1().cancel(true);
+// old.getItem2().close();
+// logWatches.remove(session);
+// LOGGER.info("Closed old");
+// }
+//
+// LOGGER.info("Starting startContainerLogWatch");
+// CompletableFuture<Void> future = managedExecutor.runAsync(() -> {
+// LogWatch logWatch =
+// BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput()));
+// try {
+// for (String line; (line = reader.readLine()) != null; ) {
+// eventBus.publish(session, System.lineSeparator());
+// eventBus.publish(session, line);
+// System.out.println(line);
+// }
+// } catch (IOException e) {
+// LOGGER.error(e.getMessage());
+// }
+// });
+// logWatches.put(session, Tuple2.of(future, logWatch));
+// LOGGER.info("Done startContainerLogWatch");
+// }
+
+ public String getPipelineRunLog(String pipelineRuneName, String namespace) {
+ StringBuilder result = new StringBuilder();
getTaskRuns(pipelineRuneName, namespace).forEach(taskRun -> {
String podName = taskRun.getStatus().getPodName();
- StringBuilder log = new StringBuilder();
taskRun.getStatus().getSteps().forEach(stepState -> {
String logText = kubernetesClient().pods().inNamespace(namespace).withName(podName).inContainer(stepState.getContainer()).getLog(true);
- log.append(stepState.getContainer()).append(System.lineSeparator());
- log.append(logText).append(System.lineSeparator());
+ result.append(stepState.getContainer()).append(System.lineSeparator());
+ result.append(logText).append(System.lineSeparator());
});
- result.add(new PipelineRunLog(taskRun.getMetadata().getName(), log.toString()));
});
- return result;
+ return result.toString();
}
public PipelineRun getLastPipelineRun(String projectId, String pipelineName, String namespace) {
diff --git a/karavan-app/src/main/webui/src/projects/ProjectLog.tsx b/karavan-app/src/main/webui/src/projects/ProjectLog.tsx
index 3c706444..2b81dda5 100644
--- a/karavan-app/src/main/webui/src/projects/ProjectLog.tsx
+++ b/karavan-app/src/main/webui/src/projects/ProjectLog.tsx
@@ -31,7 +31,7 @@ export class ProjectLog extends React.Component<Props, State> {
showLog: false,
height: "30%",
logViewerRef: React.createRef(),
- isTextWrapped: false,
+ isTextWrapped: true,
data: []
}
@@ -40,7 +40,6 @@ export class ProjectLog extends React.Component<Props, State> {
componentDidMount() {
this.sub = ProjectEventBus.onShowLog()?.subscribe((log: ShowLogCommand) => {
this.setState({showLog: true, log: log});
- console.log(log)
this.showLogs(log.type, log.name, log.environment);
});
}
@@ -52,8 +51,7 @@ export class ProjectLog extends React.Component<Props, State> {
showLogs = (type: 'container' | 'pipeline', name: string, environment: string) => {
if (type === 'pipeline') {
KaravanApi.getPipelineLog(environment, name, (res: any) => {
- if (Array.isArray(res) && Array.from(res).length > 0)
- this.setState({data: res.at(0).log});
+ this.setState({data: res});
});
} else if (type === 'container') {
KaravanApi.getContainerLog(environment, name, (res: any) => {