You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2018/09/13 05:31:58 UTC

[camel-k] 01/02: Add a flag to wait until the integration is running

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c2840db015a2c449b5cfff4e1beceab1ddeb3c10
Author: nferraro <ni...@gmail.com>
AuthorDate: Wed Sep 12 18:32:54 2018 +0200

    Add a flag to wait until the integration is running
---
 cmd/kamel/kamel.go      |  4 ++-
 pkg/client/cmd/root.go  |  8 +++--
 pkg/client/cmd/run.go   | 66 ++++++++++++++++++++++++++++++++++++---
 pkg/util/watch/watch.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 152 insertions(+), 8 deletions(-)

diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index 3328370..ff56647 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -22,10 +22,12 @@ import (
 	"github.com/apache/camel-k/pkg/client/cmd"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 	"os"
+	"context"
 )
 
 func main() {
-	rootCmd, err := cmd.NewKamelCommand()
+	ctx := context.Background()
+	rootCmd, err := cmd.NewKamelCommand(ctx)
 	exitOnError(err)
 
 	err = rootCmd.Execute()
diff --git a/pkg/client/cmd/root.go b/pkg/client/cmd/root.go
index e0e2a08..50cfb40 100644
--- a/pkg/client/cmd/root.go
+++ b/pkg/client/cmd/root.go
@@ -23,15 +23,19 @@ import (
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/pkg/errors"
 	"github.com/spf13/cobra"
+	"context"
 )
 
 type RootCmdOptions struct {
+	Context    context.Context
 	KubeConfig string
 	Namespace  string
 }
 
-func NewKamelCommand() (*cobra.Command, error) {
-	options := RootCmdOptions{}
+func NewKamelCommand(ctx context.Context) (*cobra.Command, error) {
+	options := RootCmdOptions{
+		Context: ctx,
+	}
 	var cmd = cobra.Command{
 		Use:   "kamel",
 		Short: "Kamel is a awesome client tool for running Apache Camel integrations natively on Kubernetes",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index d64a653..cee146e 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -31,6 +31,7 @@ import (
 	"github.com/spf13/cobra"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/apis/meta/v1"
+	"github.com/apache/camel-k/pkg/util/watch"
 )
 
 type RunCmdOptions struct {
@@ -38,6 +39,7 @@ type RunCmdOptions struct {
 	Language        string
 	IntegrationName string
 	Dependencies    []string
+	Wait            bool
 }
 
 func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
@@ -54,8 +56,10 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
 	}
 
 	cmd.Flags().StringVarP(&options.Language, "language", "l", "", "Programming Language used to write the file")
-	cmd.Flags().StringVarP(&options.IntegrationName, "name", "", "", "The integration name")
+	cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The integration name")
 	cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", nil, "The integration dependency")
+	cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the integration to be running")
+	cmd.ParseFlags(os.Args)
 
 	return &cmd
 }
@@ -74,10 +78,62 @@ func (*RunCmdOptions) validateArgs(cmd *cobra.Command, args []string) error {
 }
 
 func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
-	code, err := o.loadCode(args[0])
+	integration, err := o.createIntegration(cmd, args)
 	if err != nil {
 		return err
 	}
+	if o.Wait {
+		err = o.waitForIntegrationReady(integration)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (o *RunCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error {
+	// Block this goroutine until the integration is in a final status
+	changes, err := watch.WatchStateChanges(o.Context, integration)
+	if err != nil {
+		return err
+	}
+
+	var lastStatusSeen *v1alpha1.IntegrationStatus
+
+watcher:
+	for {
+		select {
+		case <-o.Context.Done():
+			return nil
+		case i, ok := <-changes:
+			if !ok {
+				break watcher
+			}
+			lastStatusSeen = &i.Status
+			phase := string(i.Status.Phase)
+			if phase != "" {
+				fmt.Println("integration \""+integration.Name+"\" in phase", phase)
+				// TODO when we add health checks, we should wait until they are passed
+				if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError {
+					// TODO display some error info when available in the status
+					break watcher
+				}
+			}
+		}
+	}
+
+	// TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks)
+	if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError {
+		return errors.New("integration deployment failed")
+	}
+	return nil
+}
+
+func (o *RunCmdOptions) createIntegration(cmd *cobra.Command, args []string) (*v1alpha1.Integration, error) {
+	code, err := o.loadCode(args[0])
+	if err != nil {
+		return nil, err
+	}
 
 	namespace := o.Namespace
 
@@ -124,14 +180,14 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
 		clone := integration.DeepCopy()
 		err = sdk.Get(clone)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		integration.ResourceVersion = clone.ResourceVersion
 		err = sdk.Update(&integration)
 	}
 
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if !existed {
@@ -139,7 +195,7 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
 	} else {
 		fmt.Printf("integration \"%s\" updated\n", name)
 	}
-	return nil
+	return &integration, nil
 }
 
 func (*RunCmdOptions) loadCode(fileName string) (string, error) {
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
new file mode 100644
index 0000000..3ec0bda
--- /dev/null
+++ b/pkg/util/watch/watch.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"context"
+	"github.com/operator-framework/operator-sdk/pkg/k8sclient"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"github.com/sirupsen/logrus"
+)
+
+// Watches a integration resource and send it through a channel when its status changes
+func WatchStateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-chan *v1alpha1.Integration, error) {
+	resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace)
+	if err != nil {
+		return nil, err
+	}
+	watcher, err := resourceClient.Watch(metav1.ListOptions{
+		FieldSelector: "metadata.name=" + integration.Name,
+	})
+	if err != nil {
+		return nil, err
+	}
+	events := watcher.ResultChan()
+
+	out := make(chan *v1alpha1.Integration)
+	var lastObservedState *v1alpha1.IntegrationPhase
+
+	go func() {
+		defer close(out)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case e, ok := <-events:
+				if !ok {
+					return
+				}
+
+				if e.Object != nil {
+					if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+						unstr := unstructured.Unstructured{
+							Object: runtimeUnstructured.UnstructuredContent(),
+						}
+						icopy := integration.DeepCopy()
+						err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+						if err != nil {
+							logrus.Error("Unexpected error detected when watching resource", err)
+							return // closes the channel
+						}
+
+						if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
+							lastObservedState = &icopy.Status.Phase
+							out <- icopy
+						}
+					}
+				}
+			}
+		}
+	}()
+
+	return out, nil
+}