You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/01/12 08:15:04 UTC

[camel-k] 04/22: Fix #1107: generalize server side apply code and reuse

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 5a7e49cf585aa987549f76f9301c92b40323acba
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Wed Dec 15 11:16:46 2021 +0100

    Fix #1107: generalize server side apply code and reuse
---
 addons/keda/keda.go      |   6 +--
 pkg/client/client.go     |   1 +
 pkg/client/serverside.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++
 pkg/install/kamelets.go  |  86 +-------------------------------
 pkg/util/test/client.go  |   6 +++
 5 files changed, 136 insertions(+), 87 deletions(-)

diff --git a/addons/keda/keda.go b/addons/keda/keda.go
index 65a8bd4..f59edd9 100644
--- a/addons/keda/keda.go
+++ b/addons/keda/keda.go
@@ -117,7 +117,7 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) (*kedav1alpha1.ScaledO
 
 func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
 	ctrlRef := t.getTopControllerReference(e)
-
+	applier := e.Client.ServerOrClientSideApplier()
 	if ctrlRef.Kind == camelv1alpha1.KameletBindingKind {
 		// Update the KameletBinding directly (do not add it to env resources, it's the integration parent)
 		key := client.ObjectKey{
@@ -131,7 +131,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
 		if klb.Spec.Replicas == nil {
 			one := int32(1)
 			klb.Spec.Replicas = &one
-			if err := e.Client.Update(e.Ctx, &klb); err != nil {
+			if err := applier.Apply(e.Ctx, &klb); err != nil {
 				return err
 			}
 		}
@@ -139,7 +139,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
 		if e.Integration.Spec.Replicas == nil {
 			one := int32(1)
 			e.Integration.Spec.Replicas = &one
-			if err := e.Client.Update(e.Ctx, e.Integration); err != nil {
+			if err := applier.Apply(e.Ctx, e.Integration); err != nil {
 				return err
 			}
 		}
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 3334e70..2cf73c2 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -63,6 +63,7 @@ type Client interface {
 	GetScheme() *runtime.Scheme
 	GetConfig() *rest.Config
 	GetCurrentNamespace(kubeConfig string) (string, error)
+	ServerOrClientSideApplier() ServerOrClientSideApplier
 }
 
 // Injectable identifies objects that can receive a Client.
diff --git a/pkg/client/serverside.go b/pkg/client/serverside.go
new file mode 100644
index 0000000..6efd758
--- /dev/null
+++ b/pkg/client/serverside.go
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package client
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"github.com/apache/camel-k/pkg/util/log"
+	"github.com/apache/camel-k/pkg/util/patch"
+	"github.com/pkg/errors"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/types"
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type ServerOrClientSideApplier struct {
+	Client             ctrl.Client
+	hasServerSideApply atomic.Value
+	tryServerSideApply sync.Once
+}
+
+func (c *defaultClient) ServerOrClientSideApplier() ServerOrClientSideApplier {
+	return ServerOrClientSideApplier{
+		Client: c,
+	}
+}
+
+func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Object) error {
+	once := false
+	var err error
+	a.tryServerSideApply.Do(func() {
+		once = true
+		if err = a.serverSideApply(ctx, object); err != nil {
+			if isIncompatibleServerError(err) {
+				log.Info("Fallback to client-side apply for installing resources")
+				a.hasServerSideApply.Store(false)
+				err = nil
+			} else {
+				a.tryServerSideApply = sync.Once{}
+			}
+		} else {
+			a.hasServerSideApply.Store(true)
+		}
+	})
+	if err != nil {
+		return err
+	}
+	if v := a.hasServerSideApply.Load(); v.(bool) {
+		if !once {
+			return a.serverSideApply(ctx, object)
+		}
+	} else {
+		return a.clientSideApply(ctx, object)
+	}
+	return nil
+}
+
+func (a *ServerOrClientSideApplier) serverSideApply(ctx context.Context, resource ctrl.Object) error {
+	target, err := patch.PositiveApplyPatch(resource)
+	if err != nil {
+		return err
+	}
+	return a.Client.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+}
+
+func (a *ServerOrClientSideApplier) clientSideApply(ctx context.Context, resource ctrl.Object) error {
+	err := a.Client.Create(ctx, resource)
+	if err == nil {
+		return nil
+	} else if !k8serrors.IsAlreadyExists(err) {
+		return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+	}
+	object := &unstructured.Unstructured{}
+	object.SetNamespace(resource.GetNamespace())
+	object.SetName(resource.GetName())
+	object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+	err = a.Client.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
+	if err != nil {
+		return err
+	}
+	p, err := patch.PositiveMergePatch(object, resource)
+	if err != nil {
+		return err
+	} else if len(p) == 0 {
+		return nil
+	}
+	return a.Client.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+}
+
+func isIncompatibleServerError(err error) bool {
+	// First simpler check for older servers (e.g. OpenShift 3.11)
+	if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+		return true
+	}
+	// 415: Unsupported media type means we're talking to a server which doesn't
+	// support server-side apply.
+	var serr *k8serrors.StatusError
+	if errors.As(err, &serr) {
+		return serr.Status().Code == http.StatusUnsupportedMediaType
+	}
+	// Non-StatusError means the error isn't because the server is incompatible.
+	return false
+}
diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 82a818b..fc64e25 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,25 +19,16 @@ package install
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io/fs"
-	"net/http"
 	"os"
 	"path"
 	"path/filepath"
 	"strings"
-	"sync"
-	"sync/atomic"
 
 	"golang.org/x/sync/errgroup"
 
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
-
-	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -45,7 +36,6 @@ import (
 	"github.com/apache/camel-k/pkg/util"
 	"github.com/apache/camel-k/pkg/util/defaults"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
-	"github.com/apache/camel-k/pkg/util/patch"
 )
 
 const (
@@ -55,9 +45,6 @@ const (
 
 var (
 	log = logf.Log
-
-	hasServerSideApply atomic.Value
-	tryServerSideApply sync.Once
 )
 
 // KameletCatalog installs the bundled Kamelets into the specified namespace.
@@ -77,7 +64,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 	}
 
 	g, gCtx := errgroup.WithContext(ctx)
-
+	applier := c.ServerOrClientSideApplier()
 	err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error {
 		if err != nil {
 			return err
@@ -94,31 +81,9 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 			if err != nil {
 				return err
 			}
-			once := false
-			tryServerSideApply.Do(func() {
-				once = true
-				if err = serverSideApply(gCtx, c, kamelet); err != nil {
-					if isIncompatibleServerError(err) {
-						log.Info("Fallback to client-side apply for installing bundled Kamelets")
-						hasServerSideApply.Store(false)
-						err = nil
-					} else {
-						tryServerSideApply = sync.Once{}
-					}
-				} else {
-					hasServerSideApply.Store(true)
-				}
-			})
-			if err != nil {
+			if err := applier.Apply(gCtx, kamelet); err != nil {
 				return err
 			}
-			if v := hasServerSideApply.Load(); v.(bool) {
-				if !once {
-					return serverSideApply(gCtx, c, kamelet)
-				}
-			} else {
-				return clientSideApply(gCtx, c, kamelet)
-			}
 			return nil
 		})
 		return nil
@@ -130,53 +95,6 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 	return g.Wait()
 }
 
-func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error {
-	target, err := patch.PositiveApplyPatch(resource)
-	if err != nil {
-		return err
-	}
-	return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
-}
-
-func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error {
-	err := c.Create(ctx, resource)
-	if err == nil {
-		return nil
-	} else if !k8serrors.IsAlreadyExists(err) {
-		return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
-	}
-	object := &unstructured.Unstructured{}
-	object.SetNamespace(resource.GetNamespace())
-	object.SetName(resource.GetName())
-	object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
-	err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
-	if err != nil {
-		return err
-	}
-	p, err := patch.PositiveMergePatch(object, resource)
-	if err != nil {
-		return err
-	} else if len(p) == 0 {
-		return nil
-	}
-	return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
-}
-
-func isIncompatibleServerError(err error) bool {
-	// First simpler check for older servers (i.e. OpenShift 3.11)
-	if strings.Contains(err.Error(), "415: Unsupported Media Type") {
-		return true
-	}
-	// 415: Unsupported media type means we're talking to a server which doesn't
-	// support server-side apply.
-	var serr *k8serrors.StatusError
-	if errors.As(err, &serr) {
-		return serr.Status().Code == http.StatusUnsupportedMediaType
-	}
-	// Non-StatusError means the error isn't because the server is incompatible.
-	return false
-}
-
 func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) {
 	content, err := util.ReadFile(path)
 	if err != nil {
diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go
index 50d32fb..b4f6db4 100644
--- a/pkg/util/test/client.go
+++ b/pkg/util/test/client.go
@@ -117,6 +117,12 @@ func (c *FakeClient) Discovery() discovery.DiscoveryInterface {
 	}
 }
 
+func (c *FakeClient) ServerOrClientSideApplier() client.ServerOrClientSideApplier {
+	return client.ServerOrClientSideApplier{
+		Client: c,
+	}
+}
+
 type FakeDiscovery struct {
 	discovery.DiscoveryInterface
 }