You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2021/12/07 13:23:49 UTC

[camel-k] branch main updated (2ef7836 -> cc924de)

This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git.


    from 2ef7836  Updated CHANGELOG.md
     new 03bb989  chore: Parallel Kamelets install
     new 8ed4b47  chore: Use Server-Side Apply to install bundled Kamelets
     new 8af6490  chore: Increase client-side throttling maximum burst
     new cc924de  chore: Update Kamelet update e2e test

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...elet_upgrade_test.go => kamelet_update_test.go} |  24 +--
 e2e/support/test_support.go                        |  14 +-
 go.mod                                             |   1 +
 pkg/client/client.go                               |  31 ++--
 pkg/cmd/operator/operator.go                       |  12 +-
 pkg/install/kamelets.go                            | 169 +++++++++++++++------
 6 files changed, 165 insertions(+), 86 deletions(-)
 rename e2e/common/{kamelet_upgrade_test.go => kamelet_update_test.go} (67%)

[camel-k] 02/04: chore: Use Server-Side Apply to install bundled Kamelets

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 8ed4b47951bef17c2856b3c90ffc22ad548bd9d7
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Mon Dec 6 15:39:52 2021 +0100

    chore: Use Server-Side Apply to install bundled Kamelets
---
 pkg/install/kamelets.go | 117 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 86 insertions(+), 31 deletions(-)

diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index b2ad752..e0f353e 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,8 +19,10 @@ package install
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io/fs"
+	"net/http"
 	"os"
 	"path"
 	"path/filepath"
@@ -29,15 +31,18 @@ import (
 	"golang.org/x/sync/errgroup"
 
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 
-	"github.com/pkg/errors"
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/client"
 	"github.com/apache/camel-k/pkg/util"
 	"github.com/apache/camel-k/pkg/util/defaults"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
+	"github.com/apache/camel-k/pkg/util/patch"
 )
 
 const (
@@ -45,6 +50,8 @@ const (
 	defaultKameletDir = "/kamelets/"
 )
 
+var hasServerSideApply = true
+
 // KameletCatalog installs the bundled Kamelets into the specified namespace.
 func KameletCatalog(ctx context.Context, c client.Client, namespace string) error {
 	kameletDir := os.Getenv(kameletDirEnv)
@@ -75,7 +82,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 		}
 		// We may want to throttle the creation of Go routines if the number of bundled Kamelets increases.
 		g.Go(func() error {
-			return createOrReplaceKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace)
+			return applyKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace)
 		})
 		return nil
 	})
@@ -86,9 +93,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 	return g.Wait()
 }
 
