Posted to commits@camel.apache.org by lb...@apache.org on 2018/12/11 21:19:48 UTC

[camel-k] branch master updated: chore(log): make dev-mode work in Knative

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8c149  chore(log): make dev-mode work in Knative
6a8c149 is described below

commit 6a8c149d7803e8a348002e1597ddf053493ff63a
Author: nferraro <ni...@gmail.com>
AuthorDate: Tue Dec 11 17:55:04 2018 +0100

    chore(log): make dev-mode work in Knative
---
 pkg/util/log/pod_scraper.go | 54 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 39 insertions(+), 15 deletions(-)
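
In short: on Knative the integration pod carries injected sidecar containers next to
the user workload, so a log request that does not name a container is rejected by the
API server, and the scraper now picks one explicitly. A minimal sketch of the options
it ends up building, using the names from the diff below (the notes on the Container
field describe standard Kubernetes core/v1 behaviour, not code from this commit):

    // containerName is what waitForPodRunning / chooseContainer return (see diff):
    //   ""               -> single-container pod, let Kubernetes auto-detect
    //   "user-container" -> conventional name of the user workload on Knative/Istio
    logOptions := v1.PodLogOptions{
        Follow:    true,
        Container: containerName,
    }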

diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
index ba4567b..c46f2be 100644
--- a/pkg/util/log/pod_scraper.go
+++ b/pkg/util/log/pod_scraper.go
@@ -34,6 +34,11 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 )
 
+var commonUserContainerNames = map[string]bool{
+	// Convention used in Knative and Istio
+	"user-container": true,
+}
+
 // PodScraper scrapes logs of a specific pod
 type PodScraper struct {
 	namespace string
@@ -62,13 +67,16 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
 }
 
 func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
-	err := s.waitForPodRunning(ctx, s.namespace, s.name)
+	containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
 	if err != nil {
 		s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
 		return
 	}
-
-	byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
+	logOptions := v1.PodLogOptions{
+		Follow:    true,
+		Container: containerName,
+	}
+	byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
 	if err != nil {
 		s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
 		return
@@ -100,14 +108,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
 	}
 
 	if ctx.Err() != nil {
-		logrus.Info("Pod ", s.name, " will no longer be monitored")
+		logrus.Debug("Pod ", s.name, " will no longer be monitored")
 		if err := clientCloser(); err != nil {
 			logrus.Warn("Unable to close the client", err)
 		}
 		return
 	}
 
-	logrus.Info("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
+	logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
 	select {
 	case <-time.After(wait):
 		break
@@ -121,8 +129,9 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
 	s.doScrape(ctx, out, clientCloser)
 }
 
-// Waits for a given pod to reach the running state
-func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) error {
+// waitForPodRunning waits for a given pod to reach the running state.
+// It also returns the name of the container to watch, when one can be determined.
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
 	pod := v1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       "Pod",
@@ -135,22 +144,22 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
 	}
 	resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, pod.Kind, pod.Namespace)
 	if err != nil {
-		return err
+		return "", err
 	}
 	watcher, err := resourceClient.Watch(metav1.ListOptions{
 		FieldSelector: "metadata.name=" + pod.Name,
 	})
 	if err != nil {
-		return err
+		return "", err
 	}
 	events := watcher.ResultChan()
 	for {
 		select {
 		case <-ctx.Done():
-			return ctx.Err()
+			return "", ctx.Err()
 		case e, ok := <-events:
 			if !ok {
-				return errors.New("event channel closed")
+				return "", errors.New("event channel closed")
 			}
 
 			if e.Object != nil {
@@ -161,19 +170,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
 					pcopy := pod.DeepCopy()
 					err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy)
 					if err != nil {
-						return err
+						return "", err
 					}
 
 					if pcopy.Status.Phase == v1.PodRunning {
-						return nil
+						return s.chooseContainer(pcopy), nil
 					}
 				}
 			} else if e.Type == watch.Deleted || e.Type == watch.Error {
-				return errors.New("unable to watch pod " + s.name)
+				return "", errors.New("unable to watch pod " + s.name)
 			}
 		case <-time.After(30 * time.Second):
-			return errors.New("no state change after 30 seconds for pod " + s.name)
+			return "", errors.New("no state change after 30 seconds for pod " + s.name)
 		}
 	}
 
 }
+
+func (s *PodScraper) chooseContainer(p *v1.Pod) string {
+	if p != nil {
+		if len(p.Spec.Containers) == 1 {
+			// Let Kubernetes auto-detect
+			return ""
+		}
+		for _, c := range p.Spec.Containers {
+			if _, ok := commonUserContainerNames[c.Name]; ok {
+				return c.Name
+			}
+		}
+	}
+	return ""
+}
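
A quick way to exercise the selection logic is a small in-package test; this is only a
sketch (the pod shapes are made up for illustration and the test is not part of this
commit):

    package log

    import (
        "testing"

        v1 "k8s.io/api/core/v1"
    )

    func TestChooseContainer(t *testing.T) {
        s := &PodScraper{}

        // Single-container pod: return "" and let Kubernetes auto-detect.
        plain := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{
            {Name: "integration"},
        }}}
        if got := s.chooseContainer(plain); got != "" {
            t.Errorf("expected empty container name, got %q", got)
        }

        // Knative/Istio pod: sidecars sit next to the workload, so the scraper
        // has to pick the conventional "user-container" explicitly.
        knative := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{
            {Name: "user-container"},
            {Name: "istio-proxy"},
        }}}
        if got := s.chooseContainer(knative); got != "user-container" {
            t.Errorf("expected user-container, got %q", got)
        }
    }

A multi-container pod with no conventionally named container still yields "", in which
case the log request behaves exactly as it did before this change.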