You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/03/10 17:11:40 UTC

[camel-k] 05/09: Adapt cron trait to latest camel-k-runtime #1329

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 61fc9335c0b5fb10daea1e109a334b97982242a4
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Mon Mar 9 16:57:03 2020 +0100

    Adapt cron trait to latest camel-k-runtime #1329
---
 e2e/cron_test.go         |   9 ++
 pkg/trait/cron.go        |  79 ++++++++----
 pkg/trait/cron_test.go   | 328 ++++++++++++++++++++++++++++++++++++++++++++++-
 pkg/trait/trait_types.go |  11 +-
 4 files changed, 398 insertions(+), 29 deletions(-)

diff --git a/e2e/cron_test.go b/e2e/cron_test.go
index a64936a..857f7f9 100644
--- a/e2e/cron_test.go
+++ b/e2e/cron_test.go
@@ -58,5 +58,14 @@ func TestRunCronExample(t *testing.T) {
 			Eventually(integrationLogs(ns, "cron-fallback"), testTimeoutShort).Should(ContainSubstring("Magicstring!"))
 			Expect(kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
 		})
+
+		t.Run("cron-fallback-quarkus", func(t *testing.T) {
+			RegisterTestingT(t)
+
+			Expect(kamel("run", "-n", ns, "files/cron.groovy").Execute()).Should(BeNil())
+			Eventually(integrationPodPhase(ns, "cron"), testTimeoutMedium).Should(Equal(v1.PodRunning))
+			Eventually(integrationLogs(ns, "cron"), testTimeoutShort).Should(ContainSubstring("Magicstring!"))
+			Expect(kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
+		})
 	})
 }
diff --git a/pkg/trait/cron.go b/pkg/trait/cron.go
index 4445438..6b6ac85 100644
--- a/pkg/trait/cron.go
+++ b/pkg/trait/cron.go
@@ -20,6 +20,7 @@ package trait
 import (
 	"fmt"
 	"regexp"
+	"sort"
 	"strconv"
 	"strings"
 
@@ -31,7 +32,6 @@ import (
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/metadata"
 	"github.com/apache/camel-k/pkg/util"
-	"github.com/apache/camel-k/pkg/util/envvar"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/apache/camel-k/pkg/util/uri"
 )
@@ -125,7 +125,18 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
 		return false, nil
 	}
 
-	if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
+	if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying) {
+		return false, nil
+	}
+
+	if _, ok := e.CamelCatalog.Runtime.Capabilities["cron"]; !ok {
+		e.Integration.Status.SetCondition(
+			v1.IntegrationConditionCronJobAvailable,
+			corev1.ConditionFalse,
+			v1.IntegrationConditionCronJobNotAvailableReason,
+			"the runtime provider %s does not declare 'cron' capability",
+		)
+
 		return false, nil
 	}
 
@@ -156,7 +167,14 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
 			t.ConcurrencyPolicy = string(v1beta1.ForbidConcurrent)
 		}
 
-		if t.Schedule == "" && t.Components == "" && t.Fallback == nil {
+		hasQuarkus := false
+
+		qt := e.GetTrait("quarkus")
+		if qt != nil {
+			hasQuarkus = qt.(*quarkusTrait).Enabled != nil && *(qt.(*quarkusTrait).Enabled)
+		}
+
+		if (hasQuarkus || (t.Schedule == "" && t.Components == "")) && t.Fallback == nil {
 			// If there's at least a `cron` endpoint, add a fallback implementation
 			fromURIs, err := t.getSourcesFromURIs(e)
 			if err != nil {
@@ -169,10 +187,8 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
 					break
 				}
 			}
-
 		}
 	}
-
 	dt := e.Catalog.GetTrait("deployer")
 	if dt != nil {
 		t.deployer = *dt.(*deployerTrait)
@@ -180,7 +196,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
 
 	// Fallback strategy can be implemented in any other controller
 	if t.Fallback != nil && *t.Fallback {
-		if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
+		if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
 			e.Integration.Status.SetCondition(
 				v1.IntegrationConditionCronJobAvailable,
 				corev1.ConditionFalse,
@@ -202,7 +218,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
 		return false, err
 	}
 	if strategy != ControllerStrategyCronJob {
-		if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
+		if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
 			e.Integration.Status.SetCondition(
 				v1.IntegrationConditionCronJobAvailable,
 				corev1.ConditionFalse,
@@ -217,30 +233,43 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
 }
 
 func (t *cronTrait) Apply(e *Environment) error {
-	if t.Fallback != nil && *t.Fallback {
-		if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+	if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+		if capability, ok := e.CamelCatalog.Runtime.Capabilities["cron"]; ok {
+			for _, dependency := range capability.Dependencies {
+				util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID))
+			}
+
+			// sort the dependencies so the resulting list is always the same when they don't change
+			sort.Strings(e.Integration.Status.Dependencies)
+		}
+
+		if t.Fallback != nil && *t.Fallback {
 			util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, genericCronComponentFallback)
 		}
-	} else {
-		if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
-			util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-cron")
-		} else if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
-			cronJob := t.getCronJobFor(e)
-			maps := e.ComputeConfigMaps()
+	}
 
