You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2018/12/06 16:06:49 UTC

[camel-k] 02/06: Fix #219: use deployment in Knative when cannot scale to 0

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 46fe56be0cb06238d02401b52514100cd6b9a106
Author: nferraro <ni...@gmail.com>
AuthorDate: Mon Dec 3 15:21:47 2018 +0100

    Fix #219: use deployment in Knative when cannot scale to 0
---
 pkg/metadata/http.go               | 131 +++++++++++++++++++++++++
 pkg/metadata/metadata.go           |  57 ++++++++++-
 pkg/metadata/metadata_http_test.go | 195 +++++++++++++++++++++++++++++++++++++
 pkg/metadata/types.go              |   4 +
 pkg/trait/catalog.go               |  10 +-
 pkg/trait/knative.go               |  70 ++++++++++---
 pkg/trait/service.go               |  54 +++-------
 pkg/trait/trait_test.go            |  22 +++--
 pkg/util/kubernetes/collection.go  |  33 +++++++
 9 files changed, 508 insertions(+), 68 deletions(-)

diff --git a/pkg/metadata/http.go b/pkg/metadata/http.go
new file mode 100644
index 0000000..6ca7065
--- /dev/null
+++ b/pkg/metadata/http.go
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metadata
+
+import (
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"regexp"
+	"strings"
+)
+
+var httpURIs = map[string]bool{
+	"ahc":                  true,
+	"ahc-ws":               true,
+	"atmosphere-websocket": true,
+	"cxf":         true,
+	"cxfrs":       true,
+	"grpc":        true,
+	"jetty":       true,
+	"netty-http":  true,
+	"netty4-http": true,
+	"rest":        true,
+	"restlet":     true,
+	"servlet":     true,
+	"spark-rest":  true,
+	"spring-ws":   true,
+	"undertow":    true,
+	"websocket":   true,
+	"knative":     true,
+}
+
+var passiveURIs = map[string]bool{
+	"bean":       true,
+	"binding":    true,
+	"browse":     true,
+	"class":      true,
+	"controlbus": true,
+	"dataformat": true,
+	"dataset":    true,
+	"direct":     true,
+	"direct-vm":  true,
+	"language":   true,
+	"log":        true,
+	"mock":       true,
+	"properties": true,
+	"ref":        true,
+	"seda":       true,
+	"stub":       true,
+	"test":       true,
+	"validator":  true,
+	"vm":         true,
+}
+
+var restIndicator = regexp.MustCompile(`\brest\s*\([^)]*\)`)         // DSL call rest(...); \b keeps names such as interest(...) from matching
+var xmlRestIndicator = regexp.MustCompile(`<\s*rest(?:\s[^>]*)?/?>`) // <rest>, <rest/> and <rest attr="..."> elements, but not <restConfiguration>
+
+// requiresHTTPService reports whether the integration must be exposed through HTTP: the source matches a REST indicator, or a from-URI prefix is listed in httpURIs
+func requiresHTTPService(source v1alpha1.SourceSpec, fromURIs []string) bool {
+	if !hasRestIndicator(source) {
+		return containsHTTPURIs(fromURIs)
+	}
+	return true
+}
+
+// hasOnlyPassiveEndpoints returns true if every from-URI prefix is enabled in passiveURIs or httpURIs, i.e. no endpoint needs to remain always active (source is currently unused)
+func hasOnlyPassiveEndpoints(source v1alpha1.SourceSpec, fromURIs []string) bool {
+	allowed := make(map[string]bool, len(passiveURIs)+len(httpURIs))
+	for prefix, enabled := range passiveURIs {
+		allowed[prefix] = enabled
+	}
+	for prefix, enabled := range httpURIs {
+		allowed[prefix] = enabled
+	}
+	return containsOnlyURIsIn(fromURIs, allowed)
+}
+
+// containsHTTPURIs returns true as soon as one URI has a prefix enabled in httpURIs (a prefix missing from the map reads as false)
+func containsHTTPURIs(fromURI []string) bool {
+	for _, uri := range fromURI {
+		if httpURIs[getURIPrefix(uri)] {
+			return true
+		}
+	}
+	return false
+}
+
+// containsOnlyURIsIn returns true when every URI prefix maps to true in allowed (a missing prefix counts as not allowed); an empty list yields true
+func containsOnlyURIsIn(fromURI []string, allowed map[string]bool) bool {
+	for _, uri := range fromURI {
+		if !allowed[getURIPrefix(uri)] {
+			return false
+		}
+	}
+	return true
+}
+
+// getURIPrefix returns the part of uri that precedes the first ":", or the whole string when it contains no ":"
+func getURIPrefix(uri string) string {
+	if idx := strings.Index(uri, ":"); idx >= 0 {
+		return uri[:idx]
+	}
+	return uri
+}
+
+// hasRestIndicator reports whether the source content matches the REST indicator pattern selected for its language
+func hasRestIndicator(source v1alpha1.SourceSpec) bool {
+	return getRestIndicatorRegexpsForLanguage(source.Language).MatchString(source.Content)
+}
+
+// getRestIndicatorRegexpsForLanguage selects the REST indicator pattern to apply to a source written in the given language
+func getRestIndicatorRegexpsForLanguage(language v1alpha1.Language) *regexp.Regexp {
+	// XML sources get the element pattern; every other language falls back to the DSL call pattern
+	if language == v1alpha1.LanguageXML {
+		return xmlRestIndicator
+	}
+	return restIndicator
+}
diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go
index 6e43b0b..9eec7bd 100644
--- a/pkg/metadata/metadata.go
+++ b/pkg/metadata/metadata.go
@@ -19,19 +19,68 @@ package metadata
 
 import (
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"sort"
 )
 
