You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ma...@apache.org on 2023/05/03 02:33:06 UTC

[camel-karavan] 03/04: SSE backend api for containers #563

This is an automated email from the ASF dual-hosted git repository.

marat pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-karavan.git

commit 9dd129045b449e58c0fab1aa0308e128dc3a009d
Author: Marat Gubaidullin <ma...@gmail.com>
AuthorDate: Tue May 2 21:07:05 2023 -0400

    SSE backend api for containers #563
---
 .../camel/karavan/api/KubernetesResource.java      |  8 +-
 .../apache/camel/karavan/api/LogWatchResource.java | 85 +++++++++++++++++++++
 .../apache/camel/karavan/model/PipelineRunLog.java | 27 -------
 .../camel/karavan/service/KubernetesService.java   | 89 ++++++++++------------
 .../src/main/webui/src/projects/ProjectLog.tsx     |  6 +-
 5 files changed, 130 insertions(+), 85 deletions(-)

diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java
index 60733747..fee4841c 100644
--- a/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java
+++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java
@@ -29,13 +29,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.jboss.logging.Logger;
 
 import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
+import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.Comparator;
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java
new file mode 100644
index 00000000..5fa14161
--- /dev/null
+++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.karavan.api;
+
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import io.smallrye.mutiny.tuples.Tuple2;
+import org.apache.camel.karavan.service.KubernetesService;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.context.ManagedExecutor;
+import org.jboss.logging.Logger;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Path("/api/logwatch")
+public class LogWatchResource {
+
+    private static final Logger LOGGER = Logger.getLogger(LogWatchResource.class.getName());
+    private static final ConcurrentHashMap<String, LogWatch> logWatches = new ConcurrentHashMap<>();
+
+    @Inject
+    KubernetesService kubernetesService;
+
+    @ConfigProperty(name = "karavan.environment")
+    String environment;
+
+
+    @Inject
+    ManagedExecutor managedExecutor;
+
+    @GET
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    @Path("/{type}/{env}/{name}")
+    public void eventSourcing(@PathParam("env") String env,
+                                       @PathParam("type") String type,
+                                       @PathParam("name") String name,
+                                       @Context SseEventSink eventSink,
+                                       @Context Sse sse
+                                       ) {
+        managedExecutor.execute(() -> {
+            try (SseEventSink sink = eventSink) {
+                LogWatch logWatch = kubernetesService.getLogWatch(name);
+                BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput()));
+                try {
+                    for (String line; (line = reader.readLine()) != null && !sink.isClosed(); ) {
+                        sink.send(sse.newEvent(line + System.lineSeparator()));
+                    }
+                } catch (IOException e) {
+                    LOGGER.error(e.getMessage());
+                }
+                if (sink.isClosed()) {
+                    logWatch.close();
+                    LOGGER.info("LogWatch for " + name + " closed");
+                }
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java b/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java
deleted file mode 100644
index 307a12e5..00000000
--- a/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.camel.karavan.model;
-
-public class PipelineRunLog {
-    private String task;
-    private String log;
-
-    public PipelineRunLog(String task, String log) {
-        this.task = task;
-        this.log = log;
-    }
-
-    public String getTask() {
-        return task;
-    }
-
-    public void setTask(String task) {
-        this.task = task;
-    }
-
-    public String getLog() {
-        return log;
-    }
-
-    public void setLog(String log) {
-        this.log = log;
-    }
-}
diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java
index 165e800b..13b4484c 100644
--- a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java
+++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java
@@ -17,11 +17,7 @@
 package org.apache.camel.karavan.service;
 
 import io.fabric8.knative.internal.pkg.apis.Condition;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.Secret;
-import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.*;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -30,23 +26,14 @@ import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.fabric8.openshift.api.model.ImageStream;
 import io.fabric8.openshift.client.OpenShiftClient;
 import io.fabric8.tekton.client.DefaultTektonClient;
-import io.fabric8.tekton.pipeline.v1beta1.ParamBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRef;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRefBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRun;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRunBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRunSpec;
-import io.fabric8.tekton.pipeline.v1beta1.PipelineRunSpecBuilder;
-import io.fabric8.tekton.pipeline.v1beta1.TaskRun;
-import io.fabric8.tekton.pipeline.v1beta1.WorkspaceBindingBuilder;
+import io.fabric8.tekton.pipeline.v1beta1.*;
 import io.quarkus.vertx.ConsumeEvent;
 import io.vertx.mutiny.core.eventbus.EventBus;
-import org.apache.camel.karavan.informer.ServiceEventHandler;
-import org.apache.camel.karavan.model.PipelineRunLog;
-import org.apache.camel.karavan.model.Project;
 import org.apache.camel.karavan.informer.DeploymentEventHandler;
 import org.apache.camel.karavan.informer.PipelineRunEventHandler;
 import org.apache.camel.karavan.informer.PodEventHandler;
+import org.apache.camel.karavan.informer.ServiceEventHandler;
+import org.apache.camel.karavan.model.Project;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.eclipse.microprofile.health.HealthCheck;
 import org.eclipse.microprofile.health.HealthCheckResponse;
@@ -57,13 +44,7 @@ import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.inject.Default;
 import javax.enterprise.inject.Produces;
 import javax.inject.Inject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.stream.Collectors;
 
 
@@ -103,8 +84,7 @@ public class KubernetesService implements HealthCheck{
     @ConfigProperty(name = "karavan.environment")
     public String environment;
 
-
-    List<SharedIndexInformer> informers = new ArrayList<>(3);
+    List<SharedIndexInformer> informers = new ArrayList<>(4);
 
     @ConsumeEvent(value = START_INFORMERS, blocking = true)
     void startInformers(String data) {
@@ -205,34 +185,49 @@ public class KubernetesService implements HealthCheck{
         return logText;
     }
 
-
     // TODO: implement log watch
-    public void startContainerLogWatch(String podName, String namespace) {
-        LogWatch logWatch = kubernetesClient().pods().inNamespace(namespace).withName(podName).watchLog();
-        InputStream is = logWatch.getOutput();
-        Integer i;
-        try {
-            while ((i = is.available()) != null) {
-                eventBus.publish(podName + "-" + namespace, new String(is.readNBytes(i)));
-            }
-        } catch (IOException e) {
-            LOGGER.error(e);
-        }
-    }
-
-    public List<PipelineRunLog> getPipelineRunLog(String pipelineRuneName, String namespace) {
-        List<PipelineRunLog> result = new ArrayList<>(1);
+    public LogWatch getLogWatch(String podName) {
+        return kubernetesClient().pods().inNamespace(getNamespace()).withName(podName).watchLog();
+    }
+//    public void startContainerLogWatch(String session, String podName) {
+//        Tuple2<CompletableFuture<Void>, LogWatch> old = logWatches.get(session);
+//        if (old != null) {
+//            LOGGER.info("Closing old");
+//            old.getItem1().cancel(true);
+//            old.getItem2().close();
+//            logWatches.remove(session);
+//            LOGGER.info("Closed old");
+//        }
+//
+//        LOGGER.info("Starting startContainerLogWatch");
+//        CompletableFuture<Void> future = managedExecutor.runAsync(() -> {
+//            LogWatch logWatch =
+//            BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput()));
+//            try {
+//                for (String line; (line = reader.readLine()) != null; ) {
+//                    eventBus.publish(session, System.lineSeparator());
+//                    eventBus.publish(session, line);
+//                    System.out.println(line);
+//                }
+//            } catch (IOException e) {
+//                LOGGER.error(e.getMessage());
+//            }
+//        });
+//        logWatches.put(session, Tuple2.of(future, logWatch));
+//        LOGGER.info("Done startContainerLogWatch");
+//    }
+
+    public String getPipelineRunLog(String pipelineRuneName, String namespace) {
+        StringBuilder result = new StringBuilder();
         getTaskRuns(pipelineRuneName, namespace).forEach(taskRun -> {
             String podName = taskRun.getStatus().getPodName();
-            StringBuilder log = new StringBuilder();
             taskRun.getStatus().getSteps().forEach(stepState -> {
                 String logText = kubernetesClient().pods().inNamespace(namespace).withName(podName).inContainer(stepState.getContainer()).getLog(true);
-                log.append(stepState.getContainer()).append(System.lineSeparator());
-                log.append(logText).append(System.lineSeparator());
+                result.append(stepState.getContainer()).append(System.lineSeparator());
+                result.append(logText).append(System.lineSeparator());
             });
-            result.add(new PipelineRunLog(taskRun.getMetadata().getName(), log.toString()));
         });
-        return result;
+        return result.toString();
     }
 
     public PipelineRun getLastPipelineRun(String projectId, String pipelineName, String namespace) {
diff --git a/karavan-app/src/main/webui/src/projects/ProjectLog.tsx b/karavan-app/src/main/webui/src/projects/ProjectLog.tsx
index 3c706444..2b81dda5 100644
--- a/karavan-app/src/main/webui/src/projects/ProjectLog.tsx
+++ b/karavan-app/src/main/webui/src/projects/ProjectLog.tsx
@@ -31,7 +31,7 @@ export class ProjectLog extends React.Component<Props, State> {
         showLog: false,
         height: "30%",
         logViewerRef: React.createRef(),
-        isTextWrapped: false,
+        isTextWrapped: true,
         data: []
     }
 
@@ -40,7 +40,6 @@ export class ProjectLog extends React.Component<Props, State> {
     componentDidMount() {
         this.sub = ProjectEventBus.onShowLog()?.subscribe((log: ShowLogCommand) => {
             this.setState({showLog: true, log: log});
-            console.log(log)
             this.showLogs(log.type, log.name, log.environment);
         });
     }
@@ -52,8 +51,7 @@ export class ProjectLog extends React.Component<Props, State> {
     showLogs = (type: 'container' | 'pipeline', name: string, environment: string) => {
         if (type === 'pipeline') {
             KaravanApi.getPipelineLog(environment, name, (res: any) => {
-                if (Array.isArray(res) && Array.from(res).length > 0)
-                    this.setState({data: res.at(0).log});
+                this.setState({data: res});
             });
         } else if (type === 'container') {
             KaravanApi.getContainerLog(environment, name, (res: any) => {