You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2019/10/02 07:55:17 UTC

[camel-k] 02/05: feat: Enable integration scale sub-resource

This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 7d3c2ef7b9248414246a3e72cada73baab3b50b8
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Thu Sep 26 15:27:15 2019 +0200

    feat: Enable integration scale sub-resource
---
 deploy/crd-integration.yaml                        |  3 +
 pkg/apis/camel/v1alpha1/integration_types.go       |  1 +
 pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go   |  5 ++
 .../integration/integration_controller.go          | 28 ++++++++-
 pkg/controller/integration/monitor.go              | 48 ++++++++++++++-
 pkg/trait/deployment.go                            | 37 +++++++++++-
 pkg/trait/knative_service.go                       | 69 ++++++++++++++++++++--
 7 files changed, 182 insertions(+), 9 deletions(-)

diff --git a/deploy/crd-integration.yaml b/deploy/crd-integration.yaml
index 0b0f905..94308df 100644
--- a/deploy/crd-integration.yaml
+++ b/deploy/crd-integration.yaml
@@ -27,6 +27,9 @@ spec:
   version: v1alpha1
   subresources:
     status: {}
+    scale:
+      specReplicasPath: .spec.replicas
+      statusReplicasPath: .status.replicas
   names:
     kind: Integration
     listKind: IntegrationList