+// ExtractAll extracts the metadata of every listed source and merges the results into a single value
+func ExtractAll(sources []v1alpha1.SourceSpec) IntegrationMetadata {
+	// neutral starting value: no language, empty lists, RequiresHTTPService
+	// left false (merge ORs it) and PassiveEndpoints true (merge ANDs it),
+	// so an empty list of sources reports PassiveEndpoints == true
+	combined := IntegrationMetadata{
+		Dependencies:     []string{},
+		FromURIs:         []string{},
+		ToURIs:           []string{},
+		PassiveEndpoints: true,
+	}
+	for i := range sources {
+		combined = merge(combined, Extract(sources[i]))
+	}
+	return combined
+}
+
+// merge combines two metadata values: URIs are concatenated, dependencies are de-duplicated and sorted, the language is kept only if m1 has none or both agree
+func merge(m1 IntegrationMetadata, m2 IntegrationMetadata) IntegrationMetadata {
+	seen := make(map[string]bool, len(m1.Dependencies)+len(m2.Dependencies))
+	for _, list := range [][]string{m1.Dependencies, m2.Dependencies} {
+		for _, dep := range list {
+			seen[dep] = true
+		}
+	}
+	merged := make([]string, 0, len(seen))
+	for dep := range seen {
+		merged = append(merged, dep)
+	}
+	sort.Strings(merged)
+	result := IntegrationMetadata{
+		Language:            m2.Language,
+		FromURIs:            append(m1.FromURIs, m2.FromURIs...),
+		ToURIs:              append(m1.ToURIs, m2.ToURIs...),
+		Dependencies:        merged,
+		RequiresHTTPService: m1.RequiresHTTPService || m2.RequiresHTTPService,
+		PassiveEndpoints:    m1.PassiveEndpoints && m2.PassiveEndpoints,
+	}
+	if m1.Language != "" && m1.Language != m2.Language {
+		result.Language = ""
+	}
+	return result
+}
+
 // Extract returns metadata information from the source code
 func Extract(source v1alpha1.SourceSpec) IntegrationMetadata {
 	language := discoverLanguage(source)
 	fromURIs := discoverFromURIs(source, language)
 	toURIs := discoverToURIs(source, language)
 	dependencies := discoverDependencies(source, fromURIs, toURIs)
+	requiresHTTPService := requiresHTTPService(source, fromURIs)
+	passiveEndpoints := hasOnlyPassiveEndpoints(source, fromURIs)
 	return IntegrationMetadata{
-		Language:     language,
-		FromURIs:     fromURIs,
-		ToURIs:       toURIs,
-		Dependencies: dependencies,
+		Language:            language,
+		FromURIs:            fromURIs,
+		ToURIs:              toURIs,
+		Dependencies:        dependencies,
+		RequiresHTTPService: requiresHTTPService,
+		PassiveEndpoints:    passiveEndpoints,
 	}
 }
 
