Posted to commits@camel.apache.org by ts...@apache.org on 2022/05/31 04:56:31 UTC

[camel-k] branch release-1.9.x updated (8bd3fe893 -> 80fd27e07)

This is an automated email from the ASF dual-hosted git repository.

tsato pushed a change to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git


    from 8bd3fe893 test(e2e): add test for 'kamel run --dev' in a warmed-up environment
     new 204af03e7 chore(controller): refactor integration monitor
     new 80fd27e07 fix(health): support new Camel health check format

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 e2e/common/traits/health_test.go                 |   2 +-
 pkg/controller/integration/health.go             |  15 +-
 pkg/controller/integration/health_test.go        |  87 ++++++++++
 pkg/controller/integration/monitor.go            | 201 ++++++++---------------
 pkg/controller/integration/monitor_cronjob.go    | 104 ++++++++++++
 pkg/controller/integration/monitor_deployment.go |  77 +++++++++
 pkg/controller/integration/monitor_knative.go    |  60 +++++++
 7 files changed, 414 insertions(+), 132 deletions(-)
 create mode 100644 pkg/controller/integration/health_test.go
 create mode 100644 pkg/controller/integration/monitor_cronjob.go
 create mode 100644 pkg/controller/integration/monitor_deployment.go
 create mode 100644 pkg/controller/integration/monitor_knative.go


[camel-k] 02/02: fix(health): support new Camel health check format

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 80fd27e07e5d0507c9749ba6a9aef6177c7d0fa4
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Fri May 27 20:30:22 2022 +0900

    fix(health): support new Camel health check format
    
    Fix #2886
---
 e2e/common/traits/health_test.go          |  2 +-
 pkg/controller/integration/health.go      | 15 +++++-
 pkg/controller/integration/health_test.go | 87 +++++++++++++++++++++++++++++++
 pkg/controller/integration/monitor.go     |  7 +--
 4 files changed, 102 insertions(+), 9 deletions(-)
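
For context, the new Camel health check format (asserted in the unit test below) is a
top-level "status" plus a flat "checks" array, which is why the struct tags move from
"state" to "status" and why readiness now considers every check rather than only the one
named "camel-readiness-checks". Below is a minimal, standalone sketch of parsing that
shape; the types only mirror the ones in pkg/controller/integration/health.go and are
trimmed for illustration:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Trimmed mirrors of HealthCheck/HealthCheckResponse from
    // pkg/controller/integration/health.go, for illustration only.
    type healthCheck struct {
        Status string                `json:"status,omitempty"`
        Checks []healthCheckResponse `json:"checks,omitempty"`
    }

    type healthCheckResponse struct {
        Name   string                 `json:"name,omitempty"`
        Status string                 `json:"status,omitempty"`
        Data   map[string]interface{} `json:"data,omitempty"`
    }

    func main() {
        // New-format payload: top-level "status" and a flat "checks" array.
        body := []byte(`{
            "status": "DOWN",
            "checks": [
                {"name": "context", "status": "UP",
                 "data": {"context.name": "camel-1"}},
                {"name": "camel-routes", "status": "DOWN",
                 "data": {"route.id": "route1", "route.status": "Stopped"}}
            ]
        }`)

        var health healthCheck
        if err := json.Unmarshal(body, &health); err != nil {
            panic(err)
        }
        // The monitor now reports every check that is not UP.
        for _, check := range health.Checks {
            if check.Status == "UP" {
                continue
            }
            fmt.Printf("check %q is %s: %v\n", check.Name, check.Status, check.Data)
        }
    }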

diff --git a/e2e/common/traits/health_test.go b/e2e/common/traits/health_test.go
index d17e5a98d..d3679b3a9 100644
--- a/e2e/common/traits/health_test.go
+++ b/e2e/common/traits/health_test.go
@@ -99,7 +99,7 @@ func TestHealthTrait(t *testing.T) {
 			//
 			Eventually(IntegrationCondition(ns, "java", v1.IntegrationConditionReady), TestTimeoutMedium).Should(And(
 				WithTransform(IntegrationConditionReason, Equal(v1.IntegrationConditionRuntimeNotReadyReason)),
-				WithTransform(IntegrationConditionMessage, HavePrefix(fmt.Sprintf("[Pod %s runtime is not ready: map[consumer:route1:DOWN context:UP", pod.Name))),
+				WithTransform(IntegrationConditionMessage, HavePrefix(fmt.Sprintf("[Pod %s runtime is not ready: map[route.context.name:camel-1 route.id:route1 route.status:Stopped]", pod.Name))),
 			))
 			// Check the Integration is still in running phase
 			Eventually(IntegrationPhase(ns, "java"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning))
diff --git a/pkg/controller/integration/health.go b/pkg/controller/integration/health.go
index e56b10add..731e2eacb 100644
--- a/pkg/controller/integration/health.go
+++ b/pkg/controller/integration/health.go
@@ -19,6 +19,7 @@ package integration
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"strconv"
 	"strings"
@@ -37,16 +38,26 @@ const (
 )
 
 type HealthCheck struct {
-	Status HealthCheckState      `json:"state,omitempty"`
+	Status HealthCheckState      `json:"status,omitempty"`
 	Checks []HealthCheckResponse `json:"checks,omitempty"`
 }
 
 type HealthCheckResponse struct {
 	Name   string                 `json:"name,omitempty"`
-	Status HealthCheckState       `json:"state,omitempty"`
+	Status HealthCheckState       `json:"status,omitempty"`
 	Data   map[string]interface{} `json:"data,omitempty"`
 }
 
+func NewHealthCheck(body []byte) (*HealthCheck, error) {
+	health := HealthCheck{}
+	err := json.Unmarshal(body, &health)
+	if err != nil {
+		return nil, err
+	}
+
+	return &health, nil
+}
+
 func proxyGetHTTPProbe(ctx context.Context, c kubernetes.Interface, p *corev1.Probe, pod *corev1.Pod, container *corev1.Container) ([]byte, error) {
 	if p.HTTPGet == nil {
 		return nil, fmt.Errorf("missing probe handler for %s/%s", pod.Namespace, pod.Name)
diff --git a/pkg/controller/integration/health_test.go b/pkg/controller/integration/health_test.go
new file mode 100644
index 000000000..ab7c2fc89
--- /dev/null
+++ b/pkg/controller/integration/health_test.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewHealthCheck(t *testing.T) {
+	body := []byte(`
+		{
+			"status": "DOWN",
+			"checks": [
+			{
+				"name": "camel-routes",
+				"status": "DOWN",
+				"data": {
+					"route.id": "route1",
+					"route.context.name": "camel-1",
+					"route.status": "Stopped"
+				}
+			},
+			{
+				"name": "context",
+				"status": "UP",
+				"data": {
+					"context.name": "camel-1",
+					"context.version": "3.16.0",
+					"context.status": "Started"
+				}
+			},
+			{
+				"name": "camel-consumers",
+				"status": "DOWN",
+				"data": {
+					"route.id": "route1",
+					"route.context.name": "camel-1",
+					"route.status": "Stopped"
+				}
+			}
+			]
+		}
+	`)
+	health, err := NewHealthCheck(body)
+	assert.NoError(t, err)
+	assert.Equal(t, HealthCheckStateDown, health.Status)
+	assert.Len(t, health.Checks, 3)
+	assert.Equal(t, "camel-routes", health.Checks[0].Name)
+	assert.Equal(t, HealthCheckStateDown, health.Checks[0].Status)
+	assert.True(t, reflect.DeepEqual(health.Checks[0].Data, map[string]interface{}{
+		"route.id":           "route1",
+		"route.context.name": "camel-1",
+		"route.status":       "Stopped",
+	}))
+	assert.Equal(t, "context", health.Checks[1].Name)
+	assert.Equal(t, HealthCheckStateUp, health.Checks[1].Status)
+	assert.True(t, reflect.DeepEqual(health.Checks[1].Data, map[string]interface{}{
+		"context.name":    "camel-1",
+		"context.version": "3.16.0",
+		"context.status":  "Started",
+	}))
+	assert.Equal(t, "camel-consumers", health.Checks[2].Name)
+	assert.Equal(t, HealthCheckStateDown, health.Checks[2].Status)
+	assert.True(t, reflect.DeepEqual(health.Checks[2].Data, map[string]interface{}{
+		"route.id":           "route1",
+		"route.context.name": "camel-1",
+		"route.status":       "Stopped",
+	}))
+}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 0dfb695b3..91ea73f63 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,7 +19,6 @@ package integration
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"reflect"
@@ -339,15 +338,11 @@ func (action *monitorAction) probeReadiness(ctx context.Context, environment *tr
 				runtimeNotReadyMessages = append(runtimeNotReadyMessages, fmt.Sprintf("readiness probe failed for Pod %s/%s: %s", pod.Namespace, pod.Name, err.Error()))
 				continue
 			}
-			health := HealthCheck{}
-			err = json.Unmarshal(body, &health)
+			health, err := NewHealthCheck(body)
 			if err != nil {
 				return err
 			}
 			for _, check := range health.Checks {
-				if check.Name != "camel-readiness-checks" {
-					continue
-				}
 				if check.Status == HealthCheckStateUp {
 					continue
 				}


[camel-k] 01/02: chore(controller): refactor integration monitor

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 204af03e7bd04c6d5a733fa974d6138f9e4b5a91
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Fri May 27 18:02:39 2022 +0900

    chore(controller): refactor integration monitor
---
 pkg/controller/integration/monitor.go            | 194 +++++++++--------------
 pkg/controller/integration/monitor_cronjob.go    | 104 ++++++++++++
 pkg/controller/integration/monitor_deployment.go |  77 +++++++++
 pkg/controller/integration/monitor_knative.go    |  60 +++++++
 4 files changed, 312 insertions(+), 123 deletions(-)
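
The refactor below splits the per-workload logic out of
updateIntegrationPhaseAndReadyCondition into a small controller interface
(checkReadyCondition, getPodSpec, updateReadyCondition) with one implementation per
workload kind: Deployment, Knative Service and CronJob. The following is a
dependency-free toy sketch of the resulting control flow; getPodSpec, the Kubernetes
types and the probe step are simplified away, and the toyDeployment type is invented for
illustration:

    package main

    import "fmt"

    // Simplified stand-in for the controller interface introduced by the
    // refactor; the real one also exposes getPodSpec() corev1.PodSpec.
    type controller interface {
        // checkReadyCondition reports whether the workload is already in a
        // terminal state (done == true short-circuits the evaluation).
        checkReadyCondition() (done bool, err error)
        // updateReadyCondition sets the ready condition from the ready pods
        // and reports whether the evaluation is complete.
        updateReadyCondition(readyPods []string) bool
    }

    // toyDeployment is an invented, minimal Deployment-like implementation.
    type toyDeployment struct{ replicas int }

    func (c *toyDeployment) checkReadyCondition() (bool, error) { return false, nil }

    func (c *toyDeployment) updateReadyCondition(readyPods []string) bool {
        if len(readyPods) >= c.replicas {
            fmt.Printf("ready: %d/%d ready replicas\n", len(readyPods), c.replicas)
            return true
        }
        fmt.Printf("progressing: %d/%d ready replicas\n", len(readyPods), c.replicas)
        return false
    }

    // monitor mirrors the shared sequencing that remains in monitor.go once
    // the workload-specific parts are behind the interface.
    func monitor(c controller, readyPods []string) error {
        if done, err := c.checkReadyCondition(); done || err != nil {
            return err
        }
        if done := c.updateReadyCondition(readyPods); done {
            return nil
        }
        // Not ready: the real code would now probe the unready pods directly.
        fmt.Println("probing readiness of unready pods...")
        return nil
    }

    func main() {
        _ = monitor(&toyDeployment{replicas: 2}, []string{"pod-1"})
        _ = monitor(&toyDeployment{replicas: 2}, []string{"pod-1", "pod-2"})
    }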

diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index e0877e734..0dfb695b3 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -26,7 +26,6 @@ import (
 	"strconv"
 
 	appsv1 "k8s.io/api/apps/v1"
-	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -164,85 +163,87 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 	return integration, nil
 }
 
-func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
-	var controller ctrl.Object
-	var lastCompletedJob *batchv1.Job
-	var podSpec corev1.PodSpec
+type controller interface {
+	checkReadyCondition() (bool, error)
+	getPodSpec() corev1.PodSpec
+	updateReadyCondition(readyPods []corev1.Pod) bool
+}
 
+func (action *monitorAction) newController(ctx context.Context, env *trait.Environment, integration *v1.Integration) (controller, error) {
+	var controller controller
+	var obj ctrl.Object
 	switch {
 	case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
-		controller = &appsv1.Deployment{}
+		obj = getUpdatedController(env, &appsv1.Deployment{})
+		controller = &deploymentController{
+			obj:         obj.(*appsv1.Deployment),
+			integration: integration,
+		}
 	case isConditionTrue(integration, v1.IntegrationConditionKnativeServiceAvailable):
-		controller = &servingv1.Service{}
+		obj = getUpdatedController(env, &servingv1.Service{})
+		controller = &knativeServiceController{
+			obj:         obj.(*servingv1.Service),
+			integration: integration,
+		}
 	case isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable):
-		controller = &batchv1beta1.CronJob{}
+		obj = getUpdatedController(env, &batchv1beta1.CronJob{})
+		controller = &cronJobController{
+			obj:         obj.(*batchv1beta1.CronJob),
+			integration: integration,
+			client:      action.client,
+			context:     ctx,
+		}
 	default:
-		return fmt.Errorf("unsupported controller for integration %s", integration.Name)
+		return nil, fmt.Errorf("unsupported controller for integration %s", integration.Name)
 	}
 
-	// Retrieve the controller updated from the deployer trait execution
-	controller = environment.Resources.GetController(func(object ctrl.Object) bool {
-		return reflect.TypeOf(controller) == reflect.TypeOf(object)
+	if obj == nil {
+		return nil, fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
+	}
+
+	return controller, nil
+}
+
+// getUpdatedController retrieves the controller updated from the deployer trait execution.
+func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object {
+	return env.Resources.GetController(func(object ctrl.Object) bool {
+		return reflect.TypeOf(obj) == reflect.TypeOf(object)
 	})
-	if controller == nil {
-		return fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
+}
+
+func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
+	controller, err := action.newController(ctx, environment, integration)
+	if err != nil {
+		return err
 	}
 
-	switch c := controller.(type) {
-	case *appsv1.Deployment:
-		// Check the Deployment progression
-		if progressing := kubernetes.GetDeploymentCondition(*c, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
-			integration.Status.Phase = v1.IntegrationPhaseError
-			setReadyConditionError(integration, progressing.Message)
-			return nil
-		}
-		podSpec = c.Spec.Template.Spec
+	if done, err := controller.checkReadyCondition(); done || err != nil {
+		return err
+	}
+	if done := checkPodStatuses(integration, pendingPods, runningPods); done {
+		return nil
+	}
+	integration.Status.Phase = v1.IntegrationPhaseRunning
 
-	case *servingv1.Service:
-		// Check the KnativeService conditions
-		if ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
-			integration.Status.Phase = v1.IntegrationPhaseError
-			setReadyConditionError(integration, ready.Message)
-			return nil
-		}
-		podSpec = c.Spec.Template.Spec.PodSpec
-
-	case *batchv1beta1.CronJob:
-		// Check latest job result
-		if lastScheduleTime := c.Status.LastScheduleTime; lastScheduleTime != nil && len(c.Status.Active) == 0 {
-			jobs := batchv1.JobList{}
-			if err := action.client.List(ctx, &jobs,
-				ctrl.InNamespace(integration.Namespace),
-				ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
-			); err != nil {
-				return err
-			}
-			t := lastScheduleTime.Time
-			for i, job := range jobs.Items {
-				if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
-					continue
-				}
-				lastCompletedJob = &jobs.Items[i]
-				t = lastCompletedJob.CreationTimestamp.Time
-			}
-			if lastCompletedJob != nil {
-				if failed := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
-					setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", lastCompletedJob.Name, failed.Message))
-					integration.Status.Phase = v1.IntegrationPhaseError
-					return nil
-				}
-			}
-		}
-		podSpec = c.Spec.JobTemplate.Spec.Template.Spec
+	readyPods, unreadyPods := filterPodsByReadyStatus(runningPods, controller.getPodSpec())
+	if done := controller.updateReadyCondition(readyPods); done {
+		return nil
+	}
+	if err := action.probeReadiness(ctx, environment, integration, unreadyPods); err != nil {
+		return err
 	}
 
+	return nil
+}
+
+func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
 	// Check Pods statuses
 	for _, pod := range pendingPods {
 		// Check the scheduled condition
 		if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil && scheduled.Status == corev1.ConditionFalse && scheduled.Reason == "Unschedulable" {
 			integration.Status.Phase = v1.IntegrationPhaseError
 			setReadyConditionError(integration, scheduled.Message)
-			return nil
+			return true
 		}
 	}
 	// Check pending container statuses
@@ -255,7 +256,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, waiting.Message)
-				return nil
+				return true
 			}
 		}
 	}
@@ -272,18 +273,20 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, waiting.Message)
-				return nil
+				return true
 			}
 			if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, terminated.Message)
-				return nil
+				return true
 			}
 		}
 	}
 
-	integration.Status.Phase = v1.IntegrationPhaseRunning
+	return false
+}
 
+func filterPodsByReadyStatus(runningPods []corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
 	var readyPods []corev1.Pod
 	var unreadyPods []corev1.Pod
 	for _, pod := range runningPods {
@@ -308,66 +311,11 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 		}
 	}
 
-	switch c := controller.(type) {
-	case *appsv1.Deployment:
-		replicas := int32(1)
-		if r := integration.Spec.Replicas; r != nil {
-			replicas = *r
-		}
-		// The Deployment status reports updated and ready replicas separately,
-		// so that the number of ready replicas also accounts for older versions.
-		readyReplicas := int32(len(readyPods))
-		switch {
-		case readyReplicas >= replicas:
-			// The Integration is considered ready when the number of replicas
-			// reported to be ready is larger than or equal to the specified number
-			// of replicas. This avoids reporting a falsy readiness condition
-			// when the Integration is being down-scaled.
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
-			return nil
-
-		case c.Status.UpdatedReplicas < replicas:
-			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
-
-		default:
-			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
-		}
-
-	case *servingv1.Service:
-		ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady)
-		if ready.IsTrue() {
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
-			return nil
-		}
-		setReadyCondition(integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
-
-	case *batchv1beta1.CronJob:
-		switch {
-		case c.Status.LastScheduleTime == nil:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
-			return nil
-
-		case len(c.Status.Active) > 0:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
-			return nil
-
-		case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
-			return nil
-
-		case lastCompletedJob != nil:
-			if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
-				setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
-				return nil
-			}
-
-		default:
-			integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
-		}
-	}
+	return readyPods, unreadyPods
+}
 
-	// Finally, call the readiness probes of the non-ready Pods directly,
-	// to retrieve insights from the Camel runtime.
+// probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
+func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, unreadyPods []corev1.Pod) error {
 	var runtimeNotReadyMessages []string
 	for i := range unreadyPods {
 		pod := &unreadyPods[i]
diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go
new file mode 100644
index 000000000..8a024df16
--- /dev/null
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"context"
+	"fmt"
+
+	batchv1 "k8s.io/api/batch/v1"
+	batchv1beta1 "k8s.io/api/batch/v1beta1"
+	corev1 "k8s.io/api/core/v1"
+
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/client"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type cronJobController struct {
+	obj              *batchv1beta1.CronJob
+	integration      *v1.Integration
+	client           client.Client
+	context          context.Context
+	lastCompletedJob *batchv1.Job
+}
+
+var _ controller = &cronJobController{}
+
+func (c *cronJobController) checkReadyCondition() (bool, error) {
+	// Check latest job result
+	if lastScheduleTime := c.obj.Status.LastScheduleTime; lastScheduleTime != nil && len(c.obj.Status.Active) == 0 {
+		jobs := batchv1.JobList{}
+		if err := c.client.List(c.context, &jobs,
+			ctrl.InNamespace(c.integration.Namespace),
+			ctrl.MatchingLabels{v1.IntegrationLabel: c.integration.Name},
+		); err != nil {
+			return true, err
+		}
+		t := lastScheduleTime.Time
+		for i, job := range jobs.Items {
+			if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
+				continue
+			}
+			c.lastCompletedJob = &jobs.Items[i]
+			t = c.lastCompletedJob.CreationTimestamp.Time
+		}
+		if c.lastCompletedJob != nil {
+			if failed := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
+				setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", c.lastCompletedJob.Name, failed.Message))
+				c.integration.Status.Phase = v1.IntegrationPhaseError
+				return true, nil
+			}
+		}
+	}
+
+	return false, nil
+}
+
+func (c *cronJobController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.JobTemplate.Spec.Template.Spec
+}
+
+func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	switch {
+	case c.obj.Status.LastScheduleTime == nil:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+		return true
+
+	case len(c.obj.Status.Active) > 0:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+		return true
+
+	case c.obj.Spec.SuccessfulJobsHistoryLimit != nil && *c.obj.Spec.SuccessfulJobsHistoryLimit == 0 && c.obj.Spec.FailedJobsHistoryLimit != nil && *c.obj.Spec.FailedJobsHistoryLimit == 0:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+		return true
+
+	case c.lastCompletedJob != nil:
+		if complete := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
+			setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", c.lastCompletedJob.Name))
+			return true
+		}
+
+	default:
+		setReadyCondition(c.integration, corev1.ConditionUnknown, "", "")
+	}
+
+	return false
+}
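
For reference, the job-selection loop in checkReadyCondition above, restated as a small
dependency-free sketch (the job type, field names and job names here are placeholders,
not the real batchv1.Job):

    package main

    import (
        "fmt"
        "time"
    )

    // job is a trimmed stand-in for batchv1.Job, keeping only the fields the
    // selection loop actually reads.
    type job struct {
        name    string
        active  int32
        created time.Time
    }

    // lastCompletedJob mirrors the loop above: starting from the CronJob's
    // LastScheduleTime, skip jobs that are inactive and older than the current
    // reference time, and keep the most recent of the rest.
    func lastCompletedJob(jobs []job, lastScheduleTime time.Time) *job {
        var last *job
        t := lastScheduleTime
        for i := range jobs {
            if jobs[i].active == 0 && jobs[i].created.Before(t) {
                continue
            }
            last = &jobs[i]
            t = last.created
        }
        return last
    }

    func main() {
        now := time.Now()
        jobs := []job{
            {name: "older-job", created: now.Add(-2 * time.Hour)},
            {name: "newer-job", created: now.Add(-30 * time.Minute)},
        }
        // Only the newer job is at or after the last schedule time, so it wins;
        // the real controller then inspects its Failed/Complete conditions.
        if j := lastCompletedJob(jobs, now.Add(-1*time.Hour)); j != nil {
            fmt.Println("last completed job:", j.name)
        }
    }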
diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go
new file mode 100644
index 000000000..9cf748ff6
--- /dev/null
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -0,0 +1,77 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"fmt"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type deploymentController struct {
+	obj         *appsv1.Deployment
+	integration *v1.Integration
+}
+
+var _ controller = &deploymentController{}
+
+func (c *deploymentController) checkReadyCondition() (bool, error) {
+	// Check the Deployment progression
+	if progressing := kubernetes.GetDeploymentCondition(*c.obj, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
+		c.integration.Status.Phase = v1.IntegrationPhaseError
+		setReadyConditionError(c.integration, progressing.Message)
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (c *deploymentController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.Template.Spec
+}
+
+func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	replicas := int32(1)
+	if r := c.integration.Spec.Replicas; r != nil {
+		replicas = *r
+	}
+	// The Deployment status reports updated and ready replicas separately,
+	// so that the number of ready replicas also accounts for older versions.
+	readyReplicas := int32(len(readyPods))
+	switch {
+	case readyReplicas >= replicas:
+		// The Integration is considered ready when the number of replicas
+		// reported to be ready is larger than or equal to the specified number
+		// of replicas. This avoids reporting a falsy readiness condition
+		// when the Integration is being down-scaled.
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
+		return true
+
+	case c.obj.Status.UpdatedReplicas < replicas:
+		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.obj.Status.UpdatedReplicas, replicas))
+
+	default:
+		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
+	}
+
+	return false
+}
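
The replica arithmetic above, restated as a pure function for illustration (the function
name and signature are invented for this sketch; a nil specReplicas stands for an unset
Integration.Spec.Replicas):

    package main

    import "fmt"

    // readiness mirrors deploymentController.updateReadyCondition: ready when
    // at least the requested number of replicas is ready, otherwise report
    // whether the rollout or the readiness is lagging.
    func readiness(specReplicas *int32, updatedReplicas, readyPods int32) (bool, string) {
        replicas := int32(1) // default when Integration.Spec.Replicas is unset
        if specReplicas != nil {
            replicas = *specReplicas
        }
        switch {
        case readyPods >= replicas:
            // ">=" rather than "==" avoids reporting not-ready while down-scaling.
            return true, fmt.Sprintf("%d/%d ready replicas", readyPods, replicas)
        case updatedReplicas < replicas:
            return false, fmt.Sprintf("%d/%d updated replicas", updatedReplicas, replicas)
        default:
            return false, fmt.Sprintf("%d/%d ready replicas", readyPods, replicas)
        }
    }

    func main() {
        three := int32(3)
        fmt.Println(readiness(&three, 3, 2)) // false 2/3 ready replicas
        fmt.Println(readiness(&three, 2, 2)) // false 2/3 updated replicas
        fmt.Println(readiness(nil, 1, 2))    // true 2/1 ready replicas (down-scale)
    }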
diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go
new file mode 100644
index 000000000..cf8d09860
--- /dev/null
+++ b/pkg/controller/integration/monitor_knative.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	corev1 "k8s.io/api/core/v1"
+
+	servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type knativeServiceController struct {
+	obj         *servingv1.Service
+	integration *v1.Integration
+}
+
+var _ controller = &knativeServiceController{}
+
+func (c *knativeServiceController) checkReadyCondition() (bool, error) {
+	// Check the KnativeService conditions
+	if ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
+		c.integration.Status.Phase = v1.IntegrationPhaseError
+		setReadyConditionError(c.integration, ready.Message)
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (c *knativeServiceController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.Template.Spec.PodSpec
+}
+
+func (c *knativeServiceController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady)
+	if ready.IsTrue() {
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
+		return true
+	}
+	setReadyCondition(c.integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
+
+	return false
+}