-			e.Resources.AddAll(maps)
-			e.Resources.Add(cronJob)
+	if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
+		if e.ApplicationProperties == nil {
+			e.ApplicationProperties = make(map[string]string)
+		}
 
-			e.Integration.Status.SetCondition(
-				v1.IntegrationConditionCronJobAvailable,
-				corev1.ConditionTrue,
-				v1.IntegrationConditionCronJobAvailableReason,
-				fmt.Sprintf("CronJob name is %s", cronJob.Name),
-			)
+		e.ApplicationProperties["camel.main.duration-max-messages"] = "1"
+		e.ApplicationProperties["loader.interceptor.cron.overridable-components"] = t.Components
+		e.Interceptors = append(e.Interceptors, "cron")
 
-			envvar.SetVal(&e.EnvVars, "CAMEL_K_CRON_OVERRIDE", t.Components)
-		}
+		cronJob := t.getCronJobFor(e)
+		maps := e.ComputeConfigMaps()
+
+		e.Resources.AddAll(maps)
+		e.Resources.Add(cronJob)
+
+		e.Integration.Status.SetCondition(
+			v1.IntegrationConditionCronJobAvailable,
+			corev1.ConditionTrue,
+			v1.IntegrationConditionCronJobAvailableReason,
+			fmt.Sprintf("CronJob name is %s", cronJob.Name))
 	}
+
 	return nil
 }
 
diff --git a/pkg/trait/cron_test.go b/pkg/trait/cron_test.go
index f36791b..c077fe2 100644
--- a/pkg/trait/cron_test.go
+++ b/pkg/trait/cron_test.go
@@ -18,10 +18,21 @@ limitations under the License.
 package trait
 
 import (
+	"context"
+	"fmt"
 	"strings"
 	"testing"
 
-	"github.com/magiconair/properties/assert"
+	"github.com/apache/camel-k/pkg/util"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/camel"
+	k8sutils "github.com/apache/camel-k/pkg/util/kubernetes"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	passert "github.com/magiconair/properties/assert"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestCronFromURI(t *testing.T) {
@@ -198,13 +209,324 @@ func TestCronFromURI(t *testing.T) {
 			if res != nil {
 				gotCron = res.schedule
 			}
-			assert.Equal(t, gotCron, thetest.cron)
+			passert.Equal(t, gotCron, thetest.cron)
 
 			gotComponents := ""
 			if res != nil {
 				gotComponents = strings.Join(res.components, ",")
 			}
-			assert.Equal(t, gotComponents, thetest.components)
+			passert.Equal(t, gotComponents, thetest.components)
 		})
 	}
 }
