You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2018/10/12 17:08:29 UTC

[camel-k] 01/03: chore(kamel): simplify integration status watcher

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 4caea26e68f8b0c90dbd8f79b0cc2aef0b577d40
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 12 16:29:59 2018 +0200

    chore(kamel): simplify integration status watcher
---
 cmd/kamel/kamel.go      |  7 +++++-
 pkg/client/cmd/run.go   | 50 +++++++++++++-----------------------
 pkg/util/util.go        | 21 ++++++++++++++++
 pkg/util/watch/watch.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 112 insertions(+), 33 deletions(-)

diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index a89042f..502a978 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -31,7 +31,11 @@ import (
 func main() {
 	rand.Seed(time.Now().UTC().UnixNano())
 
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// Cancel ctx as soon as main returns
+	defer cancel()
+
 	rootCmd, err := cmd.NewKamelCommand(ctx)
 	exitOnError(err)
 
@@ -42,6 +46,7 @@ func main() {
 func exitOnError(err error) {
 	if err != nil {
 		fmt.Println("Error:", err)
+
 		os.Exit(1)
 	}
 }
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index 5b6664d..a22b58f 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -19,6 +19,7 @@ package cmd
 
 import (
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"os"
@@ -34,8 +35,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 
-	"io"
-
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/apache/camel-k/pkg/util/log"
@@ -165,41 +164,28 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error {
 }
 
 func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error {
-	// Block this goroutine until the integration is in a final status
-	changes, err := watch.StateChanges(o.Context, integration)
-	if err != nil {
-		return err
-	}
-
-	var lastStatusSeen *v1alpha1.IntegrationStatus
-
-watcher:
-	for {
-		select {
-		case <-o.Context.Done():
-			return nil
-		case i, ok := <-changes:
-			if !ok {
-				break watcher
+	handler := func(i *v1alpha1.Integration) bool {
+		//
+		// TODO when we add health checks, we should wait until they are passed
+		//
+		if i.Status.Phase != "" {
+			fmt.Println("integration \""+integration.Name+"\" in phase", i.Status.Phase)
+
+			if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
+				// TODO display some error info when available in the status
+				return false
 			}
-			lastStatusSeen = &i.Status
-			phase := string(i.Status.Phase)
-			if phase != "" {
-				fmt.Println("integration \""+integration.Name+"\" in phase", phase)
-				// TODO when we add health checks, we should wait until they are passed
-				if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError {
-					// TODO display some error info when available in the status
-					break watcher
-				}
+
+			if i.Status.Phase == v1alpha1.IntegrationPhaseError {
+				fmt.Println("integration deployment failed")
+				return false
 			}
 		}
-	}
 
-	// TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks)
-	if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError {
-		return errors.New("integration deployment failed")
+		return true
 	}
-	return nil
+
+	return watch.HandleStateChanges(o.Context, integration, handler)
 }
 
 func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 7792210..dcfbd36 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -17,6 +17,12 @@ limitations under the License.
 
 package util
 
+import (
+	"os"
+	"os/signal"
+	"syscall"
+)
+
 // StringSliceContains --
 func StringSliceContains(slice []string, items []string) bool {
 	for i := 0; i < len(items); i++ {
@@ -51,3 +57,18 @@ func StringSliceUniqueAdd(slice *[]string, item string) bool {
 
 	return true
 }
+
+// WaitForSignal --
+func WaitForSignal(sig chan os.Signal, exit func(int)) {
+	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)
+	go func() {
+		s := <-sig
+		switch s {
+		case syscall.SIGINT, syscall.SIGTERM:
+			exit(130) // Ctrl+c
+		case syscall.SIGPIPE:
+			exit(0)
+		}
+		exit(1)
+	}()
+}
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index 1ed9671..73d865e 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -47,7 +47,9 @@ func StateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-cha
 	var lastObservedState *v1alpha1.IntegrationPhase
 
 	go func() {
+		defer watcher.Stop()
 		defer close(out)
+
 		for {
 			select {
 			case <-ctx.Done():
@@ -81,3 +83,68 @@ func StateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-cha
 
 	return out, nil
 }
+
+//
+// HandleStateChanges watches an integration resource and invokes the given handler when its status phase changes.
+//
+//     err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
+//         if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
+//			    return false
+//		    }
+//
+//		    return true
+//	    })
+//
+// This function blocks until the handler function returns false, the events channel is closed, the context is done, or a received object cannot be converted.
+//
+func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
+	resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace)
+	if err != nil {
+		return err
+	}
+	watcher, err := resourceClient.Watch(metav1.ListOptions{
+		FieldSelector: "metadata.name=" + integration.Name,
+	})
+	if err != nil {
+		return err
+	}
+
+	defer watcher.Stop()
+	events := watcher.ResultChan()
+
+	var lastObservedState *v1alpha1.IntegrationPhase
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case e, ok := <-events:
+			if !ok {
+				return nil
+			}
+
+			if e.Object != nil {
+				if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+					unstr := unstructured.Unstructured{
+						Object: runtimeUnstructured.UnstructuredContent(),
+					}
+					icopy := integration.DeepCopy()
+					err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+					if err != nil {
+						logrus.Error("Unexpected error detected when watching resource", err)
+						return nil
+					}
+
+					if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
+						lastObservedState = &icopy.Status.Phase
+						if !handler(icopy) {
+							return nil
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}