You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/09/17 09:10:05 UTC

[camel-k] 10/21: kamelet: complete e2e example

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 0fbbdd93342bcfb6e654723766d3144486961d5a
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Jul 10 12:24:42 2020 +0200

    kamelet: complete e2e example
---
 e2e/knative/knative_platform_test.go               |  4 +-
 examples/kamelets/fake-usage.groovy                | 21 +++++++++
 examples/kamelets/usage.groovy                     | 20 ++++++++
 go.sum                                             |  2 +
 pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go   |  2 +-
 pkg/cmd/run.go                                     |  2 +-
 pkg/controller/kamelet/kamelet_controller.go       |  1 +
 .../flow_test.go => controller/kamelet/monitor.go} | 53 ++++++++++------------
 pkg/metadata/metadata.go                           | 28 ++----------
 pkg/trait/dependencies.go                          |  7 ++-
 pkg/trait/init.go                                  |  2 +-
 pkg/trait/kamelets.go                              | 33 ++++++++++----
 pkg/trait/kamelets_test.go                         | 22 ++++-----
 pkg/trait/knative.go                               | 43 +++++++++++++-----
 pkg/util/digest/digest.go                          |  2 +-
 pkg/util/flow/flow.go                              | 14 +++---
 pkg/util/flow/flow_test.go                         |  4 +-
 pkg/util/kubernetes/resolver.go                    | 19 +++++++-
 18 files changed, 175 insertions(+), 104 deletions(-)