+
+func TestCronDeps(t *testing.T) {
+	catalog, err := camel.DefaultCatalog()
+	assert.Nil(t, err)
+
+	traitCatalog := NewCatalog(context.TODO(), nil)
+
+	environment := Environment{
+		CamelCatalog: catalog,
+		Catalog:      traitCatalog,
+		Integration: &v1.Integration{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "ns",
+			},
+			Status: v1.IntegrationStatus{
+				Phase: v1.IntegrationPhaseInitialization,
+			},
+			Spec: v1.IntegrationSpec{
+				Profile: v1.TraitProfileKnative,
+				Sources: []v1.SourceSpec{
+					{
+						DataSpec: v1.DataSpec{
+							Name:    "routes.java",
+							Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+						},
+						Language: v1.LanguageJavaSource,
+					},
+				},
+				Resources: []v1.ResourceSpec{},
+				Traits:    map[string]v1.TraitSpec{},
+			},
+		},
+		IntegrationKit: &v1.IntegrationKit{
+			Status: v1.IntegrationKitStatus{
+				Phase: v1.IntegrationKitPhaseReady,
+			},
+		},
+		Platform: &v1.IntegrationPlatform{
+			Spec: v1.IntegrationPlatformSpec{
+				Cluster: v1.IntegrationPlatformClusterOpenShift,
+				Build: v1.IntegrationPlatformBuildSpec{
+					PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+					Registry:        v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+				},
+				Profile: v1.TraitProfileKnative,
+			},
+		},
+		EnvVars:        make([]corev1.EnvVar, 0),
+		ExecutedTraits: make([]Trait, 0),
+		Resources:      k8sutils.NewCollection(),
+	}
+	environment.Platform.ResyncStatusFullConfig()
+
+	c, err := NewFakeClient("ns")
+	assert.Nil(t, err)
+
+	tc := NewCatalog(context.TODO(), c)
+
+	err = tc.apply(&environment)
+
+	assert.Nil(t, err)
+	assert.NotEmpty(t, environment.ExecutedTraits)
+
+	ct := environment.GetTrait("cron").(*cronTrait)
+	assert.NotNil(t, ct)
+	assert.Nil(t, ct.Fallback)
+
+	capability, ok := environment.CamelCatalog.Runtime.Capabilities["cron"]
+	assert.True(t, ok)
+
+	for _, dependency := range capability.Dependencies {
+		assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID)))
+	}
+}
+
+func TestCronDepsFallback(t *testing.T) {
+	catalog, err := camel.DefaultCatalog()
+	assert.Nil(t, err)
+
+	traitCatalog := NewCatalog(context.TODO(), nil)
+
+	environment := Environment{
+		CamelCatalog: catalog,
+		Catalog:      traitCatalog,
+		Integration: &v1.Integration{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "ns",
+			},
+			Status: v1.IntegrationStatus{
+				Phase: v1.IntegrationPhaseInitialization,
+			},
+			Spec: v1.IntegrationSpec{
+				Profile: v1.TraitProfileKnative,
+				Sources: []v1.SourceSpec{
+					{
+						DataSpec: v1.DataSpec{
+							Name:    "routes.java",
+							Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+						},
+						Language: v1.LanguageJavaSource,
+					},
+				},
+				Resources: []v1.ResourceSpec{},
+				Traits: map[string]v1.TraitSpec{
+					"cron": {
+						Configuration: map[string]string{
+							"fallback": "true",
+						},
+					},
+				},
+			},
+		},
+		IntegrationKit: &v1.IntegrationKit{
+			Status: v1.IntegrationKitStatus{
+				Phase: v1.IntegrationKitPhaseReady,
+			},
+		},
+		Platform: &v1.IntegrationPlatform{
+			Spec: v1.IntegrationPlatformSpec{
+				Cluster: v1.IntegrationPlatformClusterOpenShift,
+				Build: v1.IntegrationPlatformBuildSpec{
+					PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+					Registry:        v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+				},
+				Profile: v1.TraitProfileKnative,
+			},
+		},
+		EnvVars:        make([]corev1.EnvVar, 0),
+		ExecutedTraits: make([]Trait, 0),
+		Resources:      k8sutils.NewCollection(),
+	}
+	environment.Platform.ResyncStatusFullConfig()
+
+	c, err := NewFakeClient("ns")
+	assert.Nil(t, err)
+
+	tc := NewCatalog(context.TODO(), c)
+
+	err = tc.apply(&environment)
+
+	assert.Nil(t, err)
+	assert.NotEmpty(t, environment.ExecutedTraits)
+
+	ct := environment.GetTrait("cron").(*cronTrait)
+	assert.NotNil(t, ct)
+	assert.NotNil(t, ct.Fallback)
+
+	capability, ok := environment.CamelCatalog.Runtime.Capabilities["cron"]
+	assert.True(t, ok)
+
+	for _, dependency := range capability.Dependencies {
+		assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID)))
+	}
+
+	assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, genericCronComponentFallback))
+}
+
+func TestCronWithMain(t *testing.T) {
+	catalog, err := camel.DefaultCatalog()
+	assert.Nil(t, err)
+
+	traitCatalog := NewCatalog(context.TODO(), nil)
+
+	environment := Environment{
+		CamelCatalog: catalog,
+		Catalog:      traitCatalog,
+		Integration: &v1.Integration{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "ns",
+			},
+			Status: v1.IntegrationStatus{
+				Phase: v1.IntegrationPhaseDeploying,
+			},
+			Spec: v1.IntegrationSpec{
+				Profile: v1.TraitProfileKnative,
+				Sources: []v1.SourceSpec{
+					{
+						DataSpec: v1.DataSpec{
+							Name:    "routes.java",
+							Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+						},
+						Language: v1.LanguageJavaSource,
+					},
+				},
+				Resources: []v1.ResourceSpec{},
+				Traits: map[string]v1.TraitSpec{
+					"quarkus": {
+						Configuration: map[string]string{
+							"enabled": "false",
+						},
+					},
+				},
+			},
+		},
+		IntegrationKit: &v1.IntegrationKit{
+			Status: v1.IntegrationKitStatus{
+				Phase: v1.IntegrationKitPhaseReady,
+			},
+		},
+		Platform: &v1.IntegrationPlatform{
+			Spec: v1.IntegrationPlatformSpec{
+				Cluster: v1.IntegrationPlatformClusterOpenShift,
+				Build: v1.IntegrationPlatformBuildSpec{
+					PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+					Registry:        v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+				},
+				Profile: v1.TraitProfileKnative,
+			},
+		},
+		EnvVars:        make([]corev1.EnvVar, 0),
+		ExecutedTraits: make([]Trait, 0),
+		Resources:      k8sutils.NewCollection(),
+	}
+	environment.Platform.ResyncStatusFullConfig()
+
+	c, err := NewFakeClient("ns")
+	assert.Nil(t, err)
+
+	tc := NewCatalog(context.TODO(), c)
+
+	err = tc.apply(&environment)
+
+	assert.Nil(t, err)
+	assert.NotEmpty(t, environment.ExecutedTraits)
+	assert.Nil(t, environment.GetTrait("quarkus"))
+
+	ct := environment.GetTrait("cron").(*cronTrait)
+	assert.NotNil(t, ct)
+	assert.Nil(t, ct.Fallback)
+	assert.True(t, util.StringSliceExists(environment.Interceptors, "cron"))
+}
+
+func TestCronWithQuarkus(t *testing.T) {
+	catalog, err := camel.DefaultCatalog()
+	assert.Nil(t, err)
+
+	traitCatalog := NewCatalog(context.TODO(), nil)
+
+	environment := Environment{
+		CamelCatalog: catalog,
+		Catalog:      traitCatalog,
+		Integration: &v1.Integration{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "ns",
+			},
+			Status: v1.IntegrationStatus{
+				Phase: v1.IntegrationPhaseDeploying,
+			},
+			Spec: v1.IntegrationSpec{
+				Profile: v1.TraitProfileKnative,
+				Sources: []v1.SourceSpec{
+					{
+						DataSpec: v1.DataSpec{
+							Name:    "routes.java",
+							Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+						},
+						Language: v1.LanguageJavaSource,
+					},
+				},
+				Resources: []v1.ResourceSpec{},
+				Traits: map[string]v1.TraitSpec{
+					"quarkus": {
+						Configuration: map[string]string{
+							"enabled": "true",
+						},
+					},
+				},
+			},
+		},
+		IntegrationKit: &v1.IntegrationKit{
+			Status: v1.IntegrationKitStatus{
+				Phase: v1.IntegrationKitPhaseReady,
+			},
+		},
+		Platform: &v1.IntegrationPlatform{
+			Spec: v1.IntegrationPlatformSpec{
+				Cluster: v1.IntegrationPlatformClusterOpenShift,
+				Build: v1.IntegrationPlatformBuildSpec{
+					PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+					Registry:        v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+				},
+				Profile: v1.TraitProfileKnative,
+			},
+		},
+		EnvVars:        make([]corev1.EnvVar, 0),
+		ExecutedTraits: make([]Trait, 0),
+		Resources:      k8sutils.NewCollection(),
+	}
+	environment.Platform.ResyncStatusFullConfig()
+
+	c, err := NewFakeClient("ns")
+	assert.Nil(t, err)
+
+	tc := NewCatalog(context.TODO(), c)
+
+	err = tc.apply(&environment)
+
+	assert.Nil(t, err)
+	assert.NotEmpty(t, environment.ExecutedTraits)
+	assert.NotNil(t, environment.GetTrait("quarkus"))
+
+	ct := environment.GetTrait("cron").(*cronTrait)
+	assert.NotNil(t, ct)
+	assert.NotNil(t, ct.Fallback)
+	assert.True(t, *ct.Fallback)
+	assert.True(t, util.StringSliceExists(environment.Interceptors, "cron"))
+}
diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go
index c3a50e3..e6d751e 100644
--- a/pkg/trait/trait_types.go
+++ b/pkg/trait/trait_types.go
@@ -209,6 +209,7 @@ type Environment struct {
 	ExecutedTraits        []Trait
 	EnvVars               []corev1.EnvVar
 	ApplicationProperties map[string]string
+	Interceptors          []string
 }
 
 // ControllerStrategy is used to determine the kind of controller that needs to be created for the integration
@@ -483,6 +484,7 @@ func (e *Environment) ComputeSourcesURI() []string {
 		srcName := strings.TrimPrefix(s.Name, "/")
 		src := path.Join(root, srcName)
 		src = "file:" + src
+		interceptors := make([]string, 0, len(s.Interceptors))
 
 		params := make([]string, 0)
 		if s.InferLanguage() != "" {
@@ -494,8 +496,15 @@ func (e *Environment) ComputeSourcesURI() []string {
 		if s.Compression {
 			params = append(params, "compression=true")
 		}
+
 		if s.Interceptors != nil {
-			params = append(params, "interceptors="+strings.Join(s.Interceptors, ","))
+			interceptors = append(interceptors, s.Interceptors...)
+		}
+		if e.Interceptors != nil {
+			interceptors = append(interceptors, e.Interceptors...)
+		}
+		if len(interceptors) > 0 {
+			params = append(params, "interceptors="+strings.Join(interceptors, ","))
 		}
 
 		if len(params) > 0 {