You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/01/12 08:15:04 UTC
[camel-k] 04/22: Fix #1107: generalize server side apply code and reuse
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 5a7e49cf585aa987549f76f9301c92b40323acba
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Wed Dec 15 11:16:46 2021 +0100
Fix #1107: generalize server side apply code and reuse
---
addons/keda/keda.go | 6 +--
pkg/client/client.go | 1 +
pkg/client/serverside.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++
pkg/install/kamelets.go | 86 +-------------------------------
pkg/util/test/client.go | 6 +++
5 files changed, 136 insertions(+), 87 deletions(-)
diff --git a/addons/keda/keda.go b/addons/keda/keda.go
index 65a8bd4..f59edd9 100644
--- a/addons/keda/keda.go
+++ b/addons/keda/keda.go
@@ -117,7 +117,7 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) (*kedav1alpha1.ScaledO
func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
ctrlRef := t.getTopControllerReference(e)
-
+ applier := e.Client.ServerOrClientSideApplier()
if ctrlRef.Kind == camelv1alpha1.KameletBindingKind {
// Update the KameletBinding directly (do not add it to env resources, it's the integration parent)
key := client.ObjectKey{
@@ -131,7 +131,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
if klb.Spec.Replicas == nil {
one := int32(1)
klb.Spec.Replicas = &one
- if err := e.Client.Update(e.Ctx, &klb); err != nil {
+ if err := applier.Apply(e.Ctx, &klb); err != nil {
return err
}
}
@@ -139,7 +139,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
if e.Integration.Spec.Replicas == nil {
one := int32(1)
e.Integration.Spec.Replicas = &one
- if err := e.Client.Update(e.Ctx, e.Integration); err != nil {
+ if err := applier.Apply(e.Ctx, e.Integration); err != nil {
return err
}
}
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 3334e70..2cf73c2 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -63,6 +63,7 @@ type Client interface {
GetScheme() *runtime.Scheme
GetConfig() *rest.Config
GetCurrentNamespace(kubeConfig string) (string, error)
+ ServerOrClientSideApplier() ServerOrClientSideApplier
}
// Injectable identifies objects that can receive a Client.
diff --git a/pkg/client/serverside.go b/pkg/client/serverside.go
new file mode 100644
index 0000000..6efd758
--- /dev/null
+++ b/pkg/client/serverside.go
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package client
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "github.com/apache/camel-k/pkg/util/log"
+ "github.com/apache/camel-k/pkg/util/patch"
+ "github.com/pkg/errors"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/types"
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// ServerOrClientSideApplier applies resources using server-side apply when
+// the API server accepts it, and falls back to a client-side
+// create-or-merge-patch when the server rejects apply patches as unsupported.
+//
+// The capability of the server is probed lazily by the first call to Apply.
+// An applier must not be copied once Apply has been called on it.
+type ServerOrClientSideApplier struct {
+	// Client is the controller-runtime client used for every API call.
+	Client ctrl.Client
+	// hasServerSideApply holds a bool once a probe gave a conclusive answer;
+	// it stays empty while the capability of the server is still unknown.
+	hasServerSideApply atomic.Value
+	// probeLock serializes the capability probe. A mutex is used rather than
+	// a sync.Once so that an inconclusive probe can simply be retried by the
+	// next caller, without ever resetting a synchronization primitive that
+	// other goroutines may be holding.
+	probeLock sync.Mutex
+}
+
+// ServerOrClientSideApplier returns a new applier backed by this client.
+// Every call returns an independent applier with its own probe state.
+func (c *defaultClient) ServerOrClientSideApplier() ServerOrClientSideApplier {
+	return ServerOrClientSideApplier{
+		Client: c,
+	}
+}
+
+// Apply creates or updates the given object.
+//
+// The first call (and any later call made while the server capability is
+// still unknown) attempts a server-side apply and records whether the server
+// supports it. If the probe fails with an error that does not indicate an
+// incompatible server, that error is returned and the next call probes again.
+// It is safe to call Apply concurrently on the same applier.
+func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Object) error {
+	// The checked type assertion keeps an empty atomic.Value from panicking.
+	serverSide, known := a.hasServerSideApply.Load().(bool)
+	if !known {
+		var applied bool
+		var err error
+		serverSide, applied, err = a.probeAndApply(ctx, object)
+		if err != nil || applied {
+			return err
+		}
+	}
+	if serverSide {
+		return a.serverSideApply(ctx, object)
+	}
+	return a.clientSideApply(ctx, object)
+}
+
+// probeAndApply determines whether the server supports server-side apply by
+// trying it on the given object. It reports the detected capability, whether
+// the object has already been applied by the probe itself, and any error that
+// left the capability undetermined.
+func (a *ServerOrClientSideApplier) probeAndApply(ctx context.Context, object ctrl.Object) (serverSide bool, applied bool, err error) {
+	a.probeLock.Lock()
+	defer a.probeLock.Unlock()
+	// Another goroutine may have completed the probe while we were waiting.
+	if v, ok := a.hasServerSideApply.Load().(bool); ok {
+		return v, false, nil
+	}
+	err = a.serverSideApply(ctx, object)
+	if err == nil {
+		a.hasServerSideApply.Store(true)
+		return true, true, nil
+	}
+	if isIncompatibleServerError(err) {
+		log.Info("Fallback to client-side apply for installing resources")
+		a.hasServerSideApply.Store(false)
+		return false, false, nil
+	}
+	// Inconclusive: leave the capability unset so that the next call retries.
+	return false, false, err
+}
+
+// serverSideApply sends the resource to the API server as an apply patch,
+// forcing ownership of conflicting fields under the "camel-k-operator" field
+// owner. The patch body is derived from the resource by
+// patch.PositiveApplyPatch.
+func (a *ServerOrClientSideApplier) serverSideApply(ctx context.Context, resource ctrl.Object) error {
+	applyPatch, err := patch.PositiveApplyPatch(resource)
+	if err != nil {
+		return err
+	}
+	options := []ctrl.PatchOption{
+		ctrl.ForceOwnership,
+		ctrl.FieldOwner("camel-k-operator"),
+	}
+	return a.Client.Patch(ctx, applyPatch, ctrl.Apply, options...)
+}
+
+// clientSideApply creates the resource and, when it already exists, fetches
+// the current state and sends a JSON merge patch computed by
+// patch.PositiveMergePatch. Nothing is sent when the computed patch is empty.
+func (a *ServerOrClientSideApplier) clientSideApply(ctx context.Context, resource ctrl.Object) error {
+	switch createErr := a.Client.Create(ctx, resource); {
+	case createErr == nil:
+		return nil
+	case !k8serrors.IsAlreadyExists(createErr):
+		return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), createErr)
+	}
+
+	// The resource already exists: load it as unstructured to diff against it.
+	existing := &unstructured.Unstructured{}
+	existing.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+	existing.SetNamespace(resource.GetNamespace())
+	existing.SetName(resource.GetName())
+	if err := a.Client.Get(ctx, ctrl.ObjectKeyFromObject(existing), existing); err != nil {
+		return err
+	}
+
+	mergePatch, err := patch.PositiveMergePatch(existing, resource)
+	if err != nil {
+		return err
+	}
+	if len(mergePatch) == 0 {
+		return nil
+	}
+	return a.Client.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, mergePatch))
+}
+
+// isIncompatibleServerError reports whether err indicates that the server
+// does not support server-side apply, i.e. it answered with
+// "415 Unsupported Media Type". The error must be non-nil.
+func isIncompatibleServerError(err error) bool {
+	// Plain-text match first, for older servers (e.g. OpenShift 3.11).
+	if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+		return true
+	}
+	// Otherwise only a StatusError carrying HTTP 415 counts; any other error
+	// is not attributed to an incompatible server.
+	var statusErr *k8serrors.StatusError
+	return errors.As(err, &statusErr) && statusErr.Status().Code == http.StatusUnsupportedMediaType
+}
diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 82a818b..fc64e25 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,25 +19,16 @@ package install
import (
"context"
- "errors"
"fmt"
"io/fs"
- "net/http"
"os"
"path"
"path/filepath"
"strings"
- "sync"
- "sync/atomic"
"golang.org/x/sync/errgroup"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
-
- ctrl "sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -45,7 +36,6 @@ import (
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/kubernetes"
- "github.com/apache/camel-k/pkg/util/patch"
)
const (
@@ -55,9 +45,6 @@ const (
var (
log = logf.Log
-
- hasServerSideApply atomic.Value
- tryServerSideApply sync.Once
)
// KameletCatalog installs the bundled Kamelets into the specified namespace.
@@ -77,7 +64,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
}
g, gCtx := errgroup.WithContext(ctx)
-
+ applier := c.ServerOrClientSideApplier()
err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error {
if err != nil {
return err
@@ -94,31 +81,9 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
if err != nil {
return err
}
- once := false
- tryServerSideApply.Do(func() {
- once = true
- if err = serverSideApply(gCtx, c, kamelet); err != nil {
- if isIncompatibleServerError(err) {
- log.Info("Fallback to client-side apply for installing bundled Kamelets")
- hasServerSideApply.Store(false)
- err = nil
- } else {
- tryServerSideApply = sync.Once{}
- }
- } else {
- hasServerSideApply.Store(true)
- }
- })
- if err != nil {
+ if err := applier.Apply(gCtx, kamelet); err != nil {
return err
}
- if v := hasServerSideApply.Load(); v.(bool) {
- if !once {
- return serverSideApply(gCtx, c, kamelet)
- }
- } else {
- return clientSideApply(gCtx, c, kamelet)
- }
return nil
})
return nil
@@ -130,53 +95,6 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
return g.Wait()
}
-func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error {
- target, err := patch.PositiveApplyPatch(resource)
- if err != nil {
- return err
- }
- return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
-}
-
-func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error {
- err := c.Create(ctx, resource)
- if err == nil {
- return nil
- } else if !k8serrors.IsAlreadyExists(err) {
- return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
- }
- object := &unstructured.Unstructured{}
- object.SetNamespace(resource.GetNamespace())
- object.SetName(resource.GetName())
- object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
- err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
- if err != nil {
- return err
- }
- p, err := patch.PositiveMergePatch(object, resource)
- if err != nil {
- return err
- } else if len(p) == 0 {
- return nil
- }
- return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
-}
-
-func isIncompatibleServerError(err error) bool {
- // First simpler check for older servers (i.e. OpenShift 3.11)
- if strings.Contains(err.Error(), "415: Unsupported Media Type") {
- return true
- }
- // 415: Unsupported media type means we're talking to a server which doesn't
- // support server-side apply.
- var serr *k8serrors.StatusError
- if errors.As(err, &serr) {
- return serr.Status().Code == http.StatusUnsupportedMediaType
- }
- // Non-StatusError means the error isn't because the server is incompatible.
- return false
-}
-
func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) {
content, err := util.ReadFile(path)
if err != nil {
diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go
index 50d32fb..b4f6db4 100644
--- a/pkg/util/test/client.go
+++ b/pkg/util/test/client.go
@@ -117,6 +117,12 @@ func (c *FakeClient) Discovery() discovery.DiscoveryInterface {
}
}
+// ServerOrClientSideApplier returns a new applier whose API calls go through
+// this fake client.
+func (c *FakeClient) ServerOrClientSideApplier() client.ServerOrClientSideApplier {
+	return client.ServerOrClientSideApplier{Client: c}
+}
+
type FakeDiscovery struct {
discovery.DiscoveryInterface
}