diff --git a/e2e/knative/knative_platform_test.go b/e2e/knative/knative_platform_test.go
index 857a72f..c32711d 100644
--- a/e2e/knative/knative_platform_test.go
+++ b/e2e/knative/knative_platform_test.go
@@ -58,10 +58,10 @@ func TestKnativePlatformTest(t *testing.T) {
 			// Change something in the integration to produce a redeploy
 			Expect(UpdateIntegration(ns, "yaml", func(it *v1.Integration) {
 				it.Spec.Profile = ""
-				content, err := flow.Marshal(it.Spec.Flows)
+				content, err := flow.ToYamlDSL(it.Spec.Flows)
 				assert.NoError(t, err)
 				newData := strings.ReplaceAll(string(content), "string!", "string!!!")
-				newFlows, err := flow.UnmarshalString(newData)
+				newFlows, err := flow.FromYamlDSLString(newData)
 				assert.NoError(t, err)
 				it.Spec.Flows = newFlows
 			})).To(BeNil())
diff --git a/examples/kamelets/fake-usage.groovy b/examples/kamelets/fake-usage.groovy
new file mode 100755
index 0000000..bff523b
--- /dev/null
+++ b/examples/kamelets/fake-usage.groovy
@@ -0,0 +1,21 @@
+// camel-k: language=groovy trait=kamelets.list=timer
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The integration should contain the kamelet as source
+
+// Until the kamelet component is added in runtime
\ No newline at end of file
diff --git a/examples/kamelets/usage.groovy b/examples/kamelets/usage.groovy
new file mode 100755
index 0000000..15090c9
--- /dev/null
+++ b/examples/kamelets/usage.groovy
@@ -0,0 +1,20 @@
+// camel-k: language=groovy
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('kamelet:timer')
+    .to('log:info')
diff --git a/go.sum b/go.sum
index 40b5e55..34d32df 100644
--- a/go.sum
+++ b/go.sum
@@ -984,6 +984,8 @@ github.com/opencontainers/runc v1.0.0-rc2.0.20190611121236-6cc515888830/go.mod h
 github.com/opencontainers/runtime-spec v0.1.2-0.20190507144316-5b71a03e2700/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
 github.com/opencontainers/runtime-tools v0.0.0-20181011054405-1d69bd0f9c39/go.mod h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs=
 github.com/openshift/api v0.0.0-20200205133042-34f0ec8dab87/go.mod h1:fT6U/JfG8uZzemTRwZA2kBDJP5nWz7v05UHnty/D+pk=
+github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f h1:ATPK7UhEwglONJc8qGsq41TbPk0XA4Kpm7XZZ3mlhAY=
+github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY=
 github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible h1:YwFnUQ5RQ17CmkxHyjpQnWAQOGkLKXY0shOUEyqaCGk=
 github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY=
 github.com/openshift/client-go v0.0.0-20190923180330-3b6373338c9b/go.mod h1:6rzn+JTr7+WYS2E1TExP4gByoABxMznR6y2SnUIkmxk=
diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
index 1292dc3..e6c991c 100644
--- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
@@ -477,7 +477,7 @@ func (in *KameletSpec) DeepCopyInto(out *KameletSpec) {
 	if in.Flow != nil {
 		in, out := &in.Flow, &out.Flow
 		*out = new(v1.Flow)
-		**out = **in
+		(*in).DeepCopyInto(*out)
 	}
 	out.Authorization = in.Authorization
 	if in.Types != nil {
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 5c7e4c3..744f635 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -498,7 +498,7 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string,
 		}
 
 		if o.UseFlows && (strings.HasSuffix(source, ".yaml") || strings.HasSuffix(source, ".yml")) {
-			flows, err := flow.UnmarshalString(data)
+			flows, err := flow.FromYamlDSLString(data)
 			if err != nil {
 				return nil, err
 			}
diff --git a/pkg/controller/kamelet/kamelet_controller.go b/pkg/controller/kamelet/kamelet_controller.go
index 048cf22..baad186 100644
--- a/pkg/controller/kamelet/kamelet_controller.go
+++ b/pkg/controller/kamelet/kamelet_controller.go
@@ -129,6 +129,7 @@ func (r *ReconcileKamelet) Reconcile(request reconcile.Request) (reconcile.Resul
 
 	actions := []Action{
 		NewInitializeAction(),
+		NewMonitorAction(),
 	}
 
 	var targetPhase v1alpha1.KameletPhase
diff --git a/pkg/util/flow/flow_test.go b/pkg/controller/kamelet/monitor.go
similarity index 52%
copy from pkg/util/flow/flow_test.go
copy to pkg/controller/kamelet/monitor.go
index c9248cb..49935d2 100644
--- a/pkg/util/flow/flow_test.go
+++ b/pkg/controller/kamelet/monitor.go
@@ -15,39 +15,32 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package flow
+package kamelet
 
 import (
-	"bytes"
-	"encoding/json"
-	"testing"
+	"context"
 
-	"github.com/stretchr/testify/assert"
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
-func TestReadWriteYaml(t *testing.T) {
-	// yaml in conventional form as marshalled by the go runtime
-	yaml := `- from:
-    steps:
-    - to: log:info
-    uri: timer:tick
-`
-
-	yamlReader := bytes.NewReader([]byte(yaml))
-	flows, err := Unmarshal(yamlReader)
-	assert.NoError(t, err)
-	assert.NotNil(t, flows)
-	assert.Len(t, flows, 1)
-
-	flow := map[string]interface{}{}
-	err = json.Unmarshal(flows[0].RawMessage, &flow)
-	assert.NoError(t, err)
-
-	assert.NotNil(t, flow["from"])
-	assert.Nil(t, flow["xx"])
-
-	data, err := Marshal(flows)
-	assert.NoError(t, err)
-	assert.NotNil(t, data)
-	assert.Equal(t, yaml, string(data))
+// NewMonitorAction returns an action that monitors the kamelet after it's fully initialized
+func NewMonitorAction() Action {
+	return &monitorAction{}
+}
+
+type monitorAction struct {
+	baseAction
+}
+
+func (action *monitorAction) Name() string {
+	return "monitor"
+}
+
+func (action *monitorAction) CanHandle(kamelet *v1alpha1.Kamelet) bool {
+	return kamelet.Status.Phase == v1alpha1.KameletPhaseReady
+}
+
+func (action *monitorAction) Handle(ctx context.Context, kamelet *v1alpha1.Kamelet) (*v1alpha1.Kamelet, error) {
+	// Doing nothing for now
+	return kamelet, nil
 }
diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go
index 27a2c8d..03c8ca1 100644
--- a/pkg/metadata/metadata.go
+++ b/pkg/metadata/metadata.go
@@ -22,9 +22,6 @@ import (
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/util/camel"
-	"github.com/apache/camel-k/pkg/util/gzip"
-	"github.com/apache/camel-k/pkg/util/log"
-
 	src "github.com/apache/camel-k/pkg/util/source"
 )
 
@@ -64,10 +61,11 @@ func merge(m1 src.Metadata, m2 src.Metadata) src.Metadata {
 
 // Extract returns metadata information from the source code
 func Extract(catalog *camel.RuntimeCatalog, source v1.SourceSpec) IntegrationMetadata {
-	var err error
-	source, err = uncompress(source)
-	if err != nil {
-		log.Errorf(err, "unable to uncompress source %s: %v", source.Name, err)
+	if source.ContentRef != "" {
+		panic("source must be dereferenced before calling this method")
+	}
+	if source.Compression {
+		panic("source must be uncompressed before calling this method")
 	}
 
 	language := source.InferLanguage()
@@ -94,19 +92,3 @@ func Each(catalog *camel.RuntimeCatalog, sources []v1.SourceSpec, consumer func(
 		}
 	}
 }
-
-func uncompress(spec v1.SourceSpec) (v1.SourceSpec, error) {
-	if spec.Compression {
-		data := []byte(spec.Content)
-		var uncompressed []byte
-		var err error
-		if uncompressed, err = gzip.UncompressBase64(data); err != nil {
-			return spec, err
-		}
-		newSpec := spec
-		newSpec.Compression = false
-		newSpec.Content = string(uncompressed)
-		return newSpec, nil
-	}
-	return spec, nil
-}
diff --git a/pkg/trait/dependencies.go b/pkg/trait/dependencies.go
index 8a13087..4a704b6 100644
--- a/pkg/trait/dependencies.go
+++ b/pkg/trait/dependencies.go
@@ -19,6 +19,7 @@ package trait
 
 import (
 	"fmt"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
 
 	"github.com/apache/camel-k/pkg/metadata"
 
@@ -66,7 +67,11 @@ func (t *dependenciesTrait) Apply(e *Environment) error {
 		dependencies.Add(fmt.Sprintf("mvn:%s/%s", d.GroupID, d.ArtifactID))
 	}
 
-	for _, s := range e.Integration.Sources() {
+	sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+	if err != nil {
+		return err
+	}
+	for _, s := range sources {
 		meta := metadata.Extract(e.CamelCatalog, s)
 		lang := s.InferLanguage()
 
diff --git a/pkg/trait/init.go b/pkg/trait/init.go
index 32236e9..9685832 100644
--- a/pkg/trait/init.go
+++ b/pkg/trait/init.go
@@ -54,7 +54,7 @@ func (t *initTrait) Apply(e *Environment) error {
 
 		// Flows need to be turned into a generated source
 		if len(e.Integration.Spec.Flows) > 0 {
-			content, err := flow.Marshal(e.Integration.Spec.Flows)
+			content, err := flow.ToYamlDSL(e.Integration.Spec.Flows)
 			if err != nil {
 				return err
 			}
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 80d4bc2..994ddcb 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -20,6 +20,8 @@ package trait
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/apache/camel-k/pkg/util/flow"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"regexp"
 	"sort"
 	"strconv"
@@ -83,10 +85,18 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) {
 		return false, nil
 	}
 
+	if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+		return false, nil
+	}
+
 	if t.Auto == nil || *t.Auto {
 		if t.List == "" {
 			var kamelets []string
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs))
 				util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs))
 				return true
@@ -121,6 +131,10 @@ func (t *kameletsTrait) addKamelets(e *Environment) error {
 			return err
 		}
 
+		if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
+			return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
+		}
+
 		if err := t.addKameletAsSource(e, kamelet); err != nil {
 			return err
 		}
@@ -137,21 +151,20 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet v1alpha1.Kame
 	var sources []v1.SourceSpec
 
 	if kamelet.Spec.Flow != nil {
-		// TODO fixme removed for changes to Flow
-		//flowData, err := flows.Marshal([]v1.Flow{*kamelet.Spec.Flow})
-		//if err != nil {
-		//	return err
-		//}
+
+		flowData, err := flow.ToYamlDSL([]v1.Flow{*kamelet.Spec.Flow})
+		if err != nil {
+			return err
+		}
 		flowSource := v1.SourceSpec{
 			DataSpec: v1.DataSpec{
-				Name: fmt.Sprintf("%s.yaml", kamelet.Name),
-				//Content: string(flowData),
-				Content: string(*kamelet.Spec.Flow),
+				Name:    fmt.Sprintf("%s.yaml", kamelet.Name),
+				Content: string(flowData),
 			},
 			Language: v1.LanguageYaml,
 			Type:     v1.SourceTypeKamelet,
 		}
-		flowSource, err := integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
+		flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
 		if err != nil {
 			return err
 		}
diff --git a/pkg/trait/kamelets_test.go b/pkg/trait/kamelets_test.go
index 59657be..d2a1321 100644
--- a/pkg/trait/kamelets_test.go
+++ b/pkg/trait/kamelets_test.go
@@ -19,7 +19,7 @@ package trait
 
 import (
 	"context"
-	"gopkg.in/yaml.v2"
+	"encoding/json"
 	"testing"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
@@ -88,7 +88,7 @@ func TestKameletLookup(t *testing.T) {
 			Name:      "timer",
 		},
 		Spec: v1alpha1.KameletSpec{
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "timer:tick",
 				},
@@ -131,7 +131,7 @@ func TestKameletSecondarySourcesLookup(t *testing.T) {
 			Name:      "timer",
 		},
 		Spec: v1alpha1.KameletSpec{
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "timer:tick",
 				},
@@ -236,7 +236,7 @@ func TestErrorMultipleKameletSources(t *testing.T) {
 					Type: v1.SourceTypeKamelet,
 				},
 			},
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "timer:tick",
 				},
@@ -265,7 +265,7 @@ func TestMultipleKamelets(t *testing.T) {
 			Name:      "timer",
 		},
 		Spec: v1alpha1.KameletSpec{
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "timer:tick",
 				},
@@ -290,7 +290,7 @@ func TestMultipleKamelets(t *testing.T) {
 			Name:      "logger",
 		},
 		Spec: v1alpha1.KameletSpec{
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "tbd:endpoint",
 					"steps": []interface{}{
@@ -358,7 +358,7 @@ func TestKameletConfigLookup(t *testing.T) {
 			Name:      "timer",
 		},
 		Spec: v1alpha1.KameletSpec{
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "timer:tick",
 				},
@@ -420,7 +420,7 @@ func TestKameletNamedConfigLookup(t *testing.T) {
 			Name:      "timer",
 		},
 		Spec: v1alpha1.KameletSpec{
-			Flow: deleteMeAtSomePoint(map[string]interface{}{
+			Flow: marshalOrFail(map[string]interface{}{
 				"from": map[string]interface{}{
 					"uri": "timer:tick",
 				},
@@ -511,11 +511,11 @@ func createKameletsTestEnvironment(flow string, objects ...runtime.Object) (*kam
 	return trait, environment
 }
 
-func deleteMeAtSomePoint(flow map[string]interface{}) *v1.Flow {
-	data, err := yaml.Marshal(flow)
+func marshalOrFail(flow map[string]interface{}) *v1.Flow {
+	data, err := json.Marshal(flow)
 	if err != nil {
 		panic(err)
 	}
-	f := v1.Flow(data)
+	f := v1.Flow{data}
 	return &f
 }
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 92103f4..5ff9c46 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -23,6 +23,7 @@ import (
 	"reflect"
 	"strings"
 
+	"github.com/apache/camel-k/pkg/util/kubernetes"
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative"
 	"github.com/apache/camel-k/pkg/metadata"
@@ -110,8 +111,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 	if t.Auto == nil || *t.Auto {
 		if len(t.ChannelSources) == 0 {
 			items := make([]string, 0)
-
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeChannel)...)
 				return true
 			})
@@ -120,8 +124,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 		}
 		if len(t.ChannelSinks) == 0 {
 			items := make([]string, 0)
-
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeChannel)...)
 				return true
 			})
@@ -130,8 +137,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 		}
 		if len(t.EndpointSources) == 0 {
 			items := make([]string, 0)
-
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEndpoint)...)
 				return true
 			})
@@ -140,8 +150,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 		}
 		if len(t.EndpointSinks) == 0 {
 			items := make([]string, 0)
-
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEndpoint)...)
 				return true
 			})
@@ -150,8 +163,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 		}
 		if len(t.EventSources) == 0 {
 			items := make([]string, 0)
-
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEvent)...)
 				return true
 			})
