You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/03/10 17:11:40 UTC
[camel-k] 05/09: Adapt cron trait to latest camel-k-runtime #1329
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 61fc9335c0b5fb10daea1e109a334b97982242a4
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Mon Mar 9 16:57:03 2020 +0100
Adapt cron trait to latest camel-k-runtime #1329
---
e2e/cron_test.go | 9 ++
pkg/trait/cron.go | 79 ++++++++----
pkg/trait/cron_test.go | 328 ++++++++++++++++++++++++++++++++++++++++++++++-
pkg/trait/trait_types.go | 11 +-
4 files changed, 398 insertions(+), 29 deletions(-)
diff --git a/e2e/cron_test.go b/e2e/cron_test.go
index a64936a..857f7f9 100644
--- a/e2e/cron_test.go
+++ b/e2e/cron_test.go
@@ -58,5 +58,14 @@ func TestRunCronExample(t *testing.T) {
Eventually(integrationLogs(ns, "cron-fallback"), testTimeoutShort).Should(ContainSubstring("Magicstring!"))
Expect(kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
})
+
+ t.Run("cron-fallback-quarkus", func(t *testing.T) {
+ RegisterTestingT(t)
+
+ Expect(kamel("run", "-n", ns, "files/cron.groovy").Execute()).Should(BeNil())
+ Eventually(integrationPodPhase(ns, "cron"), testTimeoutMedium).Should(Equal(v1.PodRunning))
+ Eventually(integrationLogs(ns, "cron"), testTimeoutShort).Should(ContainSubstring("Magicstring!"))
+ Expect(kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
+ })
})
}
diff --git a/pkg/trait/cron.go b/pkg/trait/cron.go
index 4445438..6b6ac85 100644
--- a/pkg/trait/cron.go
+++ b/pkg/trait/cron.go
@@ -20,6 +20,7 @@ package trait
import (
"fmt"
"regexp"
+ "sort"
"strconv"
"strings"
@@ -31,7 +32,6 @@ import (
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/util"
- "github.com/apache/camel-k/pkg/util/envvar"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/uri"
)
@@ -125,7 +125,18 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
return false, nil
}
- if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
+ if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying) {
+ return false, nil
+ }
+
+ if _, ok := e.CamelCatalog.Runtime.Capabilities["cron"]; !ok {
+ e.Integration.Status.SetCondition(
+ v1.IntegrationConditionCronJobAvailable,
+ corev1.ConditionFalse,
+ v1.IntegrationConditionCronJobNotAvailableReason,
+ "the runtime provider %s does not declare 'cron' capability",
+ )
+
return false, nil
}
@@ -156,7 +167,14 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
t.ConcurrencyPolicy = string(v1beta1.ForbidConcurrent)
}
- if t.Schedule == "" && t.Components == "" && t.Fallback == nil {
+ hasQuarkus := false
+
+ qt := e.GetTrait("quarkus")
+ if qt != nil {
+ hasQuarkus = qt.(*quarkusTrait).Enabled != nil && *(qt.(*quarkusTrait).Enabled)
+ }
+
+ if (hasQuarkus || (t.Schedule == "" && t.Components == "")) && t.Fallback == nil {
// If there's at least a `cron` endpoint, add a fallback implementation
fromURIs, err := t.getSourcesFromURIs(e)
if err != nil {
@@ -169,10 +187,8 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
break
}
}
-
}
}
-
dt := e.Catalog.GetTrait("deployer")
if dt != nil {
t.deployer = *dt.(*deployerTrait)
@@ -180,7 +196,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
// Fallback strategy can be implemented in any other controller
if t.Fallback != nil && *t.Fallback {
- if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
+ if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
e.Integration.Status.SetCondition(
v1.IntegrationConditionCronJobAvailable,
corev1.ConditionFalse,
@@ -202,7 +218,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
return false, err
}
if strategy != ControllerStrategyCronJob {
- if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
+ if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
e.Integration.Status.SetCondition(
v1.IntegrationConditionCronJobAvailable,
corev1.ConditionFalse,
@@ -217,30 +233,43 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
}
func (t *cronTrait) Apply(e *Environment) error {
- if t.Fallback != nil && *t.Fallback {
- if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+ if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+ if capability, ok := e.CamelCatalog.Runtime.Capabilities["cron"]; ok {
+ for _, dependency := range capability.Dependencies {
+ util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID))
+ }
+
+ // sort the dependencies to get always the same list if they don't change
+ sort.Strings(e.Integration.Status.Dependencies)
+ }
+
+ if t.Fallback != nil && *t.Fallback {
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, genericCronComponentFallback)
}
- } else {
- if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
- util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-cron")
- } else if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) {
- cronJob := t.getCronJobFor(e)
- maps := e.ComputeConfigMaps()
+ }
- e.Resources.AddAll(maps)
- e.Resources.Add(cronJob)
+ if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
+ if e.ApplicationProperties == nil {
+ e.ApplicationProperties = make(map[string]string)
+ }
- e.Integration.Status.SetCondition(
- v1.IntegrationConditionCronJobAvailable,
- corev1.ConditionTrue,
- v1.IntegrationConditionCronJobAvailableReason,
- fmt.Sprintf("CronJob name is %s", cronJob.Name),
- )
+ e.ApplicationProperties["camel.main.duration-max-messages"] = "1"
+ e.ApplicationProperties["loader.interceptor.cron.overridable-components"] = t.Components
+ e.Interceptors = append(e.Interceptors, "cron")
- envvar.SetVal(&e.EnvVars, "CAMEL_K_CRON_OVERRIDE", t.Components)
- }
+ cronJob := t.getCronJobFor(e)
+ maps := e.ComputeConfigMaps()
+
+ e.Resources.AddAll(maps)
+ e.Resources.Add(cronJob)
+
+ e.Integration.Status.SetCondition(
+ v1.IntegrationConditionCronJobAvailable,
+ corev1.ConditionTrue,
+ v1.IntegrationConditionCronJobAvailableReason,
+ fmt.Sprintf("CronJob name is %s", cronJob.Name))
}
+
return nil
}
diff --git a/pkg/trait/cron_test.go b/pkg/trait/cron_test.go
index f36791b..c077fe2 100644
--- a/pkg/trait/cron_test.go
+++ b/pkg/trait/cron_test.go
@@ -18,10 +18,21 @@ limitations under the License.
package trait
import (
+ "context"
+ "fmt"
"strings"
"testing"
- "github.com/magiconair/properties/assert"
+ "github.com/apache/camel-k/pkg/util"
+
+ v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/pkg/util/camel"
+ k8sutils "github.com/apache/camel-k/pkg/util/kubernetes"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ passert "github.com/magiconair/properties/assert"
+ "github.com/stretchr/testify/assert"
)
func TestCronFromURI(t *testing.T) {
@@ -198,13 +209,324 @@ func TestCronFromURI(t *testing.T) {
if res != nil {
gotCron = res.schedule
}
- assert.Equal(t, gotCron, thetest.cron)
+ passert.Equal(t, gotCron, thetest.cron)
gotComponents := ""
if res != nil {
gotComponents = strings.Join(res.components, ",")
}
- assert.Equal(t, gotComponents, thetest.components)
+ passert.Equal(t, gotComponents, thetest.components)
})
}
}
+
+func TestCronDeps(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ assert.Nil(t, err)
+
+ traitCatalog := NewCatalog(context.TODO(), nil)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test",
+ Namespace: "ns",
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseInitialization,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "routes.java",
+ Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+ },
+ Language: v1.LanguageJavaSource,
+ },
+ },
+ Resources: []v1.ResourceSpec{},
+ Traits: map[string]v1.TraitSpec{},
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+ },
+ Profile: v1.TraitProfileKnative,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: k8sutils.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ c, err := NewFakeClient("ns")
+ assert.Nil(t, err)
+
+ tc := NewCatalog(context.TODO(), c)
+
+ err = tc.apply(&environment)
+
+ assert.Nil(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+
+ ct := environment.GetTrait("cron").(*cronTrait)
+ assert.NotNil(t, ct)
+ assert.Nil(t, ct.Fallback)
+
+ capability, ok := environment.CamelCatalog.Runtime.Capabilities["cron"]
+ assert.True(t, ok)
+
+ for _, dependency := range capability.Dependencies {
+ assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID)))
+ }
+}
+
+func TestCronDepsFallback(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ assert.Nil(t, err)
+
+ traitCatalog := NewCatalog(context.TODO(), nil)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test",
+ Namespace: "ns",
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseInitialization,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "routes.java",
+ Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+ },
+ Language: v1.LanguageJavaSource,
+ },
+ },
+ Resources: []v1.ResourceSpec{},
+ Traits: map[string]v1.TraitSpec{
+ "cron": {
+ Configuration: map[string]string{
+ "fallback": "true",
+ },
+ },
+ },
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+ },
+ Profile: v1.TraitProfileKnative,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: k8sutils.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ c, err := NewFakeClient("ns")
+ assert.Nil(t, err)
+
+ tc := NewCatalog(context.TODO(), c)
+
+ err = tc.apply(&environment)
+
+ assert.Nil(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+
+ ct := environment.GetTrait("cron").(*cronTrait)
+ assert.NotNil(t, ct)
+ assert.NotNil(t, ct.Fallback)
+
+ capability, ok := environment.CamelCatalog.Runtime.Capabilities["cron"]
+ assert.True(t, ok)
+
+ for _, dependency := range capability.Dependencies {
+ assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID)))
+ }
+
+ assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, genericCronComponentFallback))
+}
+
+func TestCronWithMain(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ assert.Nil(t, err)
+
+ traitCatalog := NewCatalog(context.TODO(), nil)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test",
+ Namespace: "ns",
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseDeploying,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "routes.java",
+ Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+ },
+ Language: v1.LanguageJavaSource,
+ },
+ },
+ Resources: []v1.ResourceSpec{},
+ Traits: map[string]v1.TraitSpec{
+ "quarkus": {
+ Configuration: map[string]string{
+ "enabled": "false",
+ },
+ },
+ },
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+ },
+ Profile: v1.TraitProfileKnative,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: k8sutils.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ c, err := NewFakeClient("ns")
+ assert.Nil(t, err)
+
+ tc := NewCatalog(context.TODO(), c)
+
+ err = tc.apply(&environment)
+
+ assert.Nil(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+ assert.Nil(t, environment.GetTrait("quarkus"))
+
+ ct := environment.GetTrait("cron").(*cronTrait)
+ assert.NotNil(t, ct)
+ assert.Nil(t, ct.Fallback)
+ assert.True(t, util.StringSliceExists(environment.Interceptors, "cron"))
+}
+
+func TestCronWithQuarkus(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ assert.Nil(t, err)
+
+ traitCatalog := NewCatalog(context.TODO(), nil)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test",
+ Namespace: "ns",
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseDeploying,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "routes.java",
+ Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`,
+ },
+ Language: v1.LanguageJavaSource,
+ },
+ },
+ Resources: []v1.ResourceSpec{},
+ Traits: map[string]v1.TraitSpec{
+ "quarkus": {
+ Configuration: map[string]string{
+ "enabled": "true",
+ },
+ },
+ },
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"},
+ },
+ Profile: v1.TraitProfileKnative,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: k8sutils.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ c, err := NewFakeClient("ns")
+ assert.Nil(t, err)
+
+ tc := NewCatalog(context.TODO(), c)
+
+ err = tc.apply(&environment)
+
+ assert.Nil(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+ assert.NotNil(t, environment.GetTrait("quarkus"))
+
+ ct := environment.GetTrait("cron").(*cronTrait)
+ assert.NotNil(t, ct)
+ assert.NotNil(t, ct.Fallback)
+ assert.True(t, *ct.Fallback)
+ assert.True(t, util.StringSliceExists(environment.Interceptors, "cron"))
+}
diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go
index c3a50e3..e6d751e 100644
--- a/pkg/trait/trait_types.go
+++ b/pkg/trait/trait_types.go
@@ -209,6 +209,7 @@ type Environment struct {
ExecutedTraits []Trait
EnvVars []corev1.EnvVar
ApplicationProperties map[string]string
+ Interceptors []string
}
// ControllerStrategy is used to determine the kind of controller that needs to be created for the integration
@@ -483,6 +484,7 @@ func (e *Environment) ComputeSourcesURI() []string {
srcName := strings.TrimPrefix(s.Name, "/")
src := path.Join(root, srcName)
src = "file:" + src
+ interceptors := make([]string, 0, len(s.Interceptors))
params := make([]string, 0)
if s.InferLanguage() != "" {
@@ -494,8 +496,15 @@ func (e *Environment) ComputeSourcesURI() []string {
if s.Compression {
params = append(params, "compression=true")
}
+
if s.Interceptors != nil {
- params = append(params, "interceptors="+strings.Join(s.Interceptors, ","))
+ interceptors = append(interceptors, s.Interceptors...)
+ }
+ if e.Interceptors != nil {
+ interceptors = append(interceptors, e.Interceptors...)
+ }
+ if len(interceptors) > 0 {
+ params = append(params, "interceptors="+strings.Join(interceptors, ","))
}
if len(params) > 0 {