You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/01/12 08:15:21 UTC

[camel-k] 21/22: Fix #1107: disable applier code to detect real CI errors

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 885d2bd99ca5ce46bad901df5082d6e424a1500b
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Jan 10 14:43:43 2022 +0100

    Fix #1107: disable applier code to detect real CI errors
---
 pkg/install/kamelets.go    |  94 +++++++++++++++++++++++++++++++++++++--
 pkg/resources/resources.go |  12 ++---
 pkg/trait/deployer.go      | 108 ++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 203 insertions(+), 11 deletions(-)

diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 4ff4572..82a818b 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,21 +19,33 @@ package install
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io/fs"
+	"net/http"
 	"os"
 	"path"
 	"path/filepath"
 	"strings"
+	"sync"
+	"sync/atomic"
 
 	"golang.org/x/sync/errgroup"
 
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/client"
 	"github.com/apache/camel-k/pkg/util"
 	"github.com/apache/camel-k/pkg/util/defaults"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
-	"k8s.io/apimachinery/pkg/runtime"
+	"github.com/apache/camel-k/pkg/util/patch"
 )
 
 const (
@@ -41,6 +53,13 @@ const (
 	defaultKameletDir = "/kamelets/"
 )
 
+var (
+	log = logf.Log
+
+	hasServerSideApply atomic.Value
+	tryServerSideApply sync.Once
+)
+
 // KameletCatalog installs the bundled Kamelets into the specified namespace.
 func KameletCatalog(ctx context.Context, c client.Client, namespace string) error {
 	kameletDir := os.Getenv(kameletDirEnv)
@@ -58,7 +77,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 	}
 
 	g, gCtx := errgroup.WithContext(ctx)
-	applier := c.ServerOrClientSideApplier()
+
 	err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error {
 		if err != nil {
 			return err
@@ -75,9 +94,31 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 			if err != nil {
 				return err
 			}
-			if err := applier.Apply(gCtx, kamelet); err != nil {
+			once := false
+			tryServerSideApply.Do(func() {
+				once = true
+				if err = serverSideApply(gCtx, c, kamelet); err != nil {
+					if isIncompatibleServerError(err) {
+						log.Info("Fallback to client-side apply for installing bundled Kamelets")
+						hasServerSideApply.Store(false)
+						err = nil
+					} else {
+						tryServerSideApply = sync.Once{}
+					}
+				} else {
+					hasServerSideApply.Store(true)
+				}
+			})
+			if err != nil {
 				return err
 			}
+			if v := hasServerSideApply.Load(); v.(bool) {
+				if !once {
+					return serverSideApply(gCtx, c, kamelet)
+				}
+			} else {
+				return clientSideApply(gCtx, c, kamelet)
+			}
 			return nil
 		})
 		return nil
@@ -89,6 +130,53 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 	return g.Wait()
 }
 
+func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error {
+	target, err := patch.PositiveApplyPatch(resource)
+	if err != nil {
+		return err
+	}
+	return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+}
+
+func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error {
+	err := c.Create(ctx, resource)
+	if err == nil {
+		return nil
+	} else if !k8serrors.IsAlreadyExists(err) {
+		return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+	}
+	object := &unstructured.Unstructured{}
+	object.SetNamespace(resource.GetNamespace())
+	object.SetName(resource.GetName())
+	object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+	err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
+	if err != nil {
+		return err
+	}
+	p, err := patch.PositiveMergePatch(object, resource)
+	if err != nil {
+		return err
+	} else if len(p) == 0 {
+		return nil
+	}
+	return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+}
+
+func isIncompatibleServerError(err error) bool {
+	// First simpler check for older servers (i.e. OpenShift 3.11)
+	if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+		return true
+	}
+	// 415: Unsupported media type means we're talking to a server which doesn't
+	// support server-side apply.
+	var serr *k8serrors.StatusError
+	if errors.As(err, &serr) {
+		return serr.Status().Code == http.StatusUnsupportedMediaType
+	}
+	// Non-StatusError means the error isn't because the server is incompatible.
+	return false
+}
+
 func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) {
 	content, err := util.ReadFile(path)
 	if err != nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index e64bea2..bcc7095 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -145,16 +145,16 @@ var assets = func() http.FileSystem {
 		"/crd/bases/camel.apache.org_integrations.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "camel.apache.org_integrations.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 366985,
+			uncompressedSize: 367530,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x [...]
 		},
 		"/crd/bases/camel.apache.org_kameletbindings.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "camel.apache.org_kameletbindings.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 432125,
+			uncompressedSize: 432720,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x [...]
 		},
 		"/crd/bases/camel.apache.org_kamelets.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "camel.apache.org_kamelets.yaml",
