You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2019/10/02 07:55:17 UTC
[camel-k] 02/05: feat: Enable integration scale sub-resource
This is an automated email from the ASF dual-hosted git repository.
astefanutti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 7d3c2ef7b9248414246a3e72cada73baab3b50b8
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Thu Sep 26 15:27:15 2019 +0200
feat: Enable integration scale sub-resource
---
deploy/crd-integration.yaml | 3 +
pkg/apis/camel/v1alpha1/integration_types.go | 1 +
pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go | 5 ++
.../integration/integration_controller.go | 28 ++++++++-
pkg/controller/integration/monitor.go | 48 ++++++++++++++-
pkg/trait/deployment.go | 37 +++++++++++-
pkg/trait/knative_service.go | 69 ++++++++++++++++++++--
7 files changed, 182 insertions(+), 9 deletions(-)
diff --git a/deploy/crd-integration.yaml b/deploy/crd-integration.yaml
index 0b0f905..94308df 100644
--- a/deploy/crd-integration.yaml
+++ b/deploy/crd-integration.yaml
@@ -27,6 +27,9 @@ spec:
version: v1alpha1
subresources:
status: {}
+ scale:
+ specReplicasPath: .spec.replicas
+ statusReplicasPath: .status.replicas
names:
kind: Integration
listKind: IntegrationList
diff --git a/pkg/apis/camel/v1alpha1/integration_types.go b/pkg/apis/camel/v1alpha1/integration_types.go
index 618c855..e4082e9 100644
--- a/pkg/apis/camel/v1alpha1/integration_types.go
+++ b/pkg/apis/camel/v1alpha1/integration_types.go
@@ -53,6 +53,7 @@ type IntegrationStatus struct {
Configuration []ConfigurationSpec `json:"configuration,omitempty"`
Conditions []IntegrationCondition `json:"conditions,omitempty"`
Version string `json:"version,omitempty"`
+ Replicas *int32 `json:"replicas,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
index e0298b8..8a4e3ea 100644
--- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
@@ -942,6 +942,11 @@ func (in *IntegrationStatus) DeepCopyInto(out *IntegrationStatus) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
+ if in.Replicas != nil {
+ in, out := &in.Replicas, &out.Replicas
+ *out = new(int32)
+ **out = **in
+ }
return
}
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index 4e1cd46..0923515 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -19,13 +19,13 @@ package integration
import (
"context"
+ appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
-
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -163,6 +163,30 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
+ // Watch for ReplicaSet to reconcile replicas to the integration status
+ err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestsFromMapFunc{
+ ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
+ rs := a.Object.(*appsv1.ReplicaSet)
+ var requests []reconcile.Request
+
+ labels := rs.GetLabels()
+ integrationName, ok := labels["camel.apache.org/integration"]
+ if ok {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Namespace: rs.Namespace,
+ Name: integrationName,
+ },
+ })
+ }
+
+ return requests
+ }),
+ })
+ if err != nil {
+ return err
+ }
+
return nil
}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 1da8bf0..9dbb169 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -20,7 +20,14 @@ package integration
import (
"context"
+ appsv1 "k8s.io/api/apps/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/selection"
+
+ k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
+
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/pkg/trait"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/digest"
)
@@ -58,6 +65,43 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1alpha1.I
return integration, nil
}
- // TODO check also if deployment matches (e.g. replicas)
- return nil, nil
+ // Run traits that are enabled for the running phase,
+ // such as the deployment and Knative service traits.
+ _, err = trait.Apply(ctx, action.client, integration, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // Check replicas
+ replicaSets, err := action.getReplicaSetsForIntegration(ctx, integration)
+ if err != nil {
+ return nil, err
+ }
+ // And update the scale status accordingly
+ if len(replicaSets.Items) > 0 {
+ replicas := replicaSets.Items[0].Status.Replicas
+ if integration.Status.Replicas == nil || replicas != *integration.Status.Replicas {
+ integration.Status.Replicas = &replicas
+ }
+ }
+
+ return integration, nil
+}
+
+func (action *monitorAction) getReplicaSetsForIntegration(ctx context.Context, integration *v1alpha1.Integration) (*appsv1.ReplicaSetList, error) {
+ byIntegrationLabel, err := labels.NewRequirement("camel.apache.org/integration", selection.Equals, []string{integration.Name})
+ if err != nil {
+ return nil, err
+ }
+ selector := labels.NewSelector().Add(*byIntegrationLabel)
+
+ options := k8sclient.ListOptions{
+ Namespace: integration.Namespace,
+ LabelSelector: selector,
+ }
+ list := &appsv1.ReplicaSetList{}
+ if err := action.client.List(ctx, &options, list); err != nil {
+ return nil, err
+ }
+ return list, nil
}
diff --git a/pkg/trait/deployment.go b/pkg/trait/deployment.go
index b867366..ccb31ee 100644
--- a/pkg/trait/deployment.go
+++ b/pkg/trait/deployment.go
@@ -18,10 +18,15 @@ limitations under the License.
package trait
import (
- "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "context"
+
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
)
type deploymentTrait struct {
@@ -47,6 +52,11 @@ func (t *deploymentTrait) Configure(e *Environment) (bool, error) {
return false, nil
}
+ if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+ condition := e.Integration.Status.GetCondition(v1alpha1.IntegrationConditionDeploymentAvailable)
+ return condition != nil && condition.Status == corev1.ConditionTrue, nil
+ }
+
enabled := false
if e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying) {
@@ -106,6 +116,31 @@ func (t *deploymentTrait) Apply(e *Environment) error {
v1alpha1.IntegrationConditionDeploymentAvailableReason,
depl.Name,
)
+
+ return nil
+ }
+
+ if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+ // Do not reconcile the deployment if no replicas is set
+ if e.Integration.Spec.Replicas == nil {
+ return nil
+ }
+ // Otherwise set the deployment replicas if different
+ deployment := &appsv1.Deployment{}
+ err := t.client.Get(context.TODO(), client.ObjectKey{Namespace: e.Integration.Namespace, Name: e.Integration.Name}, deployment)
+ if err != nil {
+ return err
+ }
+ replicas := *e.Integration.Spec.Replicas
+ if *deployment.Spec.Replicas != replicas {
+ *deployment.Spec.Replicas = replicas
+ err := t.client.Update(context.TODO(), deployment)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
}
return nil
diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go
index 0c395ae..5516597 100644
--- a/pkg/trait/knative_service.go
+++ b/pkg/trait/knative_service.go
@@ -18,16 +18,20 @@ limitations under the License.
package trait
import (
+ "context"
"strconv"
- "github.com/apache/camel-k/pkg/util/kubernetes"
+ "sigs.k8s.io/controller-runtime/pkg/client"
- "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
- "github.com/apache/camel-k/pkg/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
serving "knative.dev/serving/pkg/apis/serving/v1beta1"
+
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/pkg/metadata"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
)
const (
@@ -67,7 +71,10 @@ func (t *knativeServiceTrait) Configure(e *Environment) (bool, error) {
return false, nil
}
- if !e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) {
+ if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+ condition := e.Integration.Status.GetCondition(v1alpha1.IntegrationConditionKnativeServiceAvailable)
+ return condition != nil && condition.Status == corev1.ConditionTrue, nil
+ } else if !e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) {
return false, nil
}
@@ -135,6 +142,60 @@ func (t *knativeServiceTrait) Configure(e *Environment) (bool, error) {
}
func (t *knativeServiceTrait) Apply(e *Environment) error {
+ if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
+ // Do not reconcile the Knative if no replicas is set
+ if e.Integration.Spec.Replicas == nil {
+ return nil
+ }
+ // Otherwise set the Knative scale annotations
+ replicas := int(*e.Integration.Spec.Replicas)
+
+ service := &serving.Service{}
+ err := t.client.Get(context.TODO(), client.ObjectKey{Namespace: e.Integration.Namespace, Name: e.Integration.Name}, service)
+ if err != nil {
+ return err
+ }
+
+ isUpdateRequired := false
+ minScale, ok := service.Spec.Template.Annotations[knativeServingMinScaleAnnotation]
+ if ok {
+ min, err := strconv.Atoi(minScale)
+ if err != nil {
+ return err
+ }
+ if min != replicas {
+ isUpdateRequired = true
+ }
+ } else {
+ isUpdateRequired = true
+ }
+
+ maxScale, ok := service.Spec.Template.Annotations[knativeServingMaxScaleAnnotation]
+ if ok {
+ max, err := strconv.Atoi(maxScale)
+ if err != nil {
+ return err
+ }
+ if max != replicas {
+ isUpdateRequired = true
+ }
+ } else {
+ isUpdateRequired = true
+ }
+
+ if isUpdateRequired {
+ scale := strconv.Itoa(replicas)
+ service.Spec.Template.Annotations[knativeServingMinScaleAnnotation] = scale
+ service.Spec.Template.Annotations[knativeServingMaxScaleAnnotation] = scale
+ err := t.client.Update(context.TODO(), service)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+ }
+
ksvc := t.getServiceFor(e)
maps := e.ComputeConfigMaps()