You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/01/21 16:54:51 UTC
[camel-k] branch master updated: fix(kamel log): Use integration
name for looking up containers
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
new d3e646c fix(kamel log): Use integration name for looking up containers
d3e646c is described below
commit d3e646cb9d2b5923e681a703e2975e3448371e78
Author: Roland Huß <ro...@ro14nd.de>
AuthorDate: Mon Jan 21 10:50:16 2019 +0100
fix(kamel log): Use integration name for looking up containers
Use integration name for looking up containers as a fallback if no
container could be found.
If no container could be identified even with the integration name,
use the first container for the log, assuming it's the "main" container.
Fixes #347
---
pkg/util/log/annotation_scraper.go | 28 +++++++++++++-----------
pkg/util/log/pod_scraper.go | 43 +++++++++++++++++++++----------------
pkg/util/log/util.go | 2 +-
test/log_scrape_integration_test.go | 4 ++--
4 files changed, 43 insertions(+), 34 deletions(-)
diff --git a/pkg/util/log/annotation_scraper.go b/pkg/util/log/annotation_scraper.go
index 31aaba7..697c20e 100644
--- a/pkg/util/log/annotation_scraper.go
+++ b/pkg/util/log/annotation_scraper.go
@@ -34,19 +34,21 @@ import (
// SelectorScraper scrapes all pods with a given selector
type SelectorScraper struct {
- client kubernetes.Interface
- namespace string
- labelSelector string
- podScrapers sync.Map
- counter uint64
+ client kubernetes.Interface
+ namespace string
+ defaultContainerName string
+ labelSelector string
+ podScrapers sync.Map
+ counter uint64
}
// NewSelectorScraper creates a new SelectorScraper
-func NewSelectorScraper(client kubernetes.Interface, namespace string, labelSelector string) *SelectorScraper {
+func NewSelectorScraper(client kubernetes.Interface, namespace string, defaultContainerName string, labelSelector string) *SelectorScraper {
return &SelectorScraper{
- client: client,
- namespace: namespace,
- labelSelector: labelSelector,
+ client: client,
+ namespace: namespace,
+ defaultContainerName: defaultContainerName,
+ labelSelector: labelSelector,
}
}
@@ -122,17 +124,17 @@ func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer) er
return nil
}
-func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) {
- podScraper := NewPodScraper(s.client, s.namespace, name)
+func (s *SelectorScraper) addPodScraper(ctx context.Context, podName string, out *bufio.Writer) {
+ podScraper := NewPodScraper(s.client, s.namespace, podName, s.defaultContainerName)
podCtx, podCancel := context.WithCancel(ctx)
id := atomic.AddUint64(&s.counter, 1)
prefix := "[" + strconv.FormatUint(id, 10) + "] "
podReader := podScraper.Start(podCtx)
- s.podScrapers.Store(name, podCancel)
+ s.podScrapers.Store(podName, podCancel)
go func() {
defer podCancel()
- if _, err := out.WriteString(prefix + "Monitoring pod " + name); err != nil {
+ if _, err := out.WriteString(prefix + "Monitoring pod " + podName); err != nil {
logrus.Error("Cannot write to output: ", err)
return
}
diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
index a0c3c16..93ebc18 100644
--- a/pkg/util/log/pod_scraper.go
+++ b/pkg/util/log/pod_scraper.go
@@ -41,17 +41,19 @@ var commonUserContainerNames = map[string]bool{
// PodScraper scrapes logs of a specific pod
type PodScraper struct {
- namespace string
- name string
- client kubernetes.Interface
+ namespace string
+ podName string
+ defaultContainerName string
+ client kubernetes.Interface
}
// NewPodScraper creates a new pod scraper
-func NewPodScraper(c kubernetes.Interface, namespace string, name string) *PodScraper {
+func NewPodScraper(c kubernetes.Interface, namespace string, podName string, defaultContainerName string) *PodScraper {
return &PodScraper{
- namespace: namespace,
- name: name,
- client: c,
+ namespace: namespace,
+ podName: podName,
+ defaultContainerName: defaultContainerName,
+ client: c,
}
}
@@ -69,7 +71,7 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
}
func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
- containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
+ containerName, err := s.waitForPodRunning(ctx, s.namespace, s.podName, s.defaultContainerName)
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
@@ -78,7 +80,7 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos
Follow: true,
Container: containerName,
}
- byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
+ byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.podName, &logOptions).Context(ctx).Stream()
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
@@ -106,18 +108,18 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos
func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) {
if err != nil {
- logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name))
+ logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.podName))
}
if ctx.Err() != nil {
- logrus.Debug("Pod ", s.name, " will no longer be monitored")
+ logrus.Debug("Pod ", s.podName, " will no longer be monitored")
if err := clientCloser(); err != nil {
logrus.Warn("Unable to close the client", err)
}
return
}
- logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
+ logrus.Debug("Retrying to scrape pod ", s.podName, " logs in ", wait.Seconds(), " seconds...")
select {
case <-time.After(wait):
break
@@ -133,14 +135,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
// waitForPodRunning waits for a given pod to reach the running state.
// It may return the internal container to watch if present
-func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, podName string, defaultContainerName string) (string, error) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: v1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
- Name: name,
+ Name: podName,
Namespace: namespace,
},
}
@@ -181,29 +183,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
}
if recvPod != nil && recvPod.Status.Phase == v1.PodRunning {
- return s.chooseContainer(recvPod), nil
+ return s.chooseContainer(recvPod, defaultContainerName), nil
}
} else if e.Type == watch.Deleted || e.Type == watch.Error {
- return "", errors.New("unable to watch pod " + s.name)
+ return "", errors.New("unable to watch pod " + s.podName)
}
case <-time.After(30 * time.Second):
- return "", errors.New("no state change after 30 seconds for pod " + s.name)
+ return "", errors.New("no state change after 30 seconds for pod " + s.podName)
}
}
}
-func (s *PodScraper) chooseContainer(p *v1.Pod) string {
+func (s *PodScraper) chooseContainer(p *v1.Pod, defaultContainerName string) string {
if p != nil {
if len(p.Spec.Containers) == 1 {
// Let Kubernetes auto-detect
return ""
}
+ // Fallback to first container name
+ containerNameFound := p.Spec.Containers[0].Name
for _, c := range p.Spec.Containers {
if _, ok := commonUserContainerNames[c.Name]; ok {
return c.Name
+ } else if c.Name == defaultContainerName {
+ containerNameFound = defaultContainerName
}
}
+ return containerNameFound
}
return ""
}
diff --git a/pkg/util/log/util.go b/pkg/util/log/util.go
index eb20eb9..2dc91fc 100644
--- a/pkg/util/log/util.go
+++ b/pkg/util/log/util.go
@@ -30,7 +30,7 @@ import (
// Print prints integrations logs to the stdout
func Print(ctx context.Context, client kubernetes.Interface, integration *v1alpha1.Integration) error {
- scraper := NewSelectorScraper(client, integration.Namespace, "camel.apache.org/integration="+integration.Name)
+ scraper := NewSelectorScraper(client, integration.Namespace, integration.Name,"camel.apache.org/integration="+integration.Name)
reader := scraper.Start(ctx)
if _, err := io.Copy(os.Stdout, ioutil.NopCloser(reader)); err != nil {
diff --git a/test/log_scrape_integration_test.go b/test/log_scrape_integration_test.go
index 9a74b57..f233e29 100644
--- a/test/log_scrape_integration_test.go
+++ b/test/log_scrape_integration_test.go
@@ -39,7 +39,7 @@ func TestPodLogScrape(t *testing.T) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
- scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name)
+ scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name, "scraped")
in := scraper.Start(ctx)
res := make(chan bool)
@@ -74,7 +74,7 @@ func TestSelectorLogScrape(t *testing.T) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
- scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "scrape=me")
+ scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "main", "scrape=me")
in := scraper.Start(ctx)
res := make(chan string)