You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/01/16 14:04:32 UTC
[camel-k] branch master updated: Add a -w flag to kamel install #135
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
new 3f32342 Add a -w flag to kamel install #135
3f32342 is described below
commit 3f323423c8d5478f439d8698ef5e419d0ed09b69
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Jan 16 13:43:44 2019 +0100
Add a -w flag to kamel install #135
---
pkg/cmd/install.go | 34 +++++++++
pkg/cmd/run.go | 2 +-
pkg/util/kubernetes/customclient/customclient.go | 18 +++++
pkg/util/watch/watch.go | 88 ++++++++++++++++++++----
4 files changed, 128 insertions(+), 14 deletions(-)
diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go
index f9bbff2..29b7015 100644
--- a/pkg/cmd/install.go
+++ b/pkg/cmd/install.go
@@ -21,6 +21,9 @@ import (
"fmt"
"strings"
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/pkg/util/watch"
+
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/install"
"github.com/apache/camel-k/pkg/util/kubernetes"
@@ -40,6 +43,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
RunE: impl.install,
}
+ cmd.Flags().BoolVarP(&impl.wait, "wait", "w", false, "Waits for the platform to be running")
cmd.Flags().BoolVar(&impl.clusterSetupOnly, "cluster-setup", false, "Execute cluster-wide operations only (may require admin rights)")
cmd.Flags().BoolVar(&impl.skipClusterSetup, "skip-cluster-setup", false, "Skip the cluster-setup phase")
cmd.Flags().BoolVar(&impl.exampleSetup, "example", false, "Install example integration")
@@ -66,6 +70,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
type installCmdOptions struct {
*RootCmdOptions
+ wait bool
clusterSetupOnly bool
skipClusterSetup bool
exampleSetup bool
@@ -155,6 +160,13 @@ func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error {
}
if collection == nil {
+ if o.wait {
+ err = o.waitForPlatformReady(platform)
+ if err != nil {
+ return err
+ }
+ }
+
fmt.Println("Camel K installed in namespace", namespace)
}
}
@@ -186,3 +198,25 @@ func (o *installCmdOptions) printOutput(collection *kubernetes.Collection) error
}
return nil
}
+
+func (o *installCmdOptions) waitForPlatformReady(platform *v1alpha1.IntegrationPlatform) error {
+ handler := func(i *v1alpha1.IntegrationPlatform) bool {
+ if i.Status.Phase != "" {
+ fmt.Println("platform \""+platform.Name+"\" in phase", i.Status.Phase)
+
+ if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
+ // TODO display some error info when available in the status
+ return false
+ }
+
+ if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseError {
+ fmt.Println("platform installation failed")
+ return false
+ }
+ }
+
+ return true
+ }
+
+ return watch.HandlePlatformStateChanges(o.Context, platform, handler)
+}
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 079016e..b4c34a2 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -227,7 +227,7 @@ func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integratio
return true
}
- return watch.HandleStateChanges(o.Context, integration, handler)
+ return watch.HandleIntegrationStateChanges(o.Context, integration, handler)
}
func (o *runCmdOptions) syncIntegration(c client.Client, sources []string) error {
diff --git a/pkg/util/kubernetes/customclient/customclient.go b/pkg/util/kubernetes/customclient/customclient.go
index 345be1d..3d41155 100644
--- a/pkg/util/kubernetes/customclient/customclient.go
+++ b/pkg/util/kubernetes/customclient/customclient.go
@@ -18,6 +18,7 @@ limitations under the License.
package customclient
import (
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -65,3 +66,20 @@ func GetDynamicClientFor(group string, version string, kind string, namespace st
Resource: kind,
}).Namespace(namespace), nil
}
+
+// GetDefaultDynamicClientFor returns a dynamic client for a given kind
+func GetDefaultDynamicClientFor(kind string, namespace string) (dynamic.ResourceInterface, error) {
+ conf, err := config.GetConfig()
+ if err != nil {
+ return nil, err
+ }
+ dynamicClient, err := dynamic.NewForConfig(conf)
+ if err != nil {
+ return nil, err
+ }
+ return dynamicClient.Resource(schema.GroupVersionResource{
+ Group: v1alpha1.SchemeGroupVersion.Group,
+ Version: v1alpha1.SchemeGroupVersion.Version,
+ Resource: kind,
+ }).Namespace(namespace), nil
+}
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index dcf35ae..f1bcd84 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -20,19 +20,20 @@ package watch
import (
"context"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
+
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes/customclient"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
)
//
-// HandleStateChanges watches a integration resource and invoke the given handler when its status changes.
+// HandleIntegrationStateChanges watches a integration resource and invoke the given handler when its status changes.
//
-// err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
+// err := watch.HandleIntegrationStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
// if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
// return false
// }
@@ -42,8 +43,8 @@ import (
//
// This function blocks until the handler function returns true or either the events channel or the context is closed.
//
-func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
- dynamicClient, err := customclient.GetDynamicClientFor(v1alpha1.SchemeGroupVersion.Group, v1alpha1.SchemeGroupVersion.Version, "integrations", integration.Namespace)
+func HandleIntegrationStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
+ dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrations", integration.Namespace)
if err != nil {
return err
}
@@ -70,23 +71,84 @@ func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration,
if e.Object != nil {
if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
- unstr := unstructured.Unstructured{
- Object: runtimeUnstructured.UnstructuredContent(),
+ jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
+ if err != nil {
+ return err
+ }
+ copy := integration.DeepCopy()
+ err = json.Unmarshal(jsondata, copy)
+ if err != nil {
+ logrus.Error("Unexpected error detected when watching resource", err)
+ return nil
}
- jsondata, err := unstr.MarshalJSON()
+
+ if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
+ lastObservedState = &copy.Status.Phase
+ if !handler(copy) {
+ return nil
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+//
+// HandlePlatformStateChanges watches a platform resource and invoke the given handler when its status changes.
+//
+// err := watch.HandlePlatformStateChanges(ctx, platform, func(i *v1alpha1.IntegrationPlatform) bool {
+// if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
+// return false
+// }
+//
+// return true
+// })
+//
+// This function blocks until the handler function returns true or either the events channel or the context is closed.
+//
+func HandlePlatformStateChanges(ctx context.Context, platform *v1alpha1.IntegrationPlatform, handler func(platform *v1alpha1.IntegrationPlatform) bool) error {
+ dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrationplatforms", platform.Namespace)
+ if err != nil {
+ return err
+ }
+ watcher, err := dynamicClient.Watch(metav1.ListOptions{
+ FieldSelector: "metadata.name=" + platform.Name,
+ })
+ if err != nil {
+ return err
+ }
+
+ defer watcher.Stop()
+ events := watcher.ResultChan()
+
+ var lastObservedState *v1alpha1.IntegrationPlatformPhase
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case e, ok := <-events:
+ if !ok {
+ return nil
+ }
+
+ if e.Object != nil {
+ if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+ jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
if err != nil {
return err
}
- icopy := integration.DeepCopy()
- err = json.Unmarshal(jsondata, icopy)
+ copy := platform.DeepCopy()
+ err = json.Unmarshal(jsondata, copy)
if err != nil {
logrus.Error("Unexpected error detected when watching resource", err)
return nil
}
- if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
- lastObservedState = &icopy.Status.Phase
- if !handler(icopy) {
+ if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
+ lastObservedState = &copy.Status.Phase
+ if !handler(copy) {
return nil
}
}