You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/01/16 14:04:32 UTC

[camel-k] branch master updated: Add a -w flag to kamel install #135

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f32342  Add a -w flag to kamel install #135
3f32342 is described below

commit 3f323423c8d5478f439d8698ef5e419d0ed09b69
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Jan 16 13:43:44 2019 +0100

    Add a -w flag to kamel install #135
---
 pkg/cmd/install.go                               | 34 +++++++++
 pkg/cmd/run.go                                   |  2 +-
 pkg/util/kubernetes/customclient/customclient.go | 18 +++++
 pkg/util/watch/watch.go                          | 88 ++++++++++++++++++++----
 4 files changed, 128 insertions(+), 14 deletions(-)

diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go
index f9bbff2..29b7015 100644
--- a/pkg/cmd/install.go
+++ b/pkg/cmd/install.go
@@ -21,6 +21,9 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/util/watch"
+
 	"github.com/apache/camel-k/pkg/client"
 	"github.com/apache/camel-k/pkg/install"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
@@ -40,6 +43,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
 		RunE:  impl.install,
 	}
 
+	cmd.Flags().BoolVarP(&impl.wait, "wait", "w", false, "Waits for the platform to be running")
 	cmd.Flags().BoolVar(&impl.clusterSetupOnly, "cluster-setup", false, "Execute cluster-wide operations only (may require admin rights)")
 	cmd.Flags().BoolVar(&impl.skipClusterSetup, "skip-cluster-setup", false, "Skip the cluster-setup phase")
 	cmd.Flags().BoolVar(&impl.exampleSetup, "example", false, "Install example integration")
@@ -66,6 +70,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
 
 type installCmdOptions struct {
 	*RootCmdOptions
+	wait             bool
 	clusterSetupOnly bool
 	skipClusterSetup bool
 	exampleSetup     bool
@@ -155,6 +160,13 @@ func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error {
 		}
 
 		if collection == nil {
+			if o.wait {
+				err = o.waitForPlatformReady(platform)
+				if err != nil {
+					return err
+				}
+			}
+
 			fmt.Println("Camel K installed in namespace", namespace)
 		}
 	}
@@ -186,3 +198,25 @@ func (o *installCmdOptions) printOutput(collection *kubernetes.Collection) error
 	}
 	return nil
 }
+
+func (o *installCmdOptions) waitForPlatformReady(platform *v1alpha1.IntegrationPlatform) error {
+	handler := func(i *v1alpha1.IntegrationPlatform) bool {
+		if i.Status.Phase != "" {
+			fmt.Println("platform \""+platform.Name+"\" in phase", i.Status.Phase)
+
+			if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
+				// Ready: stop watching. TODO: the error branch below should display error info when available in the status
+				return false
+			}
+
+			if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseError {
+				fmt.Println("platform installation failed")
+				return false
+			}
+		}
+
+		return true
+	}
+
+	return watch.HandlePlatformStateChanges(o.Context, platform, handler)
+}
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 079016e..b4c34a2 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -227,7 +227,7 @@ func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integratio
 		return true
 	}
 
-	return watch.HandleStateChanges(o.Context, integration, handler)
+	return watch.HandleIntegrationStateChanges(o.Context, integration, handler)
 }
 
 func (o *runCmdOptions) syncIntegration(c client.Client, sources []string) error {
diff --git a/pkg/util/kubernetes/customclient/customclient.go b/pkg/util/kubernetes/customclient/customclient.go
index 345be1d..3d41155 100644
--- a/pkg/util/kubernetes/customclient/customclient.go
+++ b/pkg/util/kubernetes/customclient/customclient.go
@@ -18,6 +18,7 @@ limitations under the License.
 package customclient
 
 import (
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -65,3 +66,20 @@ func GetDynamicClientFor(group string, version string, kind string, namespace st
 		Resource: kind,
 	}).Namespace(namespace), nil
 }
