You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/01/12 08:15:21 UTC
[camel-k] 21/22: Fix #1107: disable applier code to detect real CI errors
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 885d2bd99ca5ce46bad901df5082d6e424a1500b
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Jan 10 14:43:43 2022 +0100
Fix #1107: disable applier code to detect real CI errors
---
pkg/install/kamelets.go | 94 +++++++++++++++++++++++++++++++++++++--
pkg/resources/resources.go | 12 ++---
pkg/trait/deployer.go | 108 ++++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 203 insertions(+), 11 deletions(-)
diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 4ff4572..82a818b 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,21 +19,33 @@ package install
import (
"context"
+ "errors"
"fmt"
"io/fs"
+ "net/http"
"os"
"path"
"path/filepath"
"strings"
+ "sync"
+ "sync/atomic"
"golang.org/x/sync/errgroup"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
+
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/kubernetes"
- "k8s.io/apimachinery/pkg/runtime"
+ "github.com/apache/camel-k/pkg/util/patch"
)
const (
@@ -41,6 +53,13 @@ const (
defaultKameletDir = "/kamelets/"
)
+var (
+ log = logf.Log
+
+ hasServerSideApply atomic.Value
+ tryServerSideApply sync.Once
+)
+
// KameletCatalog installs the bundled Kamelets into the specified namespace.
func KameletCatalog(ctx context.Context, c client.Client, namespace string) error {
kameletDir := os.Getenv(kameletDirEnv)
@@ -58,7 +77,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
}
g, gCtx := errgroup.WithContext(ctx)
- applier := c.ServerOrClientSideApplier()
+
err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error {
if err != nil {
return err
@@ -75,9 +94,31 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
if err != nil {
return err
}
- if err := applier.Apply(gCtx, kamelet); err != nil {
+ once := false
+ tryServerSideApply.Do(func() {
+ once = true
+ if err = serverSideApply(gCtx, c, kamelet); err != nil {
+ if isIncompatibleServerError(err) {
+ log.Info("Fallback to client-side apply for installing bundled Kamelets")
+ hasServerSideApply.Store(false)
+ err = nil
+ } else {
+ tryServerSideApply = sync.Once{}
+ }
+ } else {
+ hasServerSideApply.Store(true)
+ }
+ })
+ if err != nil {
return err
}
+ if v := hasServerSideApply.Load(); v.(bool) {
+ if !once {
+ return serverSideApply(gCtx, c, kamelet)
+ }
+ } else {
+ return clientSideApply(gCtx, c, kamelet)
+ }
return nil
})
return nil
@@ -89,6 +130,53 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
return g.Wait()
}
+func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error {
+ target, err := patch.PositiveApplyPatch(resource)
+ if err != nil {
+ return err
+ }
+ return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+}
+
+func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error {
+ err := c.Create(ctx, resource)
+ if err == nil {
+ return nil
+ } else if !k8serrors.IsAlreadyExists(err) {
+ return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+ }
+ object := &unstructured.Unstructured{}
+ object.SetNamespace(resource.GetNamespace())
+ object.SetName(resource.GetName())
+ object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+ err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
+ if err != nil {
+ return err
+ }
+ p, err := patch.PositiveMergePatch(object, resource)
+ if err != nil {
+ return err
+ } else if len(p) == 0 {
+ return nil
+ }
+ return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+}
+
+func isIncompatibleServerError(err error) bool {
+ // First simpler check for older servers (i.e. OpenShift 3.11)
+ if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+ return true
+ }
+ // 415: Unsupported media type means we're talking to a server which doesn't
+ // support server-side apply.
+ var serr *k8serrors.StatusError
+ if errors.As(err, &serr) {
+ return serr.Status().Code == http.StatusUnsupportedMediaType
+ }
+ // Non-StatusError means the error isn't because the server is incompatible.
+ return false
+}
+
func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) {
content, err := util.ReadFile(path)
if err != nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index e64bea2..bcc7095 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -145,16 +145,16 @@ var assets = func() http.FileSystem {
"/crd/bases/camel.apache.org_integrations.yaml": &vfsgen۰CompressedFileInfo{
name: "camel.apache.org_integrations.yaml",
modTime: time.Time{},
- uncompressedSize: 366985,
+ uncompressedSize: 367530,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x [...]
},
"/crd/bases/camel.apache.org_kameletbindings.yaml": &vfsgen۰CompressedFileInfo{
name: "camel.apache.org_kameletbindings.yaml",
modTime: time.Time{},
- uncompressedSize: 432125,
+ uncompressedSize: 432720,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x [...]
},
"/crd/bases/camel.apache.org_kamelets.yaml": &vfsgen۰CompressedFileInfo{
name: "camel.apache.org_kamelets.yaml",
@@ -555,9 +555,9 @@ var assets = func() http.FileSystem {
"/traits.yaml": &vfsgen۰CompressedFileInfo{
name: "traits.yaml",
modTime: time.Time{},
- uncompressedSize: 49341,
+ uncompressedSize: 50652,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xfd\x73\x5b\xb9\x91\xe0\xef\xf3\x57\xa0\xb4\x57\x65\x49\x45\x52\x9e\xc9\x26\x3b\xa7\xbb\xd9\x94\xc6\x76\x12\xcd\xf8\x43\x67\x3b\xb3\x97\x9a\x9b\x0a\xc1\xf7\x9a\x24\xcc\x47\xe0\x05\xc0\x93\xcc\xdc\xde\xff\x7e\x85\xee\xc6\xc7\x7b\x24\x25\xca\xb6\x66\xa3\xad\xdd\x54\xed\x58\xd2\x03\xd0\x68\x34\xfa\xbb\x1b\xde\x4a\xe5\xdd\xf9\x57\x63\xa1\xe5\x1a\xce\x85\x9c\xcf\x95\x56\x7e\xf3\x95\x10\x6d\x23\xfd\xdc\xd8\xf5\xb9\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\x7d\x73\x1c\xb9\x91\x27\xfc\xff\x7c\x0a\x04\xfd\x44\x88\x64\x74\x37\x35\xe3\xb5\x3d\x0f\xef\xb4\x3e\x8e\x24\xdb\x9c\xd1\x0b\x4f\x92\xc7\xe7\xd0\x29\xdc\xe8\xaa\xec\x6e\xa8\xab\x81\x32\x80\x22\xd5\x3e\xdf\x77\xbf\x40\x66\xe2\xa5\xaa\x9b\x64\x53\x12\x67\xcd\x8d\x5d\x47\xec\x88\x64\x01\x48\x24\x12\x89\x44\xe6\x2f\x13\xde\x4a\xe5\xdd\xe9\x37\x63\xa1\xe5\x1a\x4e\x85\x9c\xcf\x95\x56\x7e\xf3\x8d\x10\x6d\x23\xfd\xdc\x [...]
},
}
fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
diff --git a/pkg/trait/deployer.go b/pkg/trait/deployer.go
index 7735a37..67cdb79 100644
--- a/pkg/trait/deployer.go
+++ b/pkg/trait/deployer.go
@@ -17,6 +17,22 @@ limitations under the License.
package trait
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/http"
+ "strings"
+
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/types"
+
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+ "github.com/apache/camel-k/pkg/util/patch"
+)
+
// The deployer trait is responsible for deploying the resources owned by the integration, and can be used
// to explicitly select the underlying controller that will manage the integration pods.
//
@@ -29,6 +45,8 @@ type deployerTrait struct {
var _ ControllerStrategySelector = &deployerTrait{}
+var hasServerSideApply = true
+
func newDeployerTrait() Trait {
return &deployerTrait{
BaseTrait: NewBaseTrait("deployer", 900),
@@ -42,9 +60,28 @@ func (t *deployerTrait) Configure(e *Environment) (bool, error) {
func (t *deployerTrait) Apply(e *Environment) error {
// Register a post action that patches the resources generated by the traits
e.PostActions = append(e.PostActions, func(env *Environment) error {
- applier := e.Client.ServerOrClientSideApplier()
for _, resource := range env.Resources.Items() {
- if err := applier.Apply(e.Ctx, resource); err != nil {
+ // We assume that server-side apply is enabled by default.
+ // It is currently convoluted to check pro-actively whether server-side apply
+ // is enabled. This is possible to fetch the OpenAPI endpoint, which returns
+ // the entire server API document, then lookup the resource PATCH endpoint, and
+ // check its list of accepted MIME types.
+ // As a simpler solution, we fall back to client-side apply at the first
+ // 415 error, and assume server-side apply is not available globally.
+ if hasServerSideApply {
+ err := t.serverSideApply(env, resource)
+ switch {
+ case err == nil:
+ continue
+ case isIncompatibleServerError(err):
+ t.L.Info("Fallback to client-side apply to patch resources")
+ hasServerSideApply = false
+ default:
+ // Keep server-side apply unless server is incompatible with it
+ return err
+ }
+ }
+ if err := t.clientSideApply(env, resource); err != nil {
return err
}
}
@@ -54,6 +91,73 @@ func (t *deployerTrait) Apply(e *Environment) error {
return nil
}
+func (t *deployerTrait) serverSideApply(env *Environment, resource ctrl.Object) error {
+ target, err := patch.PositiveApplyPatch(resource)
+ if err != nil {
+ return err
+ }
+ err = env.Client.Patch(env.Ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+ if err != nil {
+ return fmt.Errorf("error during apply resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+ }
+ // Update the resource with the response returned from the API server
+ return t.unstructuredToRuntimeObject(target, resource)
+}
+
+func (t *deployerTrait) clientSideApply(env *Environment, resource ctrl.Object) error {
+ err := env.Client.Create(env.Ctx, resource)
+ if err == nil {
+ return nil
+ } else if !k8serrors.IsAlreadyExists(err) {
+ return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+ }
+ object := &unstructured.Unstructured{}
+ object.SetNamespace(resource.GetNamespace())
+ object.SetName(resource.GetName())
+ object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+ err = env.Client.Get(env.Ctx, ctrl.ObjectKeyFromObject(object), object)
+ if err != nil {
+ return err
+ }
+ p, err := patch.PositiveMergePatch(object, resource)
+ if err != nil {
+ return err
+ } else if len(p) == 0 {
+ // Update the resource with the object returned from the API server
+ return t.unstructuredToRuntimeObject(object, resource)
+ }
+ err = env.Client.Patch(env.Ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+ if err != nil {
+ return fmt.Errorf("error during patch %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+ }
+ return nil
+}
+
+func (t *deployerTrait) unstructuredToRuntimeObject(u *unstructured.Unstructured, obj ctrl.Object) error {
+ data, err := json.Marshal(u)
+ if err != nil {
+ return err
+ }
+ return json.Unmarshal(data, obj)
+}
+
+func isIncompatibleServerError(err error) bool {
+ // First simpler check for older servers (i.e. OpenShift 3.11)
+ if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+ return true
+ }
+
+ // 415: Unsupported media type means we're talking to a server which doesn't
+ // support server-side apply.
+ var serr *k8serrors.StatusError
+ if errors.As(err, &serr) {
+ return serr.Status().Code == http.StatusUnsupportedMediaType
+ }
+
+ // Non-StatusError means the error isn't because the server is incompatible.
+ return false
+}
+
func (t *deployerTrait) SelectControllerStrategy(e *Environment) (*ControllerStrategy, error) {
if IsFalse(t.Enabled) {
return nil, nil