You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2018/10/12 17:08:29 UTC
[camel-k] 01/03: chore(kamel): simplify integration status watcher
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 4caea26e68f8b0c90dbd8f79b0cc2aef0b577d40
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 12 16:29:59 2018 +0200
chore(kamel): simplify integration status watcher
---
cmd/kamel/kamel.go | 7 +++++-
pkg/client/cmd/run.go | 50 +++++++++++++-----------------------
pkg/util/util.go | 21 ++++++++++++++++
pkg/util/watch/watch.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 112 insertions(+), 33 deletions(-)
diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index a89042f..502a978 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -31,7 +31,11 @@ import (
func main() {
rand.Seed(time.Now().UTC().UnixNano())
- ctx := context.Background()
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Cancel ctx as soon as main returns
+ defer cancel()
+
rootCmd, err := cmd.NewKamelCommand(ctx)
exitOnError(err)
@@ -42,6 +46,7 @@ func main() {
func exitOnError(err error) {
if err != nil {
fmt.Println("Error:", err)
+
os.Exit(1)
}
}
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index 5b6664d..a22b58f 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -19,6 +19,7 @@ package cmd
import (
"fmt"
+ "io"
"io/ioutil"
"net/http"
"os"
@@ -34,8 +35,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
- "io"
-
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/log"
@@ -165,41 +164,28 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error {
}
func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error {
- // Block this goroutine until the integration is in a final status
- changes, err := watch.StateChanges(o.Context, integration)
- if err != nil {
- return err
- }
-
- var lastStatusSeen *v1alpha1.IntegrationStatus
-
-watcher:
- for {
- select {
- case <-o.Context.Done():
- return nil
- case i, ok := <-changes:
- if !ok {
- break watcher
+ handler := func(i *v1alpha1.Integration) bool {
+ //
+ // TODO when we add health checks, we should wait until they are passed
+ //
+ if i.Status.Phase != "" {
+ fmt.Println("integration \""+integration.Name+"\" in phase", i.Status.Phase)
+
+ if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
+ // TODO display some error info when available in the status
+ return false
}
- lastStatusSeen = &i.Status
- phase := string(i.Status.Phase)
- if phase != "" {
- fmt.Println("integration \""+integration.Name+"\" in phase", phase)
- // TODO when we add health checks, we should wait until they are passed
- if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError {
- // TODO display some error info when available in the status
- break watcher
- }
+
+ if i.Status.Phase == v1alpha1.IntegrationPhaseError {
+ fmt.Println("integration deployment failed")
+ return false
}
}
- }
- // TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks)
- if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError {
- return errors.New("integration deployment failed")
+ return true
}
- return nil
+
+ return watch.HandleStateChanges(o.Context, integration, handler)
}
func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 7792210..dcfbd36 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -17,6 +17,12 @@ limitations under the License.
package util
+import (
+ "os"
+ "os/signal"
+ "syscall"
+)
+
// StringSliceContains --
func StringSliceContains(slice []string, items []string) bool {
for i := 0; i < len(items); i++ {
@@ -51,3 +57,18 @@ func StringSliceUniqueAdd(slice *[]string, item string) bool {
return true
}
+
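+// WaitForSignal relays SIGINT, SIGTERM and SIGPIPE to sig and starts a goroutine that calls exit once a signal is received (130 for SIGINT/SIGTERM, 0 for SIGPIPE).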
+func WaitForSignal(sig chan os.Signal, exit func(int)) {
+ signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)
+ go func() {
+ s := <-sig
+ switch s {
+ case syscall.SIGINT, syscall.SIGTERM:
+ exit(130) // Ctrl+c
+ case syscall.SIGPIPE:
+ exit(0)
+ }
+ exit(1)
+ }()
+}
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index 1ed9671..73d865e 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -47,7 +47,9 @@ func StateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-cha
var lastObservedState *v1alpha1.IntegrationPhase
go func() {
+ defer watcher.Stop()
defer close(out)
+
for {
select {
case <-ctx.Done():
@@ -81,3 +83,68 @@ func StateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-cha
return out, nil
}
+
+//
+// HandleStateChanges watches an integration resource and invokes the given handler when its status phase changes.
+//
+// err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
+// if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
+// return false
+// }
+//
+// return true
+// })
+//
+// This function blocks until the handler function returns false, the events channel is closed, the context is done, or an event cannot be converted into an integration.
+//
+func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
+ resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace)
+ if err != nil {
+ return err
+ }
+ watcher, err := resourceClient.Watch(metav1.ListOptions{
+ FieldSelector: "metadata.name=" + integration.Name,
+ })
+ if err != nil {
+ return err
+ }
+
+ defer watcher.Stop()
+ events := watcher.ResultChan()
+
+ var lastObservedState *v1alpha1.IntegrationPhase
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case e, ok := <-events:
+ if !ok {
+ return nil
+ }
+
+ if e.Object != nil {
+ if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+ unstr := unstructured.Unstructured{
+ Object: runtimeUnstructured.UnstructuredContent(),
+ }
+ icopy := integration.DeepCopy()
+ err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+ if err != nil {
+ logrus.Error("Unexpected error detected when watching resource", err)
+ return nil
+ }
+
+ if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
+ lastObservedState = &icopy.Status.Phase
+ if !handler(icopy) {
+ return nil
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+}