diff --git a/pkg/apis/camel/v1alpha1/integration_types.go b/pkg/apis/camel/v1alpha1/integration_types.go
index 618c855..e4082e9 100644
--- a/pkg/apis/camel/v1alpha1/integration_types.go
+++ b/pkg/apis/camel/v1alpha1/integration_types.go
@@ -53,6 +53,7 @@ type IntegrationStatus struct {
 	Configuration    []ConfigurationSpec    `json:"configuration,omitempty"`
 	Conditions       []IntegrationCondition `json:"conditions,omitempty"`
 	Version          string                 `json:"version,omitempty"`
+	Replicas         *int32                 `json:"replicas,omitempty"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
index e0298b8..8a4e3ea 100644
--- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
@@ -942,6 +942,11 @@ func (in *IntegrationStatus) DeepCopyInto(out *IntegrationStatus) {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
 	}
+	if in.Replicas != nil {
+		in, out := &in.Replicas, &out.Replicas
+		*out = new(int32)
+		**out = **in
+	}
 	return
 }
 
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index 4e1cd46..0923515 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -19,13 +19,13 @@ package integration
 import (
 	"context"
 
+	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
-
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -163,6 +163,30 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}
 
+	// Watch for ReplicaSet to reconcile replicas to the integration status
+	err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestsFromMapFunc{
+		ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
+			rs := a.Object.(*appsv1.ReplicaSet)
+			var requests []reconcile.Request
+
+			labels := rs.GetLabels()
+			integrationName, ok := labels["camel.apache.org/integration"]
+			if ok {
+				requests = append(requests, reconcile.Request{
+					NamespacedName: types.NamespacedName{
+						Namespace: rs.Namespace,
+						Name:      integrationName,
+					},
+				})
+			}
+
+			return requests
+		}),
+	})
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 1da8bf0..9dbb169 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -20,7 +20,14 @@ package integration
 import (
 	"context"
 
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
+
+	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
+
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/trait"
 	"github.com/apache/camel-k/pkg/util/defaults"
 	"github.com/apache/camel-k/pkg/util/digest"
 )
@@ -58,6 +65,43 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1alpha1.I
 		return integration, nil
 	}
 
-	// TODO check also if deployment matches (e.g. replicas)
-	return nil, nil
+	// Run traits that are enabled for the running phase,
+	// such as the deployment and Knative service traits.
+	_, err = trait.Apply(ctx, action.client, integration, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// Check replicas
+	replicaSets, err := action.getReplicaSetsForIntegration(ctx, integration)
+	if err != nil {
+		return nil, err
+	}
+	// And update the scale status accordingly
+	if len(replicaSets.Items) > 0 {
+		replicas := replicaSets.Items[0].Status.Replicas
+		if integration.Status.Replicas == nil || replicas != *integration.Status.Replicas {
+			integration.Status.Replicas = &replicas
+		}
+	}
+
+	return integration, nil
+}
+
+func (action *monitorAction) getReplicaSetsForIntegration(ctx context.Context, integration *v1alpha1.Integration) (*appsv1.ReplicaSetList, error) {
+	byIntegrationLabel, err := labels.NewRequirement("camel.apache.org/integration", selection.Equals, []string{integration.Name})
+	if err != nil {
+		return nil, err
+	}
+	selector := labels.NewSelector().Add(*byIntegrationLabel)
+
+	options := k8sclient.ListOptions{
+		Namespace:     integration.Namespace,
+		LabelSelector: selector,
+	}
+	list := &appsv1.ReplicaSetList{}
+	if err := action.client.List(ctx, &options, list); err != nil {
+		return nil, err
+	}
+	return list, nil
 }
diff --git a/pkg/trait/deployment.go b/pkg/trait/deployment.go
index b867366..ccb31ee 100644
--- a/pkg/trait/deployment.go
+++ b/pkg/trait/deployment.go
@@ -18,10 +18,15 @@ limitations under the License.
 package trait
 
 import (
-	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"context"
+
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
 type deploymentTrait struct {
@@ -47,6 +52,11 @@ func (t *deploymentTrait) Configure(e *Environment) (bool, error) {
 		return false, nil
 	}
 
+	if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+		condition := e.Integration.Status.GetCondition(v1alpha1.IntegrationConditionDeploymentAvailable)
+		return condition != nil && condition.Status == corev1.ConditionTrue, nil
+	}
+
 	enabled := false
 
 	if e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying) {
@@ -106,6 +116,31 @@ func (t *deploymentTrait) Apply(e *Environment) error {
 			v1alpha1.IntegrationConditionDeploymentAvailableReason,
 			depl.Name,
 		)
+
+		return nil
+	}
+
+	if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+		// Do not reconcile the deployment if no replicas is set
+		if e.Integration.Spec.Replicas == nil {
+			return nil
+		}
+		// Otherwise set the deployment replicas if different
+		deployment := &appsv1.Deployment{}
+		err := t.client.Get(context.TODO(), client.ObjectKey{Namespace: e.Integration.Namespace, Name: e.Integration.Name}, deployment)
+		if err != nil {
+			return err
+		}
+		replicas := *e.Integration.Spec.Replicas
+		if *deployment.Spec.Replicas != replicas {
+			*deployment.Spec.Replicas = replicas
+			err := t.client.Update(context.TODO(), deployment)
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
 	}
 
 	return nil
diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go
index 0c395ae..5516597 100644
--- a/pkg/trait/knative_service.go
+++ b/pkg/trait/knative_service.go
@@ -18,16 +18,20 @@ limitations under the License.
 package trait
 
 import (
+	"context"
 	"strconv"
 
-	"github.com/apache/camel-k/pkg/util/kubernetes"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
-	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
-	"github.com/apache/camel-k/pkg/metadata"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
 	servingv1 "knative.dev/serving/pkg/apis/serving/v1"
 	serving "knative.dev/serving/pkg/apis/serving/v1beta1"
+
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/metadata"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
 const (
@@ -67,7 +71,10 @@ func (t *knativeServiceTrait) Configure(e *Environment) (bool, error) {
 		return false, nil
 	}
 
-	if !e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) {
+	if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+		condition := e.Integration.Status.GetCondition(v1alpha1.IntegrationConditionKnativeServiceAvailable)
+		return condition != nil && condition.Status == corev1.ConditionTrue, nil
+	} else if !e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) {
 		return false, nil
 	}
 
@@ -135,6 +142,60 @@ func (t *knativeServiceTrait) Configure(e *Environment) (bool, error) {
 }
 
 func (t *knativeServiceTrait) Apply(e *Environment) error {
+	if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+		// Do not reconcile the Knative if no replicas is set
+		if e.Integration.Spec.Replicas == nil {
+			return nil
+		}
+		// Otherwise set the Knative scale annotations
+		replicas := int(*e.Integration.Spec.Replicas)
+
+		service := &serving.Service{}
+		err := t.client.Get(context.TODO(), client.ObjectKey{Namespace: e.Integration.Namespace, Name: e.Integration.Name}, service)
+		if err != nil {
+			return err
+		}
+
+		isUpdateRequired := false
+		minScale, ok := service.Spec.Template.Annotations[knativeServingMinScaleAnnotation]
+		if ok {
+			min, err := strconv.Atoi(minScale)
+			if err != nil {
+				return err
+			}
+			if min != replicas {
+				isUpdateRequired = true
+			}
+		} else {
+			isUpdateRequired = true
+		}
+
+		maxScale, ok := service.Spec.Template.Annotations[knativeServingMaxScaleAnnotation]
+		if ok {
+			max, err := strconv.Atoi(maxScale)
+			if err != nil {
+				return err
+			}
+			if max != replicas {
+				isUpdateRequired = true
+			}
+		} else {
+			isUpdateRequired = true
+		}
+
+		if isUpdateRequired {
+			scale := strconv.Itoa(replicas)
+			service.Spec.Template.Annotations[knativeServingMinScaleAnnotation] = scale
+			service.Spec.Template.Annotations[knativeServingMaxScaleAnnotation] = scale
+			err := t.client.Update(context.TODO(), service)
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
+	}
+
 	ksvc := t.getServiceFor(e)
 	maps := e.ComputeConfigMaps()