diff --git a/pkg/metadata/metadata_http_test.go b/pkg/metadata/metadata_http_test.go
new file mode 100644
index 0000000..d75f57b
--- /dev/null
+++ b/pkg/metadata/metadata_http_test.go
@@ -0,0 +1,195 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metadata
+
+import (
+	"testing"
+
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHttpJavaSource(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "Request.java",
+		Language: v1alpha1.LanguageJavaSource,
+		Content: `
+			from("telegram:bots/cippa").to("log:stash");
+			from("undertow:uri").to("log:stash");
+			from("ine:xistent").to("log:stash");
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.False(t, meta.PassiveEndpoints)
+}
+
+func TestHttpOnlyJavaSource(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "Request.java",
+		Language: v1alpha1.LanguageJavaSource,
+		Content: `
+			from("direct:bots/cippa").to("log:stash");
+			from("undertow:uri").to("log:stash");
+			from("seda:path").to("log:stash");
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.True(t, meta.PassiveEndpoints)
+}
+
+func TestHttpOnlyJavaSourceRest(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "Request.java",
+		Language: v1alpha1.LanguageJavaSource,
+		Content: `
+			from("direct:bots/cippa").to("log:stash");
+			rest().get("").to("log:stash");
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.True(t, meta.PassiveEndpoints)
+}
+
+func TestHttpOnlyJavaSourceRest2(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "Request.java",
+		Language: v1alpha1.LanguageJavaSource,
+		Content: `
+			from("vm:bots/cippa").to("log:stash");
+			rest( ).get("").to("log:stash");
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.True(t, meta.PassiveEndpoints)
+}
+
+// TestNoHttpGroovySource checks that a from-URI outside the HTTP and passive lists yields neither an HTTP service nor passive endpoints
+func TestNoHttpGroovySource(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "Request.groovy",
+		Language: v1alpha1.LanguageGroovy,
+		Content: `
+			from('direct:bots/cippa').to("log:stash");
+			from('telegram:uri').to("log:stash");
+			from('seda:path').to("log:stash");
+		`,
+	}
+	meta := Extract(code)
+	assert.False(t, meta.RequiresHTTPService)
+	assert.False(t, meta.PassiveEndpoints)
+}
+
+func TestHttpOnlyGroovySource(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "Request.groovy",
+		Language: v1alpha1.LanguageGroovy,
+		Content: `
+			from('direct:bots/cippa').to("log:stash");
+			from('undertow:uri').to("log:stash");
+			from('seda:path').to("log:stash");
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.True(t, meta.PassiveEndpoints)
+}
+
+func TestHttpXMLSource(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "routes.xml",
+		Language: v1alpha1.LanguageXML,
+		Content: `
+			<from uri="telegram:ciao" />
+			<rest path="/">
+			</rest>
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.False(t, meta.PassiveEndpoints)
+}
+
+func TestHttpOnlyXMLSource(t *testing.T) {
+	code := v1alpha1.SourceSpec{
+		Name:     "routes.xml",
+		Language: v1alpha1.LanguageXML,
+		Content: `
+			<from uri="direct:ciao" />
+			<rest path="/">
+			</rest>
+		`,
+	}
+	meta := Extract(code)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.True(t, meta.PassiveEndpoints)
+}
+
+
+
+func TestMultilangHTTPOnlySource(t *testing.T) {
+	codes := []v1alpha1.SourceSpec{
+		{
+			Name:     "routes.xml",
+			Language: v1alpha1.LanguageXML,
+			Content: `
+				<from uri="direct:ciao" />
+				<rest path="/">
+				</rest>
+			`,
+		},
+		{
+			Name:     "routes2.groovy",
+			Language: v1alpha1.LanguageGroovy,
+			Content: `
+				from('seda:in').to('seda:out')
+			`,
+		},
+	}
+	meta := ExtractAll(codes)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.True(t, meta.PassiveEndpoints)
+}
+
+func TestMultilangHTTPSource(t *testing.T) {
+	codes := []v1alpha1.SourceSpec{
+		{
+			Name:     "routes.xml",
+			Language: v1alpha1.LanguageXML,
+			Content: `
+				<from uri="direct:ciao" />
+				<rest path="/">
+				</rest>
+			`,
+		},
+		{
+			Name:     "routes2.groovy",
+			Language: v1alpha1.LanguageGroovy,
+			Content: `
+				from('seda:in').to('seda:out')
+				from('timer:tick').to('log:info')
+			`,
+		},
+	}
+	meta := ExtractAll(codes)
+	assert.True(t, meta.RequiresHTTPService)
+	assert.False(t, meta.PassiveEndpoints)
+}
\ No newline at end of file
diff --git a/pkg/metadata/types.go b/pkg/metadata/types.go
index de10bb1..04ebe1c 100644
--- a/pkg/metadata/types.go
+++ b/pkg/metadata/types.go
@@ -29,4 +29,8 @@ type IntegrationMetadata struct {
 	Dependencies []string
 	// The language in which the integration is written
 	Language v1alpha1.Language
+	// RequiresHTTPService indicates if the integration needs to be invoked through HTTP
+	RequiresHTTPService bool
+	// PassiveEndpoints indicates that the integration contains only passive endpoints that are activated from external calls, including HTTP (useful to determine if the integration can scale to 0)
+	PassiveEndpoints bool
 }
