Posted to commits@camel.apache.org by as...@apache.org on 2021/11/17 09:36:56 UTC

[camel-k] 01/05: feat: Report runtime health checks into Integration readiness condition

This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c55cad5c7179b6efc36a54c5b6e3de6c5f11cabd
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Tue Oct 26 14:51:56 2021 +0200

    feat: Report runtime health checks into Integration readiness condition
---
 config/rbac/operator-role.yaml            |   6 ++
 go.sum                                    |   1 -
 helm/camel-k/templates/operator-role.yaml |   6 ++
 pkg/apis/camel/v1/integration_types.go    |   2 +
 pkg/controller/integration/health.go      |  59 ++++++++++++++++
 pkg/controller/integration/monitor.go     | 110 ++++++++++++++++++++++++++----
 pkg/resources/resources.go                |   4 +-
 pkg/trait/trait_types.go                  |   6 +-
 8 files changed, 175 insertions(+), 19 deletions(-)

diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml
index e619bbf..cc7fd7b 100644
--- a/config/rbac/operator-role.yaml
+++ b/config/rbac/operator-role.yaml
@@ -54,6 +54,12 @@ rules:
   verbs:
   - create
 - apiGroups:
+  - ""
+  resources:
+  - pods/proxy
+  verbs:
+  - get
+- apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
diff --git a/go.sum b/go.sum
index d98d9d4..ea84466 100644
--- a/go.sum
+++ b/go.sum
@@ -1750,7 +1750,6 @@ k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRV
 k8s.io/code-generator v0.18.6/go.mod h1:TgNEVx9hCyPGpdtCWA34olQYLkh3ok9ar7XfSsr8b6c=
 k8s.io/code-generator v0.19.2/go.mod h1:moqLn7w0t9cMs4+5CQyxnfA/HV8MF6aAVENF+WZZhgk=
 k8s.io/code-generator v0.21.1/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q=
-k8s.io/code-generator v0.21.4 h1:vO8jVuEGV4UF+/2s/88Qg05MokE/1QUFi/Q2YDgz++A=
 k8s.io/code-generator v0.21.4/go.mod h1:K3y0Bv9Cz2cOW2vXUrNZlFbflhuPvuadW6JdnN6gGKo=
 k8s.io/component-base v0.17.4/go.mod h1:5BRqHMbbQPm2kKu35v3G+CpVq4K0RJKC7TRioF0I9lE=
 k8s.io/component-base v0.18.2/go.mod h1:kqLlMuhJNHQ9lz8Z7V5bxUUtjFZnrypArGl58gmDfUM=
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index 1ec9512..3afbe47 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -55,6 +55,12 @@ rules:
   verbs:
   - create
 - apiGroups:
+  - ""
+  resources:
+  - pods/proxy
+  verbs:
+  - get
+- apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
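
Note: the new rule granting the "get" verb on the pods/proxy subresource, added to both the kustomize and Helm role manifests above, is what lets the operator reach a Pod's HTTP readiness endpoint through the API server proxy (see pkg/controller/integration/health.go further down). The following is a minimal, hypothetical sketch of how that grant could be verified with a SelfSubjectAccessReview; it is not part of this commit, only an illustration, and assumes a kubeconfig at the default location and a "default" namespace.

package main

import (
	"context"
	"fmt"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig at the default location; adjust as needed.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Ask the API server whether the current identity may "get" pods/proxy,
	// i.e. the permission granted by the rule added above.
	review := &authorizationv1.SelfSubjectAccessReview{
		Spec: authorizationv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Namespace:   "default", // hypothetical namespace
				Verb:        "get",
				Resource:    "pods",
				Subresource: "proxy",
			},
		},
	}
	result, err := client.AuthorizationV1().SelfSubjectAccessReviews().
		Create(context.TODO(), review, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("pods/proxy get allowed:", result.Status.Allowed)
}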
diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go
index 0729413..cafaa57 100644
--- a/pkg/apis/camel/v1/integration_types.go
+++ b/pkg/apis/camel/v1/integration_types.go
@@ -197,6 +197,8 @@ const (
 	IntegrationConditionLastJobSucceededReason string = "LastJobSucceeded"
 	// IntegrationConditionLastJobFailedReason --
 	IntegrationConditionLastJobFailedReason string = "LastJobFailed"
+	// IntegrationConditionRuntimeNotReadyReason --
+	IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady"
 	// IntegrationConditionErrorReason --
 	IntegrationConditionErrorReason string = "Error"
 
diff --git a/pkg/controller/integration/health.go b/pkg/controller/integration/health.go
new file mode 100644
index 0000000..e14d241
--- /dev/null
+++ b/pkg/controller/integration/health.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+type HealthCheckState string
+
+const (
+	HealthCheckStateDown HealthCheckState = "DOWN"
+	HealthCheckStateUp   HealthCheckState = "UP"
+)
+
+type HealthCheck struct {
+	Status HealthCheckState      `json:"state,omitempty"`
+	Checks []HealthCheckResponse `json:"checks,omitempty"`
+}
+
+type HealthCheckResponse struct {
+	Name   string                 `json:"name,omitempty"`
+	Status HealthCheckState       `json:"state,omitempty"`
+	Data   map[string]interface{} `json:"data,omitempty"`
+}
+
+func proxyGetHTTPProbe(ctx context.Context, c kubernetes.Interface, p *corev1.Probe, pod *corev1.Pod) ([]byte, error) {
+	if p.HTTPGet == nil {
+		return nil, fmt.Errorf("missing probe handler for %s/%s", pod.Namespace, pod.Name)
+	}
+
+	probeCtx, cancel := context.WithTimeout(ctx, time.Duration(p.TimeoutSeconds)*time.Second)
+	defer cancel()
+	params := make(map[string]string)
+	return c.CoreV1().Pods(pod.Namespace).
+		ProxyGet(strings.ToLower(string(p.HTTPGet.Scheme)), pod.Name, p.HTTPGet.Port.String(), p.HTTPGet.Path, params).
+		DoRaw(probeCtx)
+}
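
Note: the payload returned by the runtime's readiness endpoint is expected to decode into the HealthCheck type above, with field names following the json tags ("state", "checks", "name", "data"). Below is a minimal standalone sketch, using a made-up response body, of how such a payload decodes and how the camel-readiness-checks entry and the error.message data key are picked out (mirroring monitor.go further down). The types are repeated here only so the sketch compiles on its own; the body and error text are hypothetical.

package main

import (
	"encoding/json"
	"fmt"
)

// Same shapes as pkg/controller/integration/health.go above,
// repeated so this sketch is self-contained.
type HealthCheckState string

const (
	HealthCheckStateDown HealthCheckState = "DOWN"
	HealthCheckStateUp   HealthCheckState = "UP"
)

type HealthCheck struct {
	Status HealthCheckState      `json:"state,omitempty"`
	Checks []HealthCheckResponse `json:"checks,omitempty"`
}

type HealthCheckResponse struct {
	Name   string                 `json:"name,omitempty"`
	Status HealthCheckState       `json:"state,omitempty"`
	Data   map[string]interface{} `json:"data,omitempty"`
}

func main() {
	// Hypothetical body, as proxyGetHTTPProbe could return it for an unready Pod.
	body := []byte(`{
		"state": "DOWN",
		"checks": [
			{
				"name": "camel-readiness-checks",
				"state": "DOWN",
				"data": {"error.message": "context is not started"}
			}
		]
	}`)

	var health HealthCheck
	if err := json.Unmarshal(body, &health); err != nil {
		panic(err)
	}
	for _, check := range health.Checks {
		if check.Name != "camel-readiness-checks" || check.Status == HealthCheckStateUp {
			continue
		}
		// monitor.go flips the Integration phase to Error when this key is present.
		if msg, ok := check.Data["error.message"]; ok {
			fmt.Println("runtime not ready:", msg)
		}
	}
}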
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 2b01a7a..d4bd5ab 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,6 +19,7 @@ package integration
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -28,6 +29,7 @@ import (
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
 
@@ -41,6 +43,9 @@ import (
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
+// The key used for propagating error details from Camel health to MicroProfile Health (See CAMEL-17138)
+const runtimeHealthCheckErrorMessage = "error.message"
+
 func NewMonitorAction() Action {
 	return &monitorAction{}
 }
@@ -161,6 +166,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
 	var controller ctrl.Object
 	var lastCompletedJob *batchv1.Job
+	var podSpec corev1.PodSpec
 
 	switch {
 	case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
@@ -189,6 +195,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			setReadyConditionError(integration, progressing.Message)
 			return nil
 		}
+		podSpec = c.Spec.Template.Spec
 
 	case *servingv1.Service:
 		// Check the KnativeService conditions
@@ -197,6 +204,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			setReadyConditionError(integration, ready.Message)
 			return nil
 		}
+		podSpec = c.Spec.Template.Spec.PodSpec
 
 	case *batchv1beta1.CronJob:
 		// Check latest job result
@@ -224,6 +232,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 				}
 			}
 		}
+		podSpec = c.Spec.JobTemplate.Spec.Template.Spec
 	}
 
 	// Check Pods statuses
@@ -274,6 +283,26 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 
 	integration.Status.Phase = v1.IntegrationPhaseRunning
 
+	var readyPods []corev1.Pod
+	var unreadyPods []corev1.Pod
+	for _, pod := range runningPods {
+		// We compare the Integration PodSpec to that of the Pod in order to make
+		// sure we account for up-to-date version.
+		if !equality.Semantic.DeepDerivative(podSpec, pod.Spec) {
+			continue
+		}
+		ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
+		if ready == nil {
+			continue
+		}
+		switch ready.Status {
+		case corev1.ConditionTrue:
+			readyPods = append(readyPods, pod)
+		case corev1.ConditionFalse:
+			unreadyPods = append(unreadyPods, pod)
+		}
+	}
+
 	switch c := controller.(type) {
 	case *appsv1.Deployment:
 		replicas := int32(1)
@@ -282,27 +311,18 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 		}
 		// The Deployment status reports updated and ready replicas separately,
 		// so that the number of ready replicas also accounts for older versions.
-		// We compare the Integration PodSpec to that of the Pod in order to make
-		// sure we account for up-to-date version.
-		var readyPods []corev1.Pod
-		for _, pod := range runningPods {
-			if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready == nil || ready.Status != corev1.ConditionTrue {
-				continue
-			}
-			if equality.Semantic.DeepDerivative(c.Spec.Template.Spec, pod.Spec) {
-				readyPods = append(readyPods, pod)
-			}
-		}
 		readyReplicas := int32(len(readyPods))
-		// The Integration is considered ready when the number of replicas
-		// reported to be ready is larger than or equal to the specified number
-		// of replicas. This avoids reporting a falsy readiness condition
-		// when the Integration is being down-scaled.
 		switch {
 		case readyReplicas >= replicas:
+			// The Integration is considered ready when the number of replicas
+			// reported to be ready is larger than or equal to the specified number
+			// of replicas. This avoids reporting a falsy readiness condition
+			// when the Integration is being down-scaled.
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas))
+
 		case c.Status.UpdatedReplicas < replicas:
 			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
+
 		default:
 			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
 		}
@@ -319,19 +339,69 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 		switch {
 		case c.Status.LastScheduleTime == nil:
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+
 		case len(c.Status.Active) > 0:
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+
 		case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+
 		case lastCompletedJob != nil:
 			if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
 				setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
 			}
+
 		default:
 			integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
 		}
 	}
 
+	// Finally, call the readiness probes of the non-ready Pods directly,
+	// to retrieve insights from the Camel runtime.
+	var runtimeNotReadyMessages []string
+	for _, pod := range unreadyPods {
+		if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready.Reason != "ContainersNotReady" {
+			continue
+		}
+		container := getIntegrationContainer(environment, &pod)
+		if container == nil {
+			return fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
+		}
+		if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
+			body, err := proxyGetHTTPProbe(ctx, action.client, probe, &pod)
+			if err == nil {
+				continue
+			}
+			if !k8serrors.IsServiceUnavailable(err) {
+				return err
+			}
+			health := HealthCheck{}
+			err = json.Unmarshal(body, &health)
+			if err != nil {
+				return err
+			}
+			for _, check := range health.Checks {
+				if check.Name != "camel-readiness-checks" {
+					continue
+				}
+				if check.Status == HealthCheckStateUp {
+					continue
+				}
+				if _, ok := check.Data[runtimeHealthCheckErrorMessage]; ok {
+					integration.Status.Phase = v1.IntegrationPhaseError
+				}
+				runtimeNotReadyMessages = append(runtimeNotReadyMessages, fmt.Sprintf("Pod %s runtime is not ready: %s", pod.Name, check.Data))
+			}
+		}
+	}
+	if len(runtimeNotReadyMessages) > 0 {
+		reason := v1.IntegrationConditionRuntimeNotReadyReason
+		if integration.Status.Phase == v1.IntegrationPhaseError {
+			reason = v1.IntegrationConditionErrorReason
+		}
+		setReadyCondition(integration, corev1.ConditionFalse, reason, fmt.Sprintf("%s", runtimeNotReadyMessages))
+	}
+
 	return nil
 }
 
@@ -357,6 +427,16 @@ func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit,
 	return kit, nil
 }
 
+func getIntegrationContainer(environment *trait.Environment, pod *corev1.Pod) *corev1.Container {
+	name := environment.GetIntegrationContainerName()
+	for i, container := range pod.Spec.Containers {
+		if container.Name == name {
+			return &pod.Spec.Containers[i]
+		}
+	}
+	return nil
+}
+
 func isConditionTrue(integration *v1.Integration, conditionType v1.IntegrationConditionType) bool {
 	cond := integration.Status.GetCondition(conditionType)
 	if cond == nil {
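
Note: the Pod matching above relies on equality.Semantic.DeepDerivative, which ignores fields left unset in its first argument, so values defaulted on the live Pod by the API server do not cause a mismatch, while a changed field (e.g. an image bump) does. Below is a minimal, self-contained sketch of that behaviour; the image names are hypothetical and the snippet is not part of this commit.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
)

func main() {
	// PodSpec as rendered into the controller template: only managed fields are set.
	desired := corev1.PodSpec{
		Containers: []corev1.Container{{
			Name:  "integration",
			Image: "example.org/camel-k-integration:1", // hypothetical image
		}},
	}

	// Spec of a running Pod, with values defaulted by the admission chain.
	current := corev1.PodSpec{
		Containers: []corev1.Container{{
			Name:            "integration",
			Image:           "example.org/camel-k-integration:1",
			ImagePullPolicy: corev1.PullIfNotPresent,
		}},
		RestartPolicy: corev1.RestartPolicyAlways,
	}

	// true: fields left unset in the first argument are ignored.
	fmt.Println(equality.Semantic.DeepDerivative(desired, current))

	// After an image bump in the template, the running Pod is outdated.
	desired.Containers[0].Image = "example.org/camel-k-integration:2"

	// false: the Pod no longer matches the up-to-date spec, so it would be
	// skipped when partitioning ready/unready Pods.
	fmt.Println(equality.Semantic.DeepDerivative(desired, current))
}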
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 9189dc6..f486ed7 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -371,9 +371,9 @@ var assets = func() http.FileSystem {
 		"/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "operator-role.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 2311,
+			uncompressedSize: 2376,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x95\x41\x8f\xdb\x36\x10\x85\xef\xfa\x15\x0f\xd6\x25\x29\xd6\x76\xdb\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xca\x69\x90\x23\x45\x8e\xe5\xe9\x52\x1c\x96\xa4\xec\x75\x7f\x7d\x41\xda\x6e\xbc\xf1\x2e\x90\x43\xd0\x54\x17\x0f\xa9\xd1\x9b\xef\x71\xc6\x52\x8d\xf1\xd7\xbb\xaa\x1a\xef\x58\x93\x8b\x64\x90\x04\x69\x4b\x98\x7b\xa5\xb7\x84\x46\x36\x69\xaf\x02\xe1\x4e\x06\x67\x54\x62\x71\x78\x35\x6f\xee\x5e\x63\x70\x86\x02\xc4\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x55\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x6e\x11\xdb\x6d\x4f\x85\x7b\x72\xb3\x49\x6b\x74\x61\x03\x91\xb7\x8b\x3d\x52\xd4\x58\x9e\x86\xe2\xb0\x43\x2a\x8e\xfb\xf5\x05\x65\xbb\xeb\xac\x13\x20\x87\x45\xb7\xba\x78\x48\x8d\xde\xbc\x37\xf3\x4c\x96\x18\x7f\xbd\xa7\x28\xf1\x9e\x2d\xf9\x48\x0d\x92\x20\x6d\x09\xf3\x60\xec\x96\x50\xc9\x26\xed\x8c\x12\x6e\xa5\xf7\x8d\x49\x2c\x1e\x6f\xe6\xd5\xed\x5b\xf4\xbe\x21\x85\x78\x [...]
 		},
 		"/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "patch-role-to-clusterrole.yaml",
diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go
index d4993b9..8c5a5d0 100644
--- a/pkg/trait/trait_types.go
+++ b/pkg/trait/trait_types.go
@@ -747,13 +747,17 @@ func (e *Environment) collectConfigurations(configurationType string) []map[stri
 	return collectConfigurations(configurationType, e.Platform, e.IntegrationKit, e.Integration)
 }
 
-func (e *Environment) GetIntegrationContainer() *corev1.Container {
+func (e *Environment) GetIntegrationContainerName() string {
 	containerName := defaultContainerName
 	dt := e.Catalog.GetTrait(containerTraitID)
 	if dt != nil {
 		containerName = dt.(*containerTrait).Name
 	}
+	return containerName
+}
 
+func (e *Environment) GetIntegrationContainer() *corev1.Container {
+	containerName := e.GetIntegrationContainerName()
 	return e.Resources.GetContainerByName(containerName)
 }