-func createOrReplaceKamelet(ctx context.Context, c client.Client, path string, namespace string) error {
-	fmt.Printf("Install file: %s in %s", path, namespace)
-
+func applyKamelet(ctx context.Context, c client.Client, path string, namespace string) error {
 	content, err := util.ReadFile(path)
 	if err != nil {
 		return err
@@ -98,38 +103,88 @@ func createOrReplaceKamelet(ctx context.Context, c client.Client, path string, n
 	if err != nil {
 		return err
 	}
-	if k, ok := obj.(*v1alpha1.Kamelet); ok {
-		existing := &v1alpha1.Kamelet{}
-		err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing)
-		if err != nil {
-			if k8serrors.IsNotFound(err) {
-				existing = nil
-			} else {
-				return err
-			}
-		}
+	kamelet, ok := obj.(*v1alpha1.Kamelet)
+	if !ok {
+		return fmt.Errorf("cannot load Kamelet from file %q", path)
+	}
+
+	kamelet.Namespace = namespace
+
+	if kamelet.GetAnnotations() == nil {
+		kamelet.SetAnnotations(make(map[string]string))
+	}
+	kamelet.GetAnnotations()[kamelVersionAnnotation] = defaults.Version
 
-		if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" {
-			if k.GetAnnotations() == nil {
-				k.SetAnnotations(make(map[string]string))
-			}
-			k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version
-
-			if k.GetLabels() == nil {
-				k.SetLabels(make(map[string]string))
-			}
-			k.GetLabels()[v1alpha1.KameletBundledLabel] = "true"
-			k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true"
-
-			err := ObjectOrCollect(ctx, c, namespace, nil, true, k)
-			if err != nil {
-				return errors.Wrapf(err, "could not create resource from file %q", path)
-			}
+	if kamelet.GetLabels() == nil {
+		kamelet.SetLabels(make(map[string]string))
+	}
+	kamelet.GetLabels()[v1alpha1.KameletBundledLabel] = "true"
+	kamelet.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true"
+
+	if hasServerSideApply {
+		err := serverSideApply(ctx, c, kamelet)
+		switch {
+		case err == nil:
+			break
+		case isIncompatibleServerError(err):
+			hasServerSideApply = false
+		default:
+			return fmt.Errorf("could not apply Kamelet from file %q: %w", path, err)
 		}
+	} else {
+		return clientSideApply(ctx, c, kamelet)
 	}
+
 	return nil
 }
 
+func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error {
+	target, err := patch.PositiveApplyPatch(resource)
+	if err != nil {
+		return err
+	}
+	return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+}
+
+func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error {
+	err := c.Create(ctx, resource)
+	if err == nil {
+		return nil
+	} else if !k8serrors.IsAlreadyExists(err) {
+		return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
+	}
+	object := &unstructured.Unstructured{}
+	object.SetNamespace(resource.GetNamespace())
+	object.SetName(resource.GetName())
+	object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+	err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
+	if err != nil {
+		return err
+	}
+	p, err := patch.PositiveMergePatch(object, resource)
+	if err != nil {
+		return err
+	} else if len(p) == 0 {
+		return nil
+	}
+	return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+}
+
+func isIncompatibleServerError(err error) bool {
+	// First simpler check for older servers (i.e. OpenShift 3.11)
+	if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+		return true
+	}
+	// 415: Unsupported media type means we're talking to a server which doesn't
+	// support server-side apply.
+	var serr *k8serrors.StatusError
+	if errors.As(err, &serr) {
+		return serr.Status().Code == http.StatusUnsupportedMediaType
+	}
+	// Non-StatusError means the error isn't because the server is incompatible.
+	return false
+}
+
 // KameletViewerRole installs the role that allows any user ro access kamelets in the global namespace.
 func KameletViewerRole(ctx context.Context, c client.Client, namespace string) error {
 	if err := Resource(ctx, c, namespace, true, IdentityResourceCustomizer, "/viewer/user-global-kamelet-viewer-role.yaml"); err != nil {

[camel-k] 03/04: chore: Increase client-side throttling maximum burst

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 8af6490f5e30210b9ad7b870f7ddb50708faae37
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Mon Dec 6 16:26:31 2021 +0100

    chore: Increase client-side throttling maximum burst
---
 pkg/client/client.go         | 31 +++++++++++++++++--------------
 pkg/cmd/operator/operator.go | 12 ++++++++++--
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/pkg/client/client.go b/pkg/client/client.go
index 0893363..3334e70 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -23,10 +23,6 @@ import (
 	"os"
 	"path/filepath"
 
-	"github.com/apache/camel-k/pkg/util"
-
-	camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1"
-	camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1"
 	user "github.com/mitchellh/go-homedir"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
@@ -36,18 +32,21 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 
 	"k8s.io/client-go/kubernetes"
-	clientscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 	clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
 
-	controller "sigs.k8s.io/controller-runtime/pkg/client"
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/config"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
 	"github.com/apache/camel-k/pkg/apis"
 	camel "github.com/apache/camel-k/pkg/client/camel/clientset/versioned"
+	camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1"
+	camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/util"
 )
 
 const (
@@ -57,7 +56,7 @@ const (
 
 // Client is an abstraction for a k8s client.
 type Client interface {
-	controller.Client
+	ctrl.Client
 	kubernetes.Interface
 	CamelV1() camelv1.CamelV1Interface
 	CamelV1alpha1() camelv1alpha1.CamelV1alpha1Interface
@@ -77,7 +76,7 @@ type Provider struct {
 }
 
 type defaultClient struct {
-	controller.Client
+	ctrl.Client
 	kubernetes.Interface
 	camel  camel.Interface
 	scheme *runtime.Scheme
@@ -116,16 +115,20 @@ func NewOutOfClusterClient(kubeconfig string) (Client, error) {
 
 // NewClient creates a new k8s client that can be used from outside or in the cluster.
 func NewClient(fastDiscovery bool) (Client, error) {
-	// Get a config to talk to the apiserver
 	cfg, err := config.GetConfig()
 	if err != nil {
 		return nil, err
 	}
+	return NewClientWithConfig(fastDiscovery, cfg)
+}
 
-	scheme := clientscheme.Scheme
+// NewClientWithConfig creates a new k8s client that can be used from outside or in the cluster.
+func NewClientWithConfig(fastDiscovery bool, cfg *rest.Config) (Client, error) {
+	clientScheme := scheme.Scheme
 
 	// Setup Scheme for all resources
-	if err := apis.AddToScheme(scheme); err != nil {
+	err := apis.AddToScheme(clientScheme)
+	if err != nil {
 		return nil, err
 	}
 
@@ -145,11 +148,11 @@ func NewClient(fastDiscovery bool) (Client, error) {
 	}
 
 	// Create a new client to avoid using cache (enabled by default with controller-runtime client)
-	clientOptions := controller.Options{
-		Scheme: scheme,
+	clientOptions := ctrl.Options{
+		Scheme: clientScheme,
 		Mapper: mapper,
 	}
-	dynClient, err := controller.New(cfg, clientOptions)
+	dynClient, err := ctrl.New(cfg, clientOptions)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 1d140ea..256d9d5 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -41,6 +41,7 @@ import (
 
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/config"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -93,7 +94,14 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) {
 	watchNamespace, err := getWatchNamespace()
 	exitOnError(err, "failed to get watch namespace")
 
-	c, err := client.NewClient(false)
+	cfg, err := config.GetConfig()
+	exitOnError(err, "cannot get client config")
+	// Increase maximum burst that is used by client-side throttling,
+	// to prevent the requests made to apply the bundled Kamelets
+	// from being throttled.
+	cfg.QPS = 20
+	cfg.Burst = 200
+	c, err := client.NewClientWithConfig(false, cfg)
 	exitOnError(err, "cannot initialize client")
 
 	// We do not rely on the event broadcaster managed by controller runtime,
@@ -178,7 +186,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) {
 	exitOnError(controller.AddToManager(mgr), "")
 
 	log.Info("Installing operator resources")
-	installCtx, installCancel := context.WithTimeout(context.TODO(), 1*time.Minute)
+	installCtx, installCancel := context.WithTimeout(context.Background(), 1*time.Minute)
 	defer installCancel()
 	install.OperatorStartupOptionalTools(installCtx, c, watchNamespace, operatorNamespace, log)
 

[camel-k] 04/04: chore: Update Kamelet update e2e test

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit cc924ded5198f141b349b2a69b137e5b3dc37cbb
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Tue Dec 7 09:28:51 2021 +0100

    chore: Update Kamelet update e2e test
---
 ...elet_upgrade_test.go => kamelet_update_test.go} | 24 +++++++++++-----------
 e2e/support/test_support.go                        | 14 ++++---------
 2 files changed, 16 insertions(+), 22 deletions(-)

diff --git a/e2e/common/kamelet_upgrade_test.go b/e2e/common/kamelet_update_test.go
similarity index 67%
rename from e2e/common/kamelet_upgrade_test.go
rename to e2e/common/kamelet_update_test.go
index a426f46..c39a981 100644
--- a/e2e/common/kamelet_upgrade_test.go
+++ b/e2e/common/kamelet_update_test.go
@@ -32,26 +32,26 @@ import (
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
-const preExistingKameletMarker = "pre-existing-kamelet"
-
-func TestKameletUpgrade(t *testing.T) {
+const customLabel = "custom-label"
 
+func TestBundleKameletUpdate(t *testing.T) {
 	WithNewTestNamespace(t, func(ns string) {
-		Expect(createOperatorManagedKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced
-		Expect(createUserManagedKamelet(ns, "ftp-sink")()).To(Succeed())      // Left intact by the operator
-		// Leverages the fact that the default kamelet catalog contains embedded "http-sink" and "ftp-sink"
+		Expect(createBundleKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced
+		Expect(createUserKamelet(ns, "user-sink")()).To(Succeed())   // Left intact by the operator
 
 		Expect(Kamel("install", "-n", ns).Execute()).To(Succeed())
 
-		Eventually(KameletHasLabel("http-sink", ns, preExistingKameletMarker)).Should(BeFalse())
-		Consistently(KameletHasLabel("ftp-sink", ns, preExistingKameletMarker), 5*time.Second, 1*time.Second).Should(BeTrue())
+		Eventually(Kamelet("http-sink", ns)).
+			Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true")))
+		Consistently(Kamelet("user-sink", ns), 5*time.Second, 1*time.Second).
+			Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true")))
 
 		// Cleanup
 		Expect(Kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
 	})
 }
 
-func createOperatorManagedKamelet(ns string, name string) func() error {
+func createBundleKamelet(ns string, name string) func() error {
 	flow := map[string]interface{}{
 		"from": map[string]interface{}{
 			"uri": "kamelet:source",
@@ -59,13 +59,13 @@ func createOperatorManagedKamelet(ns string, name string) func() error {
 	}
 
 	labels := map[string]string{
-		preExistingKameletMarker:     "true",
+		customLabel:                  "true",
 		v1alpha1.KameletBundledLabel: "true",
 	}
 	return CreateKamelet(ns, name, flow, nil, labels)
 }
 
-func createUserManagedKamelet(ns string, name string) func() error {
+func createUserKamelet(ns string, name string) func() error {
 	flow := map[string]interface{}{
 		"from": map[string]interface{}{
 			"uri": "kamelet:source",
@@ -73,7 +73,7 @@ func createUserManagedKamelet(ns string, name string) func() error {
 	}
 
 	labels := map[string]string{
-		preExistingKameletMarker: "true",
+		customLabel: "true",
 	}
 	return CreateKamelet(ns, name, flow, nil, labels)
 }
diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go
index 9147233..8d3ab96 100644
--- a/e2e/support/test_support.go
+++ b/e2e/support/test_support.go
@@ -1367,17 +1367,11 @@ func Kamelet(name string, ns string) func() *v1alpha1.Kamelet {
 	}
 }
 
-func KameletHasLabel(name string, ns string, label string) func() bool {
-	return func() bool {
-		k := Kamelet(name, ns)()
-		if k == nil {
-			return false
-		}
-		if _, ok := k.Labels[label]; ok {
-			return true
-		}
-		return false
+func KameletLabels(kamelet *v1alpha1.Kamelet) map[string]string {
+	if kamelet == nil {
+		return map[string]string{}
 	}
+	return kamelet.GetLabels()
 }
 
 func ClusterDomainName() (string, error) {

[camel-k] 01/04: chore: Parallel Kamelets install

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 03bb989d9ad9753f3b2354e0d35d418e853d1cdd
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Mon Dec 6 15:09:04 2021 +0100

    chore: Parallel Kamelets install
---
 go.mod                  |   1 +
 pkg/install/kamelets.go | 106 ++++++++++++++++++++++++++++--------------------
 2 files changed, 63 insertions(+), 44 deletions(-)

diff --git a/go.mod b/go.mod
index 34497e8..e11b03b 100644
--- a/go.mod
+++ b/go.mod
@@ -43,6 +43,7 @@ require (
 	go.uber.org/multierr v1.6.0
 	go.uber.org/zap v1.19.1
 	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	gopkg.in/inf.v0 v0.9.1
 	gopkg.in/yaml.v2 v2.4.0
 	k8s.io/api v0.21.4
diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 91bebee..b2ad752 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -1,12 +1,12 @@
 /*
 Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
+contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
+the License. You may obtain a copy of the License at
 
-   http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,19 +20,22 @@ package install
 import (
 	"context"
 	"fmt"
-	"io/ioutil"
+	"io/fs"
 	"os"
 	"path"
+	"path/filepath"
 	"strings"
 
-	"github.com/apache/camel-k/pkg/util"
+	"golang.org/x/sync/errgroup"
 
-	"github.com/pkg/errors"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/client"
+	"github.com/apache/camel-k/pkg/util"
 	"github.com/apache/camel-k/pkg/util/defaults"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 )
@@ -42,7 +45,7 @@ const (
 	defaultKameletDir = "/kamelets/"
 )
 
-// KameletCatalog installs the bundled KameletCatalog into one namespace.
+// KameletCatalog installs the bundled Kamelets into the specified namespace.
 func KameletCatalog(ctx context.Context, c client.Client, namespace string) error {
 	kameletDir := os.Getenv(kameletDirEnv)
 	if kameletDir == "" {
@@ -58,57 +61,72 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
 		return fmt.Errorf("kamelet directory %q is a file", kameletDir)
 	}
 
-	files, err := ioutil.ReadDir(kameletDir)
+	g, gCtx := errgroup.WithContext(ctx)
+
+	err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error {
+		if err != nil {
+			return err
+		}
+		if f.IsDir() && f.Name() != d.Name() {
+			return fs.SkipDir
+		}
+		if !(strings.HasSuffix(f.Name(), ".yaml") || strings.HasSuffix(f.Name(), ".yml")) {
+			return nil
+		}
+		// We may want to throttle the creation of Go routines if the number of bundled Kamelets increases.
+		g.Go(func() error {
+			return createOrReplaceKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace)
+		})
+		return nil
+	})
 	if err != nil {
 		return err
 	}
 
-	for _, file := range files {
-		if file.IsDir() || !(strings.HasSuffix(file.Name(), ".yaml") || strings.HasSuffix(file.Name(), ".yml")) {
-			continue
-		}
+	return g.Wait()
+}
 
-		content, err := util.ReadFile(path.Join(kameletDir, file.Name()))
-		if err != nil {
-			return err
-		}
+func createOrReplaceKamelet(ctx context.Context, c client.Client, path string, namespace string) error {
+	fmt.Printf("Install file: %s in %s", path, namespace)
+
+	content, err := util.ReadFile(path)
+	if err != nil {
+		return err
+	}
 
-		obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), string(content))
+	obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), string(content))
+	if err != nil {
+		return err
+	}
+	if k, ok := obj.(*v1alpha1.Kamelet); ok {
+		existing := &v1alpha1.Kamelet{}
+		err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing)
 		if err != nil {
-			return err
+			if k8serrors.IsNotFound(err) {
+				existing = nil
+			} else {
+				return err
+			}
 		}
-		if k, ok := obj.(*v1alpha1.Kamelet); ok {
-			existing := &v1alpha1.Kamelet{}
-			err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing)
-			if err != nil {
-				if k8serrors.IsNotFound(err) {
-					existing = nil
-				} else {
-					return err
-				}
+
+		if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" {
+			if k.GetAnnotations() == nil {
+				k.SetAnnotations(make(map[string]string))
 			}
+			k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version
 
-			if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" {
-				if k.GetAnnotations() == nil {
-					k.SetAnnotations(make(map[string]string))
-				}
-				k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version
-
-				if k.GetLabels() == nil {
-					k.SetLabels(make(map[string]string))
-				}
-				k.GetLabels()[v1alpha1.KameletBundledLabel] = "true"
-				k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true"
-
-				err := ObjectOrCollect(ctx, c, namespace, nil, true, k)
-				if err != nil {
-					return errors.Wrapf(err, "could not create resource from file %q", path.Join(kameletDir, file.Name()))
-				}
+			if k.GetLabels() == nil {
+				k.SetLabels(make(map[string]string))
 			}
+			k.GetLabels()[v1alpha1.KameletBundledLabel] = "true"
+			k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true"
 
+			err := ObjectOrCollect(ctx, c, namespace, nil, true, k)
+			if err != nil {
+				return errors.Wrapf(err, "could not create resource from file %q", path)
+			}
 		}
 	}
-
 	return nil
 }