diff --git a/pkg/trait/catalog.go b/pkg/trait/catalog.go
index d0180be..1766929 100644
--- a/pkg/trait/catalog.go
+++ b/pkg/trait/catalog.go
@@ -78,31 +78,31 @@ func (c *Catalog) traitsFor(environment *Environment) []Trait {
 		return []Trait{
 			c.tDebug,
 			c.tDependencies,
-			c.tService,
-			c.tRoute,
 			c.tBuilder,
 			c.tSpringBoot,
 			c.tDeployment,
+			c.tService,
+			c.tRoute,
 			c.tOwner,
 		}
 	case v1alpha1.TraitProfileKubernetes:
 		return []Trait{
 			c.tDebug,
 			c.tDependencies,
-			c.tService,
-			c.tIngress,
 			c.tBuilder,
 			c.tSpringBoot,
 			c.tDeployment,
+			c.tService,
+			c.tIngress,
 			c.tOwner,
 		}
 	case v1alpha1.TraitProfileKnative:
 		return []Trait{
 			c.tDebug,
 			c.tDependencies,
-			c.tKnative,
 			c.tBuilder,
 			c.tSpringBoot,
+			c.tKnative,
 			c.tDeployment,
 			c.tOwner,
 		}
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 0e211f1..992b889 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -24,6 +24,8 @@ import (
 
 	"github.com/operator-framework/operator-sdk/pkg/sdk"
 	"github.com/pkg/errors"
+	"k8s.io/api/apps/v1"
+	"strings"
 
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 
@@ -35,15 +37,23 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+const (
+	knativeKindDeployment = "deployment"
+	knativeKindService    = "service"
+)
+
 type knativeTrait struct {
-	BaseTrait `property:",squash"`
-	Sources   string `property:"sources"`
-	Sinks     string `property:"sinks"`
+	BaseTrait          `property:",squash"`
+	Kind               string `property:"kind"`
+	Sources            string `property:"sources"`
+	Sinks              string `property:"sinks"`
+	deploymentDelegate *deploymentTrait
 }
 
 func newKnativeTrait() *knativeTrait {
 	return &knativeTrait{
-		BaseTrait: newBaseTrait("knative"),
+		BaseTrait:          newBaseTrait("knative"),
+		deploymentDelegate: newDeploymentTrait(),
 	}
 }
 
@@ -60,18 +70,58 @@ func (t *knativeTrait) autoconfigure(e *Environment) error {
 		channels := t.getSinkChannels(e)
 		t.Sinks = strings.Join(channels, ",")
 	}
+	if t.Kind == "" {
+		meta := metadata.ExtractAll(e.Integration.Spec.Sources)
+		if meta.RequiresHTTPService && meta.PassiveEndpoints {
+			t.Kind = knativeKindService
+		} else {
+			t.Kind = knativeKindDeployment
+		}
+	}
 	return nil
 }
 
 func (t *knativeTrait) apply(e *Environment) error {
+	if err := t.prepareEnvVars(e); err != nil {
+		return err
+	}
 	for _, sub := range t.getSubscriptionsFor(e) {
 		e.Resources.Add(sub)
 	}
-	svc, err := t.getServiceFor(e)
+	switch t.Kind {
+	case knativeKindService:
+		svc, err := t.getServiceFor(e)
+		if err != nil {
+			return err
+		}
+		e.Resources.Add(svc)
+		return nil
+	case knativeKindDeployment:
+		return t.addDeployment(e)
+	}
+	return nil
+}
+
+func (t *knativeTrait) prepareEnvVars(e *Environment) error {
+	// common env var for Knative integration
+	conf, err := t.getConfigurationSerialized(e)
 	if err != nil {
 		return err
 	}
-	e.Resources.Add(svc)
+	e.EnvVars["CAMEL_KNATIVE_CONFIGURATION"] = conf
+	return nil
+}
+
+func (t *knativeTrait) addDeployment(e *Environment) error {
+	if err := t.deploymentDelegate.apply(e); err != nil {
+		return err
+	}
+	e.Resources.VisitDeployment(func(d *v1.Deployment) {
+		if d.Spec.Template.Annotations == nil {
+			d.Spec.Template.Annotations = make(map[string]string)
+		}
+		d.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true"
+	})
 	return nil
 }
 
@@ -112,12 +162,10 @@ func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) {
 	// optimizations
 	environment["AB_JOLOKIA_OFF"] = True
 
-	// Knative integration
-	conf, err := t.getConfigurationSerialized(e)
-	if err != nil {
-		return nil, err
+	// add env vars from traits
+	for k, v := range e.EnvVars {
+		environment[k] = v
 	}
-	environment["CAMEL_KNATIVE_CONFIGURATION"] = conf
 
 	labels := map[string]string{
 		"camel.apache.org/integration": e.Integration.Name,
diff --git a/pkg/trait/service.go b/pkg/trait/service.go
index a7c927a..df0d66f 100644
--- a/pkg/trait/service.go
+++ b/pkg/trait/service.go
@@ -19,24 +19,13 @@ package trait
 
 import (
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
-	"github.com/apache/camel-k/version"
+	"github.com/apache/camel-k/pkg/metadata"
+	"k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 )
 
-var webComponents = map[string]bool{
-	"camel:servlet":     true,
-	"camel:undertow":    true,
-	"camel:jetty":       true,
-	"camel:jetty9":      true,
-	"camel:netty-http":  true,
-	"camel:netty4-http": true,
-	"mvn:org.apache.camel.k:camel-knative:" + version.Version: true,
-	// TODO find a better way to discover need for exposure
-	// maybe using the resolved classpath of the context instead of the requested dependencies
-}
-
 type serviceTrait struct {
 	BaseTrait `property:",squash"`
 
@@ -56,8 +45,18 @@ func (s *serviceTrait) appliesTo(e *Environment) bool {
 
 func (s *serviceTrait) autoconfigure(e *Environment) error {
 	if s.Enabled == nil {
-		required := s.requiresService(e)
-		s.Enabled = &required
+		hasDeployment := false
+		e.Resources.VisitDeployment(func(s *v1.Deployment) {
+			hasDeployment = true
+		})
+		if hasDeployment {
+			meta := metadata.ExtractAll(e.Integration.Spec.Sources)
+			required := meta.RequiresHTTPService
+			s.Enabled = &required
+		} else {
+			enabled := false
+			s.Enabled = &enabled
+		}
 	}
 	return nil
 }
@@ -98,28 +97,3 @@ func (s *serviceTrait) getServiceFor(e *Environment) *corev1.Service {
 
 	return &svc
 }
-
-func (*serviceTrait) requiresService(environment *Environment) bool {
-	cweb := false
-	iweb := false
-
-	if environment.Context != nil {
-		for _, dep := range environment.Context.Spec.Dependencies {
-			if decision, present := webComponents[dep]; present {
-				cweb = decision
-				break
-			}
-		}
-	}
-
-	if environment.Integration != nil {
-		for _, dep := range environment.Integration.Spec.Dependencies {
-			if decision, present := webComponents[dep]; present {
-				iweb = decision
-				break
-			}
-		}
-	}
-
-	return cweb || iweb
-}
diff --git a/pkg/trait/trait_test.go b/pkg/trait/trait_test.go
index 3e78fad..a770e76 100644
--- a/pkg/trait/trait_test.go
+++ b/pkg/trait/trait_test.go
@@ -51,7 +51,7 @@ func TestOpenShiftTraits(t *testing.T) {
 }
 
 func TestOpenShiftTraitsWithWeb(t *testing.T) {
-	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "camel:core", "camel:undertow")
+	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "from('undertow:http').to('log:info')")
 	res := processTestEnv(t, env)
 	assert.Contains(t, env.ExecutedTraits, ID("deployment"))
 	assert.Contains(t, env.ExecutedTraits, ID("service"))
@@ -72,7 +72,7 @@ func TestOpenShiftTraitsWithWeb(t *testing.T) {
 }
 
 func TestOpenShiftTraitsWithWebAndConfig(t *testing.T) {
-	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "camel:core", "camel:undertow")
+	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "from('undertow:http').to('log:info')")
 	env.Integration.Spec.Traits = make(map[string]v1alpha1.IntegrationTraitSpec)
 	env.Integration.Spec.Traits["service"] = v1alpha1.IntegrationTraitSpec{
 		Configuration: map[string]string{
@@ -88,7 +88,7 @@ func TestOpenShiftTraitsWithWebAndConfig(t *testing.T) {
 }
 
 func TestOpenShiftTraitsWithWebAndDisabledTrait(t *testing.T) {
-	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "camel:core", "camel:undertow")
+	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "from('undertow:http').to('log:info')")
 	env.Integration.Spec.Traits = make(map[string]v1alpha1.IntegrationTraitSpec)
 	env.Integration.Spec.Traits["service"] = v1alpha1.IntegrationTraitSpec{
 		Configuration: map[string]string{
@@ -105,7 +105,7 @@ func TestOpenShiftTraitsWithWebAndDisabledTrait(t *testing.T) {
 }
 
 func TestKubernetesTraits(t *testing.T) {
-	env := createTestEnv(v1alpha1.IntegrationPlatformClusterKubernetes, "camel:core")
+	env := createTestEnv(v1alpha1.IntegrationPlatformClusterKubernetes, "from('timer:tick').to('log:info')")
 	res := processTestEnv(t, env)
 	assert.Contains(t, env.ExecutedTraits, ID("deployment"))
 	assert.NotContains(t, env.ExecutedTraits, ID("service"))
@@ -120,7 +120,7 @@ func TestKubernetesTraits(t *testing.T) {
 }
 
 func TestKubernetesTraitsWithWeb(t *testing.T) {
-	env := createTestEnv(v1alpha1.IntegrationPlatformClusterKubernetes, "camel:core", "camel:servlet")
+	env := createTestEnv(v1alpha1.IntegrationPlatformClusterKubernetes, "from('servlet:http').to('log:info')")
 	res := processTestEnv(t, env)
 	assert.Contains(t, env.ExecutedTraits, ID("deployment"))
 	assert.Contains(t, env.ExecutedTraits, ID("service"))
@@ -138,7 +138,7 @@ func TestKubernetesTraitsWithWeb(t *testing.T) {
 }
 
 func TestTraitDecode(t *testing.T) {
-	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift)
+	env := createTestEnv(v1alpha1.IntegrationPlatformClusterOpenShift, "")
 	env.Integration.Spec.Traits = make(map[string]v1alpha1.IntegrationTraitSpec)
 	svcTrait := v1alpha1.IntegrationTraitSpec{
 		Configuration: map[string]string{
@@ -164,7 +164,7 @@ func processTestEnv(t *testing.T, env *Environment) *kubernetes.Collection {
 	return env.Resources
 }
 
-func createTestEnv(cluster v1alpha1.IntegrationPlatformCluster, dependencies ...string) *Environment {
+func createTestEnv(cluster v1alpha1.IntegrationPlatformCluster, script string) *Environment {
 	return &Environment{
 		Integration: &v1alpha1.Integration{
 			ObjectMeta: metav1.ObjectMeta{
@@ -172,7 +172,13 @@ func createTestEnv(cluster v1alpha1.IntegrationPlatformCluster, dependencies ...
 				Namespace: "ns",
 			},
 			Spec: v1alpha1.IntegrationSpec{
-				Dependencies: dependencies,
+				Sources: []v1alpha1.SourceSpec{
+					{
+						Language: v1alpha1.LanguageGroovy,
+						Name: "file.groovy",
+						Content: script,
+					},
+				},
 			},
 			Status: v1alpha1.IntegrationStatus{
 				Phase: v1alpha1.IntegrationPhaseDeploying,
diff --git a/pkg/util/kubernetes/collection.go b/pkg/util/kubernetes/collection.go
index fbc9098..4a770e5 100644
--- a/pkg/util/kubernetes/collection.go
+++ b/pkg/util/kubernetes/collection.go
@@ -18,6 +18,7 @@ limitations under the License.
 package kubernetes
 
 import (
+	serving "github.com/knative/serving/pkg/apis/serving/v1alpha1"
 	routev1 "github.com/openshift/api/route/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -146,6 +147,38 @@ func (c *Collection) GetRoute(filter func(*routev1.Route) bool) *routev1.Route {
 	return retValue
 }
 
+// VisitKnativeService calls visitor once for each resource in the collection that is a Knative serving Service
+func (c *Collection) VisitKnativeService(visitor func(*serving.Service)) {
+	c.Visit(func(res runtime.Object) {
+		if svc, isService := res.(*serving.Service); isService {
+			visitor(svc)
+		}
+	})
+}
+
+// VisitContainer executes the visitor function on all Containers inside deployments and Knative services; the visitor receives a pointer to the stored Container (not to a copy), so its changes are kept
+func (c *Collection) VisitContainer(visitor func(container *corev1.Container)) {
+	c.VisitDeployment(func(d *appsv1.Deployment) {
+		for i := range d.Spec.Template.Spec.Containers {
+			visitor(&d.Spec.Template.Spec.Containers[i])
+		}
+	})
+	c.VisitKnativeService(func(s *serving.Service) {
+		if s.Spec.RunLatest != nil {
+			container := &s.Spec.RunLatest.Configuration.RevisionTemplate.Spec.Container
+			visitor(container)
+		}
+		if s.Spec.Pinned != nil {
+			container := &s.Spec.Pinned.Configuration.RevisionTemplate.Spec.Container
+			visitor(container)
+		}
+		if s.Spec.Release != nil {
+			container := &s.Spec.Release.Configuration.RevisionTemplate.Spec.Container
+			visitor(container)
+		}
+	})
+}
+
 // VisitMetaObject executes the visitor function on all meta.Object resources
 func (c *Collection) VisitMetaObject(visitor func(metav1.Object)) {
 	c.Visit(func(res runtime.Object) {