You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2018/12/11 21:19:48 UTC
[camel-k] branch master updated: chore(log): make dev-mode work in
Knative
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8c149 chore(log): make dev-mode work in Knative
6a8c149 is described below
commit 6a8c149d7803e8a348002e1597ddf053493ff63a
Author: nferraro <ni...@gmail.com>
AuthorDate: Tue Dec 11 17:55:04 2018 +0100
chore(log): make dev-mode work in Knative
---
pkg/util/log/pod_scraper.go | 54 ++++++++++++++++++++++++++++++++-------------
1 file changed, 39 insertions(+), 15 deletions(-)
diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
index ba4567b..c46f2be 100644
--- a/pkg/util/log/pod_scraper.go
+++ b/pkg/util/log/pod_scraper.go
@@ -34,6 +34,11 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
+var commonUserContainerNames = map[string]bool{
+ // Convention used in Knative and Istio
+ "user-container": true,
+}
+
// PodScraper scrapes logs of a specific pod
type PodScraper struct {
namespace string
@@ -62,13 +67,16 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
}
func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
- err := s.waitForPodRunning(ctx, s.namespace, s.name)
+ containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
}
-
- byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
+ logOptions := v1.PodLogOptions{
+ Follow: true,
+ Container: containerName,
+ }
+ byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
@@ -100,14 +108,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
}
if ctx.Err() != nil {
- logrus.Info("Pod ", s.name, " will no longer be monitored")
+ logrus.Debug("Pod ", s.name, " will no longer be monitored")
if err := clientCloser(); err != nil {
logrus.Warn("Unable to close the client", err)
}
return
}
- logrus.Info("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
+ logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
select {
case <-time.After(wait):
break
@@ -121,8 +129,9 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
s.doScrape(ctx, out, clientCloser)
}
-// Waits for a given pod to reach the running state
-func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) error {
+// waitForPodRunning waits for a given pod to reach the running state.
+// It may return the internal container to watch if present
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
@@ -135,22 +144,22 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
}
resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, pod.Kind, pod.Namespace)
if err != nil {
- return err
+ return "", err
}
watcher, err := resourceClient.Watch(metav1.ListOptions{
FieldSelector: "metadata.name=" + pod.Name,
})
if err != nil {
- return err
+ return "", err
}
events := watcher.ResultChan()
for {
select {
case <-ctx.Done():
- return ctx.Err()
+ return "", ctx.Err()
case e, ok := <-events:
if !ok {
- return errors.New("event channel closed")
+ return "", errors.New("event channel closed")
}
if e.Object != nil {
@@ -161,19 +170,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
pcopy := pod.DeepCopy()
err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy)
if err != nil {
- return err
+ return "", err
}
if pcopy.Status.Phase == v1.PodRunning {
- return nil
+ return s.chooseContainer(pcopy), nil
}
}
} else if e.Type == watch.Deleted || e.Type == watch.Error {
- return errors.New("unable to watch pod " + s.name)
+ return "", errors.New("unable to watch pod " + s.name)
}
case <-time.After(30 * time.Second):
- return errors.New("no state change after 30 seconds for pod " + s.name)
+ return "", errors.New("no state change after 30 seconds for pod " + s.name)
}
}
}
+
+func (s *PodScraper) chooseContainer(p *v1.Pod) string {
+ if p != nil {
+ if len(p.Spec.Containers) == 1 {
+ // Let Kubernetes auto-detect
+ return ""
+ }
+ for _, c := range p.Spec.Containers {
+ if _, ok := commonUserContainerNames[c.Name]; ok {
+ return c.Name
+ }
+ }
+ }
+ return ""
+}