You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/01/07 14:25:49 UTC
[camel-k] 12/13: Fix #237: fix installation from scratch
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit adf3eadec6d5b38590643e274f08c3240ea250c6
Author: nferraro <ni...@gmail.com>
AuthorDate: Mon Jan 7 14:37:39 2019 +0100
Fix #237: fix installation from scratch
---
pkg/client/client.go | 23 ++++++++---------
pkg/cmd/install.go | 22 ++++++++--------
pkg/cmd/root.go | 9 +++++--
pkg/install/cluster.go | 67 +++++++++++++++++++++++++++++++++++++++++++------
pkg/install/operator.go | 20 ---------------
test/testing_env.go | 10 +++++---
6 files changed, 94 insertions(+), 57 deletions(-)
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 7a0c73a..cade334 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
+ clientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
@@ -49,6 +50,11 @@ type Injectable interface {
InjectClient(Client)
}
+// Provider is used to provide a new instance of the Client each time it's required
+type Provider struct {
+ Get func() (Client, error)
+}
+
type defaultClient struct {
controller.Client
kubernetes.Interface
@@ -68,30 +74,21 @@ func NewOutOfClusterClient(kubeconfig string) (Client, error) {
return nil, err
}
- options := manager.Options{
- LeaderElection: false,
- }
-
- // Create a new Cmd to provide shared dependencies and start components
- mgr, err := manager.New(cfg, options)
- if err != nil {
- return nil, err
- }
+ scheme := clientscheme.Scheme
// Setup Scheme for all resources
- if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
+ if err := apis.AddToScheme(scheme); err != nil {
return nil, err
}
var clientset kubernetes.Interface
- if clientset, err = kubernetes.NewForConfig(mgr.GetConfig()); err != nil {
+ if clientset, err = kubernetes.NewForConfig(cfg); err != nil {
return nil, err
}
// Create a new client to avoid using cache (enabled by default on operator-sdk client)
clientOptions := controller.Options{
- Scheme: mgr.GetScheme(),
- Mapper: mgr.GetRESTMapper(),
+ Scheme: scheme,
}
dynClient, err := controller.New(cfg, clientOptions)
if err != nil {
diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go
index c097cb6..342c4a9 100644
--- a/pkg/cmd/install.go
+++ b/pkg/cmd/install.go
@@ -21,6 +21,7 @@ import (
"fmt"
"strings"
+ "github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/install"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/pkg/errors"
@@ -68,22 +69,16 @@ type installCmdOptions struct {
}
func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error {
- // TODO verify if this is needed by running a installation from scratch
- // Let's use a fast refresh period when running with the CLI
- // k8sclient.ResetCacheEvery(8 * time.Second)
-
- c, err := o.GetCmdClient()
- if err != nil {
- return err
- }
-
var collection *kubernetes.Collection
if o.outputFormat != "" {
collection = kubernetes.NewCollection()
}
if !o.skipClusterSetup {
- err := install.SetupClusterwideResourcesOrCollect(o.Context, c, collection)
+ // Let's use a client provider during cluster installation, to eliminate the problem of CRD object caching
+ clientProvider := client.Provider{Get: o.NewCmdClient}
+
+ err := install.SetupClusterwideResourcesOrCollect(o.Context, clientProvider, collection)
if err != nil && k8serrors.IsForbidden(err) {
fmt.Println("Current user is not authorized to create cluster-wide objects like custom resource definitions or cluster roles: ", err)
@@ -99,9 +94,14 @@ func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error {
fmt.Println("Camel K cluster setup completed successfully")
}
} else {
+ c, err := o.GetCmdClient()
+ if err != nil {
+ return err
+ }
+
namespace := o.Namespace
- err := install.OperatorOrCollect(o.Context, c, namespace, collection)
+ err = install.OperatorOrCollect(o.Context, c, namespace, collection)
if err != nil {
return err
}
diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go
index c718d94..bf6aeef 100644
--- a/pkg/cmd/root.go
+++ b/pkg/cmd/root.go
@@ -81,13 +81,18 @@ func (command *RootCmdOptions) preRun(cmd *cobra.Command, args []string) error {
return nil
}
-// GetCmdClient returns a client that can be used from command line tools
+// GetCmdClient returns the client that can be used from command line tools
func (command *RootCmdOptions) GetCmdClient() (client.Client, error) {
// Get the pre-computed client
if command._client != nil {
return command._client, nil
}
var err error
- command._client, err = client.NewOutOfClusterClient(command.KubeConfig)
+ command._client, err = command.NewCmdClient()
return command._client, err
}
+
+// NewCmdClient returns a new client that can be used from command line tools
+func (command *RootCmdOptions) NewCmdClient() (client.Client, error) {
+ return client.NewOutOfClusterClient(command.KubeConfig)
+}
diff --git a/pkg/install/cluster.go b/pkg/install/cluster.go
index 7adf008..31d1096 100644
--- a/pkg/install/cluster.go
+++ b/pkg/install/cluster.go
@@ -19,25 +19,33 @@ package install
import (
"context"
+ "errors"
+ "strconv"
+ "time"
"github.com/apache/camel-k/deploy"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/kubernetes/customclient"
"k8s.io/api/rbac/v1"
- "k8s.io/apimachinery/pkg/api/errors"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// SetupClusterwideResources --
-func SetupClusterwideResources(ctx context.Context, c client.Client) error {
- return SetupClusterwideResourcesOrCollect(ctx, c, nil)
+func SetupClusterwideResources(ctx context.Context, clientProvider client.Provider) error {
+ return SetupClusterwideResourcesOrCollect(ctx, clientProvider, nil)
}
// SetupClusterwideResourcesOrCollect --
-func SetupClusterwideResourcesOrCollect(ctx context.Context, c client.Client, collection *kubernetes.Collection) error {
+func SetupClusterwideResourcesOrCollect(ctx context.Context, clientProvider client.Provider, collection *kubernetes.Collection) error {
+ // Get a client to install the CRD
+ c, err := clientProvider.Get()
+ if err != nil {
+ return err
+ }
// Install CRD for Integration Platform (if needed)
if err := installCRD(ctx, c, "IntegrationPlatform", "crd-integration-platform.yaml", collection); err != nil {
@@ -66,13 +74,56 @@ func SetupClusterwideResourcesOrCollect(ctx context.Context, c client.Client, co
}
}
+ // Wait for all CRDs to be installed before proceeding
+ if err := WaitForAllCRDInstallation(ctx, clientProvider, 25*time.Second); err != nil {
+ return err
+ }
+
return nil
}
-// IsCRDInstalled check if the given CRT kind is installed
+// WaitForAllCRDInstallation waits until all CRDs are installed
+func WaitForAllCRDInstallation(ctx context.Context, clientProvider client.Provider, timeout time.Duration) error {
+ deadline := time.Now().Add(timeout)
+ for {
+ var c client.Client
+ var err error
+ if c, err = clientProvider.Get(); err != nil {
+ return err
+ }
+ var inst bool
+ if inst, err = AreAllCRDInstalled(ctx, c); err != nil {
+ return err
+ } else if inst {
+ return nil
+ }
+ // Retry after 2 seconds unless the deadline has already passed
+ if time.Now().After(deadline) {
+ return errors.New("cannot check CRD installation after " + strconv.FormatInt(timeout.Nanoseconds()/1000000000, 10) + " seconds")
+ }
+ time.Sleep(2 * time.Second)
+ }
+}
+
+// AreAllCRDInstalled checks whether all the required CRDs are installed
+func AreAllCRDInstalled(ctx context.Context, c client.Client) (bool, error) {
+ if ok, err := IsCRDInstalled(ctx, c, "IntegrationPlatform"); err != nil {
+ return ok, err
+ } else if !ok {
+ return false, nil
+ }
+ if ok, err := IsCRDInstalled(ctx, c, "IntegrationContext"); err != nil {
+ return ok, err
+ } else if !ok {
+ return false, nil
+ }
+ return IsCRDInstalled(ctx, c, "Integration")
+}
+
+// IsCRDInstalled checks whether the given CRD kind is installed
func IsCRDInstalled(ctx context.Context, c client.Client, kind string) (bool, error) {
lst, err := c.Discovery().ServerResourcesForGroupVersion("camel.apache.org/v1alpha1")
- if err != nil && errors.IsNotFound(err) {
+ if err != nil && k8serrors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
@@ -120,7 +171,7 @@ func installCRD(ctx context.Context, c client.Client, kind string, resourceName
Resource("customresourcedefinitions").
Do()
// Check result
- if result.Error() != nil && !errors.IsAlreadyExists(result.Error()) {
+ if result.Error() != nil && !k8serrors.IsAlreadyExists(result.Error()) {
return result.Error()
}
@@ -143,7 +194,7 @@ func IsClusterRoleInstalled(ctx context.Context, c client.Client) (bool, error)
return false, err
}
err = c.Get(ctx, key, &clusterRole)
- if err != nil && errors.IsNotFound(err) {
+ if err != nil && k8serrors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
diff --git a/pkg/install/operator.go b/pkg/install/operator.go
index 1c7f871..365592f 100644
--- a/pkg/install/operator.go
+++ b/pkg/install/operator.go
@@ -20,8 +20,6 @@ package install
import (
"context"
"errors"
- "strconv"
- "time"
"github.com/apache/camel-k/deploy"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -30,7 +28,6 @@ import (
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/minishift"
"github.com/apache/camel-k/pkg/util/openshift"
- k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// Operator installs the operator resources in the given namespace
@@ -100,9 +97,6 @@ func Platform(ctx context.Context, c client.Client, namespace string, registry s
// PlatformOrCollect --
// nolint: lll
func PlatformOrCollect(ctx context.Context, c client.Client, namespace string, registry string, organization string, pushSecret string, collection *kubernetes.Collection) (*v1alpha1.IntegrationPlatform, error) {
- if err := waitForPlatformCRDAvailable(ctx, c, namespace, 25*time.Second); err != nil {
- return nil, err
- }
isOpenshift, err := openshift.IsOpenShift(c)
if err != nil {
return nil, err
@@ -143,20 +137,6 @@ func PlatformOrCollect(ctx context.Context, c client.Client, namespace string, r
return pl, nil
}
-func waitForPlatformCRDAvailable(ctx context.Context, c client.Client, namespace string, timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for {
- pla := v1alpha1.NewIntegrationPlatformList()
- if err := c.List(ctx, &k8sclient.ListOptions{Namespace: namespace}, &pla); err == nil {
- return nil
- }
- if time.Now().After(deadline) {
- return errors.New("cannot list integration platforms after " + strconv.FormatInt(timeout.Nanoseconds()/1000000000, 10) + " seconds")
- }
- time.Sleep(2 * time.Second)
- }
-}
-
// Example --
func Example(ctx context.Context, c client.Client, namespace string) error {
return ExampleOrCollect(ctx, c, namespace, nil)
diff --git a/test/testing_env.go b/test/testing_env.go
index 3d393ed..dbeae00 100644
--- a/test/testing_env.go
+++ b/test/testing_env.go
@@ -38,15 +38,19 @@ import (
var testContext context.Context
var testClient client.Client
+func newTestClient() (client.Client, error) {
+ return client.NewOutOfClusterClient("")
+}
+
func init() {
- testContext = context.TODO()
var err error
- testClient, err = client.NewOutOfClusterClient("")
+ err = install.SetupClusterwideResources(testContext, client.Provider{Get: newTestClient})
if err != nil {
panic(err)
}
- err = install.SetupClusterwideResources(testContext, testClient)
+ testContext = context.TODO()
+ testClient, err = newTestClient()
if err != nil {
panic(err)
}