+
+// GetDefaultDynamicClientFor returns a namespaced dynamic client for the given resource kind in the v1alpha1 scheme group/version
+func GetDefaultDynamicClientFor(kind string, namespace string) (dynamic.ResourceInterface, error) {
+	conf, err := config.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	dynamicClient, err := dynamic.NewForConfig(conf)
+	if err != nil {
+		return nil, err
+	}
+	return dynamicClient.Resource(schema.GroupVersionResource{
+		Group:    v1alpha1.SchemeGroupVersion.Group,
+		Version:  v1alpha1.SchemeGroupVersion.Version,
+		Resource: kind,
+	}).Namespace(namespace), nil
+}
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index dcf35ae..f1bcd84 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -20,19 +20,20 @@ package watch
 import (
 	"context"
 
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/util/kubernetes/customclient"
 	"github.com/sirupsen/logrus"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/json"
 )
 
 //
-// HandleStateChanges watches a integration resource and invoke the given handler when its status changes.
+// HandleIntegrationStateChanges watches an integration resource and invokes the given handler when its status changes.
 //
-//     err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
+//     err := watch.HandleIntegrationStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
 //         if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
 //			    return false
 //		    }
@@ -42,8 +43,8 @@ import (
 //
 // This function blocks until the handler function returns false, or until either the events channel or the context is closed.
 //
-func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
-	dynamicClient, err := customclient.GetDynamicClientFor(v1alpha1.SchemeGroupVersion.Group, v1alpha1.SchemeGroupVersion.Version, "integrations", integration.Namespace)
+func HandleIntegrationStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
+	dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrations", integration.Namespace)
 	if err != nil {
 		return err
 	}
@@ -70,23 +71,84 @@ func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration,
 
 			if e.Object != nil {
 				if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
-					unstr := unstructured.Unstructured{
-						Object: runtimeUnstructured.UnstructuredContent(),
+					jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
+					if err != nil {
+						return err
+					}
+					copy := integration.DeepCopy()
+					err = json.Unmarshal(jsondata, copy)
+					if err != nil {
+						logrus.Error("Unexpected error detected when watching resource", err)
+						return nil
 					}
-					jsondata, err := unstr.MarshalJSON()
+
+					if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
+						lastObservedState = &copy.Status.Phase
+						if !handler(copy) {
+							return nil
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+//
+// HandlePlatformStateChanges watches a platform resource and invokes the given handler when its status changes.
+//
+//     err := watch.HandlePlatformStateChanges(ctx, platform, func(i *v1alpha1.IntegrationPlatform) bool {
+//         if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
+//			    return false
+//		    }
+//
+//		    return true
+//	    })
+//
+// This function blocks until the handler function returns false, or until either the events channel or the context is closed.
+//
+func HandlePlatformStateChanges(ctx context.Context, platform *v1alpha1.IntegrationPlatform, handler func(platform *v1alpha1.IntegrationPlatform) bool) error {
+	dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrationplatforms", platform.Namespace)
+	if err != nil {
+		return err
+	}
+	watcher, err := dynamicClient.Watch(metav1.ListOptions{
+		FieldSelector: "metadata.name=" + platform.Name,
+	})
+	if err != nil {
+		return err
+	}
+
+	defer watcher.Stop()
+	events := watcher.ResultChan()
+
+	var lastObservedState *v1alpha1.IntegrationPlatformPhase
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case e, ok := <-events:
+			if !ok {
+				return nil
+			}
+
+			if e.Object != nil {
+				if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+					jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
 					if err != nil {
 						return err
 					}
-					icopy := integration.DeepCopy()
-					err = json.Unmarshal(jsondata, icopy)
+					copy := platform.DeepCopy()
+					err = json.Unmarshal(jsondata, copy)
 					if err != nil {
 						logrus.Error("Unexpected error detected when watching resource", err)
 						return nil
 					}
 
-					if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
-						lastObservedState = &icopy.Status.Phase
-						if !handler(icopy) {
+					if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
+						lastObservedState = &copy.Status.Phase
+						if !handler(copy) {
 							return nil
 						}
 					}