@@ -160,8 +176,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 		}
 		if len(t.EventSinks) == 0 {
 			items := make([]string, 0)
-
-			metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+			sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+			if err != nil {
+				return false, err
+			}
+			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEvent)...)
 				return true
 			})
diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go
index 6ccd3ed..4b2dfba 100644
--- a/pkg/util/digest/digest.go
+++ b/pkg/util/digest/digest.go
@@ -71,7 +71,7 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) {
 
 	// Integration flows
 	if len(integration.Spec.Flows) > 0 {
-		flows, err := flow.Marshal(integration.Spec.Flows)
+		flows, err := flow.ToYamlDSL(integration.Spec.Flows)
 		if err != nil {
 			return "", err
 		}
diff --git a/pkg/util/flow/flow.go b/pkg/util/flow/flow.go
index e495d55..fa963cf 100644
--- a/pkg/util/flow/flow.go
+++ b/pkg/util/flow/flow.go
@@ -31,13 +31,13 @@ import (
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 )
 
-// UnmarshalString reads flows contained in a string
-func UnmarshalString(flowsString string) ([]v1.Flow, error) {
-	return Unmarshal(bytes.NewReader([]byte(flowsString)))
+// FromYamlDSLString creates a slice of flows from a Camel YAML DSL string
+func FromYamlDSLString(flowsString string) ([]v1.Flow, error) {
+	return FromYamlDSL(bytes.NewReader([]byte(flowsString)))
 }
 
-// Unmarshal flows from a stream
-func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
+// FromYamlDSL creates a slice of flows from a Camel YAML DSL stream
+func FromYamlDSL(reader io.Reader) ([]v1.Flow, error) {
 	buffered, err := ioutil.ReadAll(reader)
 	if err != nil {
 		return nil, err
@@ -56,8 +56,8 @@ func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
 	return flows, err
 }
 
-// Marshal flows as byte array
-func Marshal(flows []v1.Flow) ([]byte, error) {
+// ToYamlDSL converts a flow into its Camel YAML DSL equivalent
+func ToYamlDSL(flows []v1.Flow) ([]byte, error) {
 	data, err := json.Marshal(&flows)
 	if err != nil {
 		return nil, err
diff --git a/pkg/util/flow/flow_test.go b/pkg/util/flow/flow_test.go
index c9248cb..731e2ff 100644
--- a/pkg/util/flow/flow_test.go
+++ b/pkg/util/flow/flow_test.go
@@ -34,7 +34,7 @@ func TestReadWriteYaml(t *testing.T) {
 `
 
 	yamlReader := bytes.NewReader([]byte(yaml))
-	flows, err := Unmarshal(yamlReader)
+	flows, err := FromYamlDSL(yamlReader)
 	assert.NoError(t, err)
 	assert.NotNil(t, flows)
 	assert.Len(t, flows, 1)
@@ -46,7 +46,7 @@ func TestReadWriteYaml(t *testing.T) {
 	assert.NotNil(t, flow["from"])
 	assert.Nil(t, flow["xx"])
 
-	data, err := Marshal(flows)
+	data, err := ToYamlDSL(flows)
 	assert.NoError(t, err)
 	assert.NotNil(t, data)
 	assert.Equal(t, yaml, string(data))
diff --git a/pkg/util/kubernetes/resolver.go b/pkg/util/kubernetes/resolver.go
index fbb3eff..ae7621a 100644
--- a/pkg/util/kubernetes/resolver.go
+++ b/pkg/util/kubernetes/resolver.go
@@ -22,7 +22,7 @@ import (
 	"fmt"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
-
+	"github.com/apache/camel-k/pkg/util/gzip"
 	corev1 "k8s.io/api/core/v1"
 	controller "sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -71,10 +71,25 @@ func Resolve(data *v1.DataSpec, mapLookup func(string) (*corev1.ConfigMap, error
 		//
 		// Replace ref source content with real content
 		//
-		data.Content = cm.Data["content"]
+		key := data.ContentKey
+		if key == "" {
+			key = "content"
+		}
+		data.Content = cm.Data[key]
 		data.ContentRef = ""
 	}
 
+	if data.Compression {
+		cnt := []byte(data.Content)
+		var uncompressed []byte
+		var err error
+		if uncompressed, err = gzip.UncompressBase64(cnt); err != nil {
+			return err
+		}
+		data.Compression = false
+		data.Content = string(uncompressed)
+	}
+
 	return nil
 }