You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@camel.apache.org by as...@apache.org on 2021/11/17 09:36:56 UTC
[camel-k] 01/05: feat: Report runtime health checks into Integration readiness condition
This is an automated email from the ASF dual-hosted git repository.
astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit c55cad5c7179b6efc36a54c5b6e3de6c5f11cabd
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Tue Oct 26 14:51:56 2021 +0200
feat: Report runtime health checks into Integration readiness condition
---
config/rbac/operator-role.yaml | 6 ++
go.sum | 1 -
helm/camel-k/templates/operator-role.yaml | 6 ++
pkg/apis/camel/v1/integration_types.go | 2 +
pkg/controller/integration/health.go | 59 ++++++++++++++++
pkg/controller/integration/monitor.go | 110 ++++++++++++++++++++++++++----
pkg/resources/resources.go | 4 +-
pkg/trait/trait_types.go | 6 +-
8 files changed, 175 insertions(+), 19 deletions(-)
diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml
index e619bbf..cc7fd7b 100644
--- a/config/rbac/operator-role.yaml
+++ b/config/rbac/operator-role.yaml
@@ -54,6 +54,12 @@ rules:
verbs:
- create
- apiGroups:
+ - ""
+ resources:
+ - pods/proxy
+ verbs:
+ - get
+- apiGroups:
- policy
resources:
- poddisruptionbudgets
diff --git a/go.sum b/go.sum
index d98d9d4..ea84466 100644
--- a/go.sum
+++ b/go.sum
@@ -1750,7 +1750,6 @@ k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRV
k8s.io/code-generator v0.18.6/go.mod h1:TgNEVx9hCyPGpdtCWA34olQYLkh3ok9ar7XfSsr8b6c=
k8s.io/code-generator v0.19.2/go.mod h1:moqLn7w0t9cMs4+5CQyxnfA/HV8MF6aAVENF+WZZhgk=
k8s.io/code-generator v0.21.1/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q=
-k8s.io/code-generator v0.21.4 h1:vO8jVuEGV4UF+/2s/88Qg05MokE/1QUFi/Q2YDgz++A=
k8s.io/code-generator v0.21.4/go.mod h1:K3y0Bv9Cz2cOW2vXUrNZlFbflhuPvuadW6JdnN6gGKo=
k8s.io/component-base v0.17.4/go.mod h1:5BRqHMbbQPm2kKu35v3G+CpVq4K0RJKC7TRioF0I9lE=
k8s.io/component-base v0.18.2/go.mod h1:kqLlMuhJNHQ9lz8Z7V5bxUUtjFZnrypArGl58gmDfUM=
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index 1ec9512..3afbe47 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -55,6 +55,12 @@ rules:
verbs:
- create
- apiGroups:
+ - ""
+ resources:
+ - pods/proxy
+ verbs:
+ - get
+- apiGroups:
- policy
resources:
- poddisruptionbudgets
diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go
index 0729413..cafaa57 100644
--- a/pkg/apis/camel/v1/integration_types.go
+++ b/pkg/apis/camel/v1/integration_types.go
@@ -197,6 +197,8 @@ const (
IntegrationConditionLastJobSucceededReason string = "LastJobSucceeded"
// IntegrationConditionLastJobFailedReason --
IntegrationConditionLastJobFailedReason string = "LastJobFailed"
+ // IntegrationConditionRuntimeNotReadyReason --
+ IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady"
// IntegrationConditionErrorReason --
IntegrationConditionErrorReason string = "Error"
diff --git a/pkg/controller/integration/health.go b/pkg/controller/integration/health.go
new file mode 100644
index 0000000..e14d241
--- /dev/null
+++ b/pkg/controller/integration/health.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
+// HealthCheckState is the status value ("UP" or "DOWN") reported for a health check.
+type HealthCheckState string
+const (
+ HealthCheckStateUp HealthCheckState = "UP"
+ HealthCheckStateDown HealthCheckState = "DOWN"
+)
+// HealthCheck is the JSON document returned by a MicroProfile Health endpoint: an overall status and the individual checks.
+type HealthCheck struct {
+ Status HealthCheckState `json:"status,omitempty"` // MicroProfile Health names this key "status", not "state"
+ Checks []HealthCheckResponse `json:"checks,omitempty"`
+}
+// HealthCheckResponse is the outcome of a single named check within a HealthCheck.
+type HealthCheckResponse struct {
+ Name string `json:"name,omitempty"`
+ Status HealthCheckState `json:"status,omitempty"` // must match the "status" key, otherwise UP checks decode as ""
+ Data map[string]interface{} `json:"data,omitempty"` // optional details, e.g. the "error.message" key propagated by Camel
+}
+
+// proxyGetHTTPProbe issues the HTTP GET described by probe p against pod through the API server pods/proxy subresource, bounded by p.TimeoutSeconds.
+// The DoRaw result is returned unchanged; the monitor action also decodes the body when err is a 503 (ServiceUnavailable).
+func proxyGetHTTPProbe(ctx context.Context, c kubernetes.Interface, p *corev1.Probe, pod *corev1.Pod) ([]byte, error) {
+ action := p.HTTPGet
+ if action == nil {
+ return nil, fmt.Errorf("missing probe handler for %s/%s", pod.Namespace, pod.Name)
+ }
+ probeCtx, cancel := context.WithTimeout(ctx, time.Duration(p.TimeoutSeconds)*time.Second)
+ defer cancel()
+ scheme := strings.ToLower(string(action.Scheme))
+ return c.CoreV1().Pods(pod.Namespace).ProxyGet(scheme, pod.Name, action.Port.String(), action.Path, map[string]string{}).DoRaw(probeCtx)
+}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 2b01a7a..d4bd5ab 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,6 +19,7 @@ package integration
import (
"context"
+ "encoding/json"
"fmt"
"reflect"
"strconv"
@@ -28,6 +29,7 @@ import (
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
@@ -41,6 +43,9 @@ import (
"github.com/apache/camel-k/pkg/util/kubernetes"
)
+// runtimeHealthCheckErrorMessage is the health check data key under which Camel propagates error details to MicroProfile Health (see CAMEL-17138); its presence in a DOWN readiness check moves the Integration to the Error phase.
+const runtimeHealthCheckErrorMessage = "error.message"
+
func NewMonitorAction() Action {
return &monitorAction{}
}
@@ -161,6 +166,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
var controller ctrl.Object
var lastCompletedJob *batchv1.Job
+ var podSpec corev1.PodSpec
switch {
case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
@@ -189,6 +195,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
setReadyConditionError(integration, progressing.Message)
return nil
}
+ podSpec = c.Spec.Template.Spec
case *servingv1.Service:
// Check the KnativeService conditions
@@ -197,6 +204,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
setReadyConditionError(integration, ready.Message)
return nil
}
+ podSpec = c.Spec.Template.Spec.PodSpec
case *batchv1beta1.CronJob:
// Check latest job result
@@ -224,6 +232,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
}
}
}
+ podSpec = c.Spec.JobTemplate.Spec.Template.Spec
}
// Check Pods statuses
@@ -274,6 +283,26 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
integration.Status.Phase = v1.IntegrationPhaseRunning
+ var readyPods []corev1.Pod
+ var unreadyPods []corev1.Pod
+ for _, pod := range runningPods {
+ // We compare the Integration PodSpec to that of the Pod in order to make
+ // sure we account for up-to-date version.
+ if !equality.Semantic.DeepDerivative(podSpec, pod.Spec) {
+ continue
+ }
+ ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
+ if ready == nil {
+ continue
+ }
+ switch ready.Status {
+ case corev1.ConditionTrue:
+ readyPods = append(readyPods, pod)
+ case corev1.ConditionFalse:
+ unreadyPods = append(unreadyPods, pod)
+ }
+ }
+
switch c := controller.(type) {
case *appsv1.Deployment:
replicas := int32(1)
@@ -282,27 +311,18 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
}
// The Deployment status reports updated and ready replicas separately,
// so that the number of ready replicas also accounts for older versions.
- // We compare the Integration PodSpec to that of the Pod in order to make
- // sure we account for up-to-date version.
- var readyPods []corev1.Pod
- for _, pod := range runningPods {
- if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready == nil || ready.Status != corev1.ConditionTrue {
- continue
- }
- if equality.Semantic.DeepDerivative(c.Spec.Template.Spec, pod.Spec) {
- readyPods = append(readyPods, pod)
- }
- }
readyReplicas := int32(len(readyPods))
- // The Integration is considered ready when the number of replicas
- // reported to be ready is larger than or equal to the specified number
- // of replicas. This avoids reporting a falsy readiness condition
- // when the Integration is being down-scaled.
switch {
case readyReplicas >= replicas:
+ // The Integration is considered ready when the number of replicas
+ // reported to be ready is larger than or equal to the specified number
+ // of replicas. This avoids reporting a falsy readiness condition
+ // when the Integration is being down-scaled.
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas))
+
case c.Status.UpdatedReplicas < replicas:
setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
+
default:
setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
}
@@ -319,19 +339,69 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
switch {
case c.Status.LastScheduleTime == nil:
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+
case len(c.Status.Active) > 0:
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+
case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+
case lastCompletedJob != nil:
if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
}
+
default:
integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
}
}
+ // Finally, call the readiness probes of the non-ready Pods directly,
+ // to retrieve insights from the Camel runtime.
+ var runtimeNotReadyMessages []string
+ for _, pod := range unreadyPods {
+ if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready.Reason != "ContainersNotReady" {
+ continue
+ }
+ container := getIntegrationContainer(environment, &pod)
+ if container == nil {
+ return fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
+ }
+ if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
+ body, err := proxyGetHTTPProbe(ctx, action.client, probe, &pod)
+ if err == nil {
+ continue
+ }
+ if !k8serrors.IsServiceUnavailable(err) {
+ return err
+ }
+ health := HealthCheck{}
+ err = json.Unmarshal(body, &health)
+ if err != nil {
+ return err
+ }
+ for _, check := range health.Checks {
+ if check.Name != "camel-readiness-checks" {
+ continue
+ }
+ if check.Status == HealthCheckStateUp {
+ continue
+ }
+ if _, ok := check.Data[runtimeHealthCheckErrorMessage]; ok {
+ integration.Status.Phase = v1.IntegrationPhaseError
+ }
+ runtimeNotReadyMessages = append(runtimeNotReadyMessages, fmt.Sprintf("Pod %s runtime is not ready: %s", pod.Name, check.Data))
+ }
+ }
+ }
+ if len(runtimeNotReadyMessages) > 0 {
+ reason := v1.IntegrationConditionRuntimeNotReadyReason
+ if integration.Status.Phase == v1.IntegrationPhaseError {
+ reason = v1.IntegrationConditionErrorReason
+ }
+ setReadyCondition(integration, corev1.ConditionFalse, reason, fmt.Sprintf("%s", runtimeNotReadyMessages))
+ }
+
return nil
}
@@ -357,6 +427,16 @@ func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit,
return kit, nil
}
+// getIntegrationContainer returns a pointer into pod.Spec.Containers to the container named after the environment's integration container, or nil if the pod has none.
+func getIntegrationContainer(environment *trait.Environment, pod *corev1.Pod) *corev1.Container {
+ for i, name := 0, environment.GetIntegrationContainerName(); i < len(pod.Spec.Containers); i++ {
+ if pod.Spec.Containers[i].Name == name {
+ return &pod.Spec.Containers[i]
+ }
+ }
+ return nil
+}
+
func isConditionTrue(integration *v1.Integration, conditionType v1.IntegrationConditionType) bool {
cond := integration.Status.GetCondition(conditionType)
if cond == nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 9189dc6..f486ed7 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -371,9 +371,9 @@ var assets = func() http.FileSystem {
"/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{
name: "operator-role.yaml",
modTime: time.Time{},
- uncompressedSize: 2311,
+ uncompressedSize: 2376,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x95\x41\x8f\xdb\x36\x10\x85\xef\xfa\x15\x0f\xd6\x25\x29\xd6\x76\xdb\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xca\x69\x90\x23\x45\x8e\xe5\xe9\x52\x1c\x96\xa4\xec\x75\x7f\x7d\x41\xda\x6e\xbc\xf1\x2e\x90\x43\xd0\x54\x17\x0f\xa9\xd1\x9b\xef\x71\xc6\x52\x8d\xf1\xd7\xbb\xaa\x1a\xef\x58\x93\x8b\x64\x90\x04\x69\x4b\x98\x7b\xa5\xb7\x84\x46\x36\x69\xaf\x02\xe1\x4e\x06\x67\x54\x62\x71\x78\x35\x6f\xee\x5e\x63\x70\x86\x02\xc4\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x55\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x6e\x11\xdb\x6d\x4f\x85\x7b\x72\xb3\x49\x6b\x74\x61\x03\x91\xb7\x8b\x3d\x52\xd4\x58\x9e\x86\xe2\xb0\x43\x2a\x8e\xfb\xf5\x05\x65\xbb\xeb\xac\x13\x20\x87\x45\xb7\xba\x78\x48\x8d\xde\xbc\x37\xf3\x4c\x96\x18\x7f\xbd\xa7\x28\xf1\x9e\x2d\xf9\x48\x0d\x92\x20\x6d\x09\xf3\x60\xec\x96\x50\xc9\x26\xed\x8c\x12\x6e\xa5\xf7\x8d\x49\x2c\x1e\x6f\xe6\xd5\xed\x5b\xf4\xbe\x21\x85\x78\x [...]
},
"/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰CompressedFileInfo{
name: "patch-role-to-clusterrole.yaml",
diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go
index d4993b9..8c5a5d0 100644
--- a/pkg/trait/trait_types.go
+++ b/pkg/trait/trait_types.go
@@ -747,13 +747,17 @@ func (e *Environment) collectConfigurations(configurationType string) []map[stri
return collectConfigurations(configurationType, e.Platform, e.IntegrationKit, e.Integration)
}
-func (e *Environment) GetIntegrationContainer() *corev1.Container {
+func (e *Environment) GetIntegrationContainerName() string {
containerName := defaultContainerName
dt := e.Catalog.GetTrait(containerTraitID)
if dt != nil {
containerName = dt.(*containerTrait).Name
}
+ return containerName
+}
+func (e *Environment) GetIntegrationContainer() *corev1.Container {
+ containerName := e.GetIntegrationContainerName()
return e.Resources.GetContainerByName(containerName)
}