@@ -555,9 +555,9 @@ var assets = func() http.FileSystem {
 		"/traits.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "traits.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 49341,
+			uncompressedSize: 50652,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xfd\x73\x5b\xb9\x91\xe0\xef\xf3\x57\xa0\xb4\x57\x65\x49\x45\x52\x9e\xc9\x26\x3b\xa7\xbb\xd9\x94\xc6\x76\x12\xcd\xf8\x43\x67\x3b\xb3\x97\x9a\x9b\x0a\xc1\xf7\x9a\x24\xcc\x47\xe0\x05\xc0\x93\xcc\xdc\xde\xff\x7e\x85\xee\xc6\xc7\x7b\x24\x25\xca\xb6\x66\xa3\xad\xdd\x54\xed\x58\xd2\x03\xd0\x68\x34\xfa\xbb\x1b\xde\x4a\xe5\xdd\xf9\x57\x63\xa1\xe5\x1a\xce\x85\x9c\xcf\x95\x56\x7e\xf3\x95\x10\x6d\x23\xfd\xdc\xd8\xf5\xb9\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\x7d\x73\x1c\xb9\x91\x27\xfc\xff\x7c\x0a\x04\xfd\x44\x88\x64\x74\x37\x35\xe3\xb5\x3d\x0f\xef\xb4\x3e\x8e\x24\xdb\x9c\xd1\x0b\x4f\x92\xc7\xe7\xd0\x29\xdc\xe8\xaa\xec\x6e\xa8\xab\x81\x32\x80\x22\xd5\x3e\xdf\x77\xbf\x40\x66\xe2\xa5\xaa\x9b\x64\x53\x12\x67\xcd\x8d\x5d\x47\xec\x88\x64\x01\x48\x24\x12\x89\x44\xe6\x2f\x13\xde\x4a\xe5\xdd\xe9\x37\x63\xa1\xe5\x1a\x4e\x85\x9c\xcf\x95\x56\x7e\xf3\x8d\x10\x6d\x23\xfd\xdc\x [...]
 		},
 	}
 	fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
diff --git a/pkg/trait/deployer.go b/pkg/trait/deployer.go
index 7735a37..67cdb79 100644
--- a/pkg/trait/deployer.go
+++ b/pkg/trait/deployer.go
@@ -17,6 +17,22 @@ limitations under the License.
 
 package trait
 
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"strings"
+
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/types"
+
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/apache/camel-k/pkg/util/patch"
+)
+
 // The deployer trait is responsible for deploying the resources owned by the integration, and can be used
 // to explicitly select the underlying controller that will manage the integration pods.
 //
@@ -29,6 +45,8 @@ type deployerTrait struct {
 
 var _ ControllerStrategySelector = &deployerTrait{}
 
+var hasServerSideApply = true
+
 func newDeployerTrait() Trait {
 	return &deployerTrait{
 		BaseTrait: NewBaseTrait("deployer", 900),
@@ -42,9 +60,28 @@ func (t *deployerTrait) Configure(e *Environment) (bool, error) {
 func (t *deployerTrait) Apply(e *Environment) error {
 	// Register a post action that patches the resources generated by the traits
 	e.PostActions = append(e.PostActions, func(env *Environment) error {
-		applier := e.Client.ServerOrClientSideApplier()
 		for _, resource := range env.Resources.Items() {
-			if err := applier.Apply(e.Ctx, resource); err != nil {
+			// We assume that server-side apply is enabled by default.
+			// It is currently convoluted to check pro-actively whether server-side apply
+			// is enabled. This is possible to fetch the OpenAPI endpoint, which returns
+			// the entire server API document, then lookup the resource PATCH endpoint, and
+			// check its list of accepted MIME types.
+			// As a simpler solution, we fall back to client-side apply at the first
+			// 415 error, and assume server-side apply is not available globally.
+			if hasServerSideApply {
+				err := t.serverSideApply(env, resource)
+				switch {
+				case err == nil:
+					continue
+				case isIncompatibleServerError(err):
+					t.L.Info("Fallback to client-side apply to patch resources")
+					hasServerSideApply = false
+				default:
+					// Keep server-side apply unless server is incompatible with it
+					return err
+				}
+			}
+			if err := t.clientSideApply(env, resource); err != nil {
 				return err
 			}
 		}
@@ -54,6 +91,73 @@ func (t *deployerTrait) Apply(e *Environment) error {
 	return nil
 }
 
+func (t *deployerTrait) serverSideApply(env *Environment, resource ctrl.Object) error {
+	target, err := patch.PositiveApplyPatch(resource)
+	if err != nil {
+		return err
+	}
+	err = env.Client.Patch(env.Ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+	if err != nil {
+		return fmt.Errorf("error during apply resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+	}
+	// Update the resource with the response returned from the API server
+	return t.unstructuredToRuntimeObject(target, resource)
+}
+
+func (t *deployerTrait) clientSideApply(env *Environment, resource ctrl.Object) error {
+	err := env.Client.Create(env.Ctx, resource)
+	if err == nil {
+		return nil
+	} else if !k8serrors.IsAlreadyExists(err) {
+		return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+	}
+	object := &unstructured.Unstructured{}
+	object.SetNamespace(resource.GetNamespace())
+	object.SetName(resource.GetName())
+	object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+	err = env.Client.Get(env.Ctx, ctrl.ObjectKeyFromObject(object), object)
+	if err != nil {
+		return err
+	}
+	p, err := patch.PositiveMergePatch(object, resource)
+	if err != nil {
+		return err
+	} else if len(p) == 0 {
+		// Update the resource with the object returned from the API server
+		return t.unstructuredToRuntimeObject(object, resource)
+	}
+	err = env.Client.Patch(env.Ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+	if err != nil {
+		return fmt.Errorf("error during patch %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+	}
+	return nil
+}
+
+func (t *deployerTrait) unstructuredToRuntimeObject(u *unstructured.Unstructured, obj ctrl.Object) error {
+	data, err := json.Marshal(u)
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal(data, obj)
+}
+
+func isIncompatibleServerError(err error) bool {
+	// First simpler check for older servers (i.e. OpenShift 3.11)
+	if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+		return true
+	}
+
+	// 415: Unsupported media type means we're talking to a server which doesn't
+	// support server-side apply.
+	var serr *k8serrors.StatusError
+	if errors.As(err, &serr) {
+		return serr.Status().Code == http.StatusUnsupportedMediaType
+	}
+
+	// Non-StatusError means the error isn't because the server is incompatible.
+	return false
+}
+
 func (t *deployerTrait) SelectControllerStrategy(e *Environment) (*ControllerStrategy, error) {
 	if IsFalse(t.Enabled) {
 		return nil, nil