You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/01/21 16:54:51 UTC

[camel-k] branch master updated: fix(kamel log): Use integration name for looking up containers

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new d3e646c  fix(kamel log): Use integration name for looking up containers
d3e646c is described below

commit d3e646cb9d2b5923e681a703e2975e3448371e78
Author: Roland Huß <ro...@ro14nd.de>
AuthorDate: Mon Jan 21 10:50:16 2019 +0100

    fix(kamel log): Use integration name for looking up containers
    
    Use the integration name to look up the container as a fallback if no
    container with a well-known name could be found.
    
    If no container could be identified even with the integration name,
    use the first container for the log, assuming it's the "main" container.
    
    Fixes #347
---
 pkg/util/log/annotation_scraper.go  | 28 +++++++++++++-----------
 pkg/util/log/pod_scraper.go         | 43 +++++++++++++++++++++----------------
 pkg/util/log/util.go                |  2 +-
 test/log_scrape_integration_test.go |  4 ++--
 4 files changed, 43 insertions(+), 34 deletions(-)

diff --git a/pkg/util/log/annotation_scraper.go b/pkg/util/log/annotation_scraper.go
index 31aaba7..697c20e 100644
--- a/pkg/util/log/annotation_scraper.go
+++ b/pkg/util/log/annotation_scraper.go
@@ -34,19 +34,21 @@ import (
 
 // SelectorScraper scrapes all pods with a given selector
 type SelectorScraper struct {
-	client        kubernetes.Interface
-	namespace     string
-	labelSelector string
-	podScrapers   sync.Map
-	counter       uint64
+	client               kubernetes.Interface
+	namespace            string
+	defaultContainerName string
+	labelSelector        string
+	podScrapers          sync.Map
+	counter              uint64
 }
 
 // NewSelectorScraper creates a new SelectorScraper
-func NewSelectorScraper(client kubernetes.Interface, namespace string, labelSelector string) *SelectorScraper {
+func NewSelectorScraper(client kubernetes.Interface, namespace string, defaultContainerName string, labelSelector string) *SelectorScraper {
 	return &SelectorScraper{
-		client:        client,
-		namespace:     namespace,
-		labelSelector: labelSelector,
+		client:               client,
+		namespace:            namespace,
+		defaultContainerName: defaultContainerName,
+		labelSelector:        labelSelector,
 	}
 }
 
@@ -122,17 +124,17 @@ func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer) er
 	return nil
 }
 
-func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) {
-	podScraper := NewPodScraper(s.client, s.namespace, name)
+func (s *SelectorScraper) addPodScraper(ctx context.Context, podName string,  out *bufio.Writer) {
+	podScraper := NewPodScraper(s.client, s.namespace, podName, s.defaultContainerName)
 	podCtx, podCancel := context.WithCancel(ctx)
 	id := atomic.AddUint64(&s.counter, 1)
 	prefix := "[" + strconv.FormatUint(id, 10) + "] "
 	podReader := podScraper.Start(podCtx)
-	s.podScrapers.Store(name, podCancel)
+	s.podScrapers.Store(podName, podCancel)
 	go func() {
 		defer podCancel()
 
-		if _, err := out.WriteString(prefix + "Monitoring pod " + name); err != nil {
+		if _, err := out.WriteString(prefix + "Monitoring pod " + podName); err != nil {
 			logrus.Error("Cannot write to output: ", err)
 			return
 		}
diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
index a0c3c16..93ebc18 100644
--- a/pkg/util/log/pod_scraper.go
+++ b/pkg/util/log/pod_scraper.go
@@ -41,17 +41,19 @@ var commonUserContainerNames = map[string]bool{
 
 // PodScraper scrapes logs of a specific pod
 type PodScraper struct {
-	namespace string
-	name      string
-	client    kubernetes.Interface
+	namespace            string
+	podName              string
+	defaultContainerName string
+	client               kubernetes.Interface
 }
 
 // NewPodScraper creates a new pod scraper
-func NewPodScraper(c kubernetes.Interface, namespace string, name string) *PodScraper {
+func NewPodScraper(c kubernetes.Interface, namespace string, podName string, defaultContainerName string) *PodScraper {
 	return &PodScraper{
-		namespace: namespace,
-		name:      name,
-		client:    c,
+		namespace:            namespace,
+		podName:              podName,
+		defaultContainerName: defaultContainerName,
+		client:               c,
 	}
 }
 
@@ -69,7 +71,7 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
 }
 
 func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
-	containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
+	containerName, err := s.waitForPodRunning(ctx, s.namespace, s.podName, s.defaultContainerName)
 	if err != nil {
 		s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
 		return
@@ -78,7 +80,7 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos
 		Follow:    true,
 		Container: containerName,
 	}
-	byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
+	byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.podName, &logOptions).Context(ctx).Stream()
 	if err != nil {
 		s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
 		return
@@ -106,18 +108,18 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos
 
 func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) {
 	if err != nil {
-		logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name))
+		logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.podName))
 	}
 
 	if ctx.Err() != nil {
-		logrus.Debug("Pod ", s.name, " will no longer be monitored")
+		logrus.Debug("Pod ", s.podName, " will no longer be monitored")
 		if err := clientCloser(); err != nil {
 			logrus.Warn("Unable to close the client", err)
 		}
 		return
 	}
 
-	logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
+	logrus.Debug("Retrying to scrape pod ", s.podName, " logs in ", wait.Seconds(), " seconds...")
 	select {
 	case <-time.After(wait):
 		break
@@ -133,14 +135,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
 
 // waitForPodRunning waits for a given pod to reach the running state.
 // It may return the internal container to watch if present
-func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, podName string, defaultContainerName string) (string, error) {
 	pod := v1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       "Pod",
 			APIVersion: v1.SchemeGroupVersion.String(),
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
+			Name:      podName,
 			Namespace: namespace,
 		},
 	}
@@ -181,29 +183,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
 				}
 
 				if recvPod != nil && recvPod.Status.Phase == v1.PodRunning {
-					return s.chooseContainer(recvPod), nil
+					return s.chooseContainer(recvPod, defaultContainerName), nil
 				}
 			} else if e.Type == watch.Deleted || e.Type == watch.Error {
-				return "", errors.New("unable to watch pod " + s.name)
+				return "", errors.New("unable to watch pod " + s.podName)
 			}
 		case <-time.After(30 * time.Second):
-			return "", errors.New("no state change after 30 seconds for pod " + s.name)
+			return "", errors.New("no state change after 30 seconds for pod " + s.podName)
 		}
 	}
 
 }
 
-func (s *PodScraper) chooseContainer(p *v1.Pod) string {
+func (s *PodScraper) chooseContainer(p *v1.Pod, defaultContainerName string) string {
 	if p != nil {
 		if len(p.Spec.Containers) == 1 {
 			// Let Kubernetes auto-detect
 			return ""
 		}
+		// Fallback to first container name
+		containerNameFound := p.Spec.Containers[0].Name
 		for _, c := range p.Spec.Containers {
 			if _, ok := commonUserContainerNames[c.Name]; ok {
 				return c.Name
+			} else if c.Name == defaultContainerName {
+				containerNameFound = defaultContainerName
 			}
 		}
+		return containerNameFound
 	}
 	return ""
 }
diff --git a/pkg/util/log/util.go b/pkg/util/log/util.go
index eb20eb9..2dc91fc 100644
--- a/pkg/util/log/util.go
+++ b/pkg/util/log/util.go
@@ -30,7 +30,7 @@ import (
 
 // Print prints integrations logs to the stdout
 func Print(ctx context.Context, client kubernetes.Interface, integration *v1alpha1.Integration) error {
-	scraper := NewSelectorScraper(client, integration.Namespace, "camel.apache.org/integration="+integration.Name)
+	scraper := NewSelectorScraper(client, integration.Namespace, integration.Name,"camel.apache.org/integration="+integration.Name)
 	reader := scraper.Start(ctx)
 
 	if _, err := io.Copy(os.Stdout, ioutil.NopCloser(reader)); err != nil {
diff --git a/test/log_scrape_integration_test.go b/test/log_scrape_integration_test.go
index 9a74b57..f233e29 100644
--- a/test/log_scrape_integration_test.go
+++ b/test/log_scrape_integration_test.go
@@ -39,7 +39,7 @@ func TestPodLogScrape(t *testing.T) {
 
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
 	defer cancel()
-	scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name)
+	scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name, "scraped")
 	in := scraper.Start(ctx)
 
 	res := make(chan bool)
@@ -74,7 +74,7 @@ func TestSelectorLogScrape(t *testing.T) {
 
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
 	defer cancel()
-	scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "scrape=me")
+	scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "main", "scrape=me")
 	in := scraper.Start(ctx)
 
 	res := make(chan string)