You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2018/09/13 05:31:58 UTC
[camel-k] 01/02: Add a flag to wait until the integration is running
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit c2840db015a2c449b5cfff4e1beceab1ddeb3c10
Author: nferraro <ni...@gmail.com>
AuthorDate: Wed Sep 12 18:32:54 2018 +0200
Add a flag to wait until the integration is running
---
cmd/kamel/kamel.go | 4 ++-
pkg/client/cmd/root.go | 8 +++--
pkg/client/cmd/run.go | 66 ++++++++++++++++++++++++++++++++++++---
pkg/util/watch/watch.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 152 insertions(+), 8 deletions(-)
diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index 3328370..ff56647 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -22,10 +22,12 @@ import (
"github.com/apache/camel-k/pkg/client/cmd"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"os"
+ "context"
)
func main() {
- rootCmd, err := cmd.NewKamelCommand()
+ ctx := context.Background()
+ rootCmd, err := cmd.NewKamelCommand(ctx)
exitOnError(err)
err = rootCmd.Execute()
diff --git a/pkg/client/cmd/root.go b/pkg/client/cmd/root.go
index e0e2a08..50cfb40 100644
--- a/pkg/client/cmd/root.go
+++ b/pkg/client/cmd/root.go
@@ -23,15 +23,19 @@ import (
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/pkg/errors"
"github.com/spf13/cobra"
+ "context"
)
type RootCmdOptions struct {
+ Context context.Context
KubeConfig string
Namespace string
}
-func NewKamelCommand() (*cobra.Command, error) {
- options := RootCmdOptions{}
+func NewKamelCommand(ctx context.Context) (*cobra.Command, error) {
+ options := RootCmdOptions{
+ Context: ctx,
+ }
var cmd = cobra.Command{
Use: "kamel",
Short: "Kamel is a awesome client tool for running Apache Camel integrations natively on Kubernetes",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index d64a653..cee146e 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -31,6 +31,7 @@ import (
"github.com/spf13/cobra"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/apache/camel-k/pkg/util/watch"
)
type RunCmdOptions struct {
@@ -38,6 +39,7 @@ type RunCmdOptions struct {
Language string
IntegrationName string
Dependencies []string
+ Wait bool
}
func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
@@ -54,8 +56,10 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
}
cmd.Flags().StringVarP(&options.Language, "language", "l", "", "Programming Language used to write the file")
- cmd.Flags().StringVarP(&options.IntegrationName, "name", "", "", "The integration name")
+ cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The integration name")
cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", nil, "The integration dependency")
+ cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the integration to be running")
+ cmd.ParseFlags(os.Args)
return &cmd
}
@@ -74,10 +78,62 @@ func (*RunCmdOptions) validateArgs(cmd *cobra.Command, args []string) error {
}
func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
- code, err := o.loadCode(args[0])
+ integration, err := o.createIntegration(cmd, args)
if err != nil {
return err
}
+ if o.Wait {
+ err = o.waitForIntegrationReady(integration)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (o *RunCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error {
+ // Block this goroutine until the integration is in a final status
+ changes, err := watch.WatchStateChanges(o.Context, integration)
+ if err != nil {
+ return err
+ }
+
+ var lastStatusSeen *v1alpha1.IntegrationStatus
+
+watcher:
+ for {
+ select {
+ case <-o.Context.Done():
+ return nil
+ case i, ok := <-changes:
+ if !ok {
+ break watcher
+ }
+ lastStatusSeen = &i.Status
+ phase := string(i.Status.Phase)
+ if phase != "" {
+ fmt.Println("integration \""+integration.Name+"\" in phase", phase)
+ // TODO when we add health checks, we should wait until they are passed
+ if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError {
+ // TODO display some error info when available in the status
+ break watcher
+ }
+ }
+ }
+ }
+
+ // TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks)
+ if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError {
+ return errors.New("integration deployment failed")
+ }
+ return nil
+}
+
+func (o *RunCmdOptions) createIntegration(cmd *cobra.Command, args []string) (*v1alpha1.Integration, error) {
+ code, err := o.loadCode(args[0])
+ if err != nil {
+ return nil, err
+ }
namespace := o.Namespace
@@ -124,14 +180,14 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
clone := integration.DeepCopy()
err = sdk.Get(clone)
if err != nil {
- return err
+ return nil, err
}
integration.ResourceVersion = clone.ResourceVersion
err = sdk.Update(&integration)
}
if err != nil {
- return err
+ return nil, err
}
if !existed {
@@ -139,7 +195,7 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
} else {
fmt.Printf("integration \"%s\" updated\n", name)
}
- return nil
+ return &integration, nil
}
func (*RunCmdOptions) loadCode(fileName string) (string, error) {
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
new file mode 100644
index 0000000..3ec0bda
--- /dev/null
+++ b/pkg/util/watch/watch.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "context"
+ "github.com/operator-framework/operator-sdk/pkg/k8sclient"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "github.com/sirupsen/logrus"
+)
+
+// Watches a integration resource and send it through a channel when its status changes
+func WatchStateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-chan *v1alpha1.Integration, error) {
+ resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace)
+ if err != nil {
+ return nil, err
+ }
+ watcher, err := resourceClient.Watch(metav1.ListOptions{
+ FieldSelector: "metadata.name=" + integration.Name,
+ })
+ if err != nil {
+ return nil, err
+ }
+ events := watcher.ResultChan()
+
+ out := make(chan *v1alpha1.Integration)
+ var lastObservedState *v1alpha1.IntegrationPhase
+
+ go func() {
+ defer close(out)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case e, ok := <-events:
+ if !ok {
+ return
+ }
+
+ if e.Object != nil {
+ if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+ unstr := unstructured.Unstructured{
+ Object: runtimeUnstructured.UnstructuredContent(),
+ }
+ icopy := integration.DeepCopy()
+ err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+ if err != nil {
+ logrus.Error("Unexpected error detected when watching resource", err)
+ return // closes the channel
+ }
+
+ if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
+ lastObservedState = &icopy.Status.Phase
+ out <- icopy
+ }
+ }
+ }
+ }
+ }
+ }()
+
+ return out, nil
+}