You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/09/17 09:10:05 UTC
[camel-k] 10/21: kamelet: complete e2e example
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 0fbbdd93342bcfb6e654723766d3144486961d5a
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Jul 10 12:24:42 2020 +0200
kamelet: complete e2e example
---
e2e/knative/knative_platform_test.go | 4 +-
examples/kamelets/fake-usage.groovy | 21 +++++++++
examples/kamelets/usage.groovy | 20 ++++++++
go.sum | 2 +
pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go | 2 +-
pkg/cmd/run.go | 2 +-
pkg/controller/kamelet/kamelet_controller.go | 1 +
.../flow_test.go => controller/kamelet/monitor.go} | 53 ++++++++++------------
pkg/metadata/metadata.go | 28 ++----------
pkg/trait/dependencies.go | 7 ++-
pkg/trait/init.go | 2 +-
pkg/trait/kamelets.go | 33 ++++++++++----
pkg/trait/kamelets_test.go | 22 ++++-----
pkg/trait/knative.go | 43 +++++++++++++-----
pkg/util/digest/digest.go | 2 +-
pkg/util/flow/flow.go | 14 +++---
pkg/util/flow/flow_test.go | 4 +-
pkg/util/kubernetes/resolver.go | 19 +++++++-
18 files changed, 175 insertions(+), 104 deletions(-)
diff --git a/e2e/knative/knative_platform_test.go b/e2e/knative/knative_platform_test.go
index 857a72f..c32711d 100644
--- a/e2e/knative/knative_platform_test.go
+++ b/e2e/knative/knative_platform_test.go
@@ -58,10 +58,10 @@ func TestKnativePlatformTest(t *testing.T) {
// Change something in the integration to produce a redeploy
Expect(UpdateIntegration(ns, "yaml", func(it *v1.Integration) {
it.Spec.Profile = ""
- content, err := flow.Marshal(it.Spec.Flows)
+ content, err := flow.ToYamlDSL(it.Spec.Flows)
assert.NoError(t, err)
newData := strings.ReplaceAll(string(content), "string!", "string!!!")
- newFlows, err := flow.UnmarshalString(newData)
+ newFlows, err := flow.FromYamlDSLString(newData)
assert.NoError(t, err)
it.Spec.Flows = newFlows
})).To(BeNil())
diff --git a/examples/kamelets/fake-usage.groovy b/examples/kamelets/fake-usage.groovy
new file mode 100755
index 0000000..bff523b
--- /dev/null
+++ b/examples/kamelets/fake-usage.groovy
@@ -0,0 +1,21 @@
+// camel-k: language=groovy trait=kamelets.list=timer
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The integration should contain the kamelet as source
+
+// Until the kamelet component is added in runtime
\ No newline at end of file
diff --git a/examples/kamelets/usage.groovy b/examples/kamelets/usage.groovy
new file mode 100755
index 0000000..15090c9
--- /dev/null
+++ b/examples/kamelets/usage.groovy
@@ -0,0 +1,20 @@
+// camel-k: language=groovy
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('kamelet:timer')
+ .to('log:info')
diff --git a/go.sum b/go.sum
index 40b5e55..34d32df 100644
--- a/go.sum
+++ b/go.sum
@@ -984,6 +984,8 @@ github.com/opencontainers/runc v1.0.0-rc2.0.20190611121236-6cc515888830/go.mod h
github.com/opencontainers/runtime-spec v0.1.2-0.20190507144316-5b71a03e2700/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/runtime-tools v0.0.0-20181011054405-1d69bd0f9c39/go.mod h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs=
github.com/openshift/api v0.0.0-20200205133042-34f0ec8dab87/go.mod h1:fT6U/JfG8uZzemTRwZA2kBDJP5nWz7v05UHnty/D+pk=
+github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f h1:ATPK7UhEwglONJc8qGsq41TbPk0XA4Kpm7XZZ3mlhAY=
+github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY=
github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible h1:YwFnUQ5RQ17CmkxHyjpQnWAQOGkLKXY0shOUEyqaCGk=
github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY=
github.com/openshift/client-go v0.0.0-20190923180330-3b6373338c9b/go.mod h1:6rzn+JTr7+WYS2E1TExP4gByoABxMznR6y2SnUIkmxk=
diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
index 1292dc3..e6c991c 100644
--- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
@@ -477,7 +477,7 @@ func (in *KameletSpec) DeepCopyInto(out *KameletSpec) {
if in.Flow != nil {
in, out := &in.Flow, &out.Flow
*out = new(v1.Flow)
- **out = **in
+ (*in).DeepCopyInto(*out)
}
out.Authorization = in.Authorization
if in.Types != nil {
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 5c7e4c3..744f635 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -498,7 +498,7 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string,
}
if o.UseFlows && (strings.HasSuffix(source, ".yaml") || strings.HasSuffix(source, ".yml")) {
- flows, err := flow.UnmarshalString(data)
+ flows, err := flow.FromYamlDSLString(data)
if err != nil {
return nil, err
}
diff --git a/pkg/controller/kamelet/kamelet_controller.go b/pkg/controller/kamelet/kamelet_controller.go
index 048cf22..baad186 100644
--- a/pkg/controller/kamelet/kamelet_controller.go
+++ b/pkg/controller/kamelet/kamelet_controller.go
@@ -129,6 +129,7 @@ func (r *ReconcileKamelet) Reconcile(request reconcile.Request) (reconcile.Resul
actions := []Action{
NewInitializeAction(),
+ NewMonitorAction(),
}
var targetPhase v1alpha1.KameletPhase
diff --git a/pkg/util/flow/flow_test.go b/pkg/controller/kamelet/monitor.go
similarity index 52%
copy from pkg/util/flow/flow_test.go
copy to pkg/controller/kamelet/monitor.go
index c9248cb..49935d2 100644
--- a/pkg/util/flow/flow_test.go
+++ b/pkg/controller/kamelet/monitor.go
@@ -15,39 +15,32 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package flow
+package kamelet
import (
- "bytes"
- "encoding/json"
- "testing"
+ "context"
- "github.com/stretchr/testify/assert"
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
)
-func TestReadWriteYaml(t *testing.T) {
- // yaml in conventional form as marshalled by the go runtime
- yaml := `- from:
- steps:
- - to: log:info
- uri: timer:tick
-`
-
- yamlReader := bytes.NewReader([]byte(yaml))
- flows, err := Unmarshal(yamlReader)
- assert.NoError(t, err)
- assert.NotNil(t, flows)
- assert.Len(t, flows, 1)
-
- flow := map[string]interface{}{}
- err = json.Unmarshal(flows[0].RawMessage, &flow)
- assert.NoError(t, err)
-
- assert.NotNil(t, flow["from"])
- assert.Nil(t, flow["xx"])
-
- data, err := Marshal(flows)
- assert.NoError(t, err)
- assert.NotNil(t, data)
- assert.Equal(t, yaml, string(data))
+// NewMonitorAction returns an action that monitors the kamelet after it's fully initialized
+func NewMonitorAction() Action {
+ return &monitorAction{}
+}
+
+type monitorAction struct {
+ baseAction
+}
+
+func (action *monitorAction) Name() string {
+ return "monitor"
+}
+
+func (action *monitorAction) CanHandle(kamelet *v1alpha1.Kamelet) bool {
+ return kamelet.Status.Phase == v1alpha1.KameletPhaseReady
+}
+
+func (action *monitorAction) Handle(ctx context.Context, kamelet *v1alpha1.Kamelet) (*v1alpha1.Kamelet, error) {
+ // Doing nothing for now
+ return kamelet, nil
}
diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go
index 27a2c8d..03c8ca1 100644
--- a/pkg/metadata/metadata.go
+++ b/pkg/metadata/metadata.go
@@ -22,9 +22,6 @@ import (
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util/camel"
- "github.com/apache/camel-k/pkg/util/gzip"
- "github.com/apache/camel-k/pkg/util/log"
-
src "github.com/apache/camel-k/pkg/util/source"
)
@@ -64,10 +61,11 @@ func merge(m1 src.Metadata, m2 src.Metadata) src.Metadata {
// Extract returns metadata information from the source code
func Extract(catalog *camel.RuntimeCatalog, source v1.SourceSpec) IntegrationMetadata {
- var err error
- source, err = uncompress(source)
- if err != nil {
- log.Errorf(err, "unable to uncompress source %s: %v", source.Name, err)
+ if source.ContentRef != "" {
+ panic("source must be dereferenced before calling this method")
+ }
+ if source.Compression {
+ panic("source must be uncompressed before calling this method")
}
language := source.InferLanguage()
@@ -94,19 +92,3 @@ func Each(catalog *camel.RuntimeCatalog, sources []v1.SourceSpec, consumer func(
}
}
}
-
-func uncompress(spec v1.SourceSpec) (v1.SourceSpec, error) {
- if spec.Compression {
- data := []byte(spec.Content)
- var uncompressed []byte
- var err error
- if uncompressed, err = gzip.UncompressBase64(data); err != nil {
- return spec, err
- }
- newSpec := spec
- newSpec.Compression = false
- newSpec.Content = string(uncompressed)
- return newSpec, nil
- }
- return spec, nil
-}
diff --git a/pkg/trait/dependencies.go b/pkg/trait/dependencies.go
index 8a13087..4a704b6 100644
--- a/pkg/trait/dependencies.go
+++ b/pkg/trait/dependencies.go
@@ -19,6 +19,7 @@ package trait
import (
"fmt"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/metadata"
@@ -66,7 +67,11 @@ func (t *dependenciesTrait) Apply(e *Environment) error {
dependencies.Add(fmt.Sprintf("mvn:%s/%s", d.GroupID, d.ArtifactID))
}
- for _, s := range e.Integration.Sources() {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return err
+ }
+ for _, s := range sources {
meta := metadata.Extract(e.CamelCatalog, s)
lang := s.InferLanguage()
diff --git a/pkg/trait/init.go b/pkg/trait/init.go
index 32236e9..9685832 100644
--- a/pkg/trait/init.go
+++ b/pkg/trait/init.go
@@ -54,7 +54,7 @@ func (t *initTrait) Apply(e *Environment) error {
// Flows need to be turned into a generated source
if len(e.Integration.Spec.Flows) > 0 {
- content, err := flow.Marshal(e.Integration.Spec.Flows)
+ content, err := flow.ToYamlDSL(e.Integration.Spec.Flows)
if err != nil {
return err
}
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 80d4bc2..994ddcb 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -20,6 +20,8 @@ package trait
import (
"encoding/json"
"fmt"
+ "github.com/apache/camel-k/pkg/util/flow"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
"regexp"
"sort"
"strconv"
@@ -83,10 +85,18 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) {
return false, nil
}
+ if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+ return false, nil
+ }
+
if t.Auto == nil || *t.Auto {
if t.List == "" {
var kamelets []string
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs))
util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs))
return true
@@ -121,6 +131,10 @@ func (t *kameletsTrait) addKamelets(e *Environment) error {
return err
}
+ if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
+ return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
+ }
+
if err := t.addKameletAsSource(e, kamelet); err != nil {
return err
}
@@ -137,21 +151,20 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet v1alpha1.Kame
var sources []v1.SourceSpec
if kamelet.Spec.Flow != nil {
- // TODO fixme removed for changes to Flow
- //flowData, err := flows.Marshal([]v1.Flow{*kamelet.Spec.Flow})
- //if err != nil {
- // return err
- //}
+
+ flowData, err := flow.ToYamlDSL([]v1.Flow{*kamelet.Spec.Flow})
+ if err != nil {
+ return err
+ }
flowSource := v1.SourceSpec{
DataSpec: v1.DataSpec{
- Name: fmt.Sprintf("%s.yaml", kamelet.Name),
- //Content: string(flowData),
- Content: string(*kamelet.Spec.Flow),
+ Name: fmt.Sprintf("%s.yaml", kamelet.Name),
+ Content: string(flowData),
},
Language: v1.LanguageYaml,
Type: v1.SourceTypeKamelet,
}
- flowSource, err := integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
+ flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
if err != nil {
return err
}
diff --git a/pkg/trait/kamelets_test.go b/pkg/trait/kamelets_test.go
index 59657be..d2a1321 100644
--- a/pkg/trait/kamelets_test.go
+++ b/pkg/trait/kamelets_test.go
@@ -19,7 +19,7 @@ package trait
import (
"context"
- "gopkg.in/yaml.v2"
+ "encoding/json"
"testing"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
@@ -88,7 +88,7 @@ func TestKameletLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
@@ -131,7 +131,7 @@ func TestKameletSecondarySourcesLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
@@ -236,7 +236,7 @@ func TestErrorMultipleKameletSources(t *testing.T) {
Type: v1.SourceTypeKamelet,
},
},
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
@@ -265,7 +265,7 @@ func TestMultipleKamelets(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
@@ -290,7 +290,7 @@ func TestMultipleKamelets(t *testing.T) {
Name: "logger",
},
Spec: v1alpha1.KameletSpec{
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "tbd:endpoint",
"steps": []interface{}{
@@ -358,7 +358,7 @@ func TestKameletConfigLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
@@ -420,7 +420,7 @@ func TestKameletNamedConfigLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: deleteMeAtSomePoint(map[string]interface{}{
+ Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
@@ -511,11 +511,11 @@ func createKameletsTestEnvironment(flow string, objects ...runtime.Object) (*kam
return trait, environment
}
-func deleteMeAtSomePoint(flow map[string]interface{}) *v1.Flow {
- data, err := yaml.Marshal(flow)
+func marshalOrFail(flow map[string]interface{}) *v1.Flow {
+ data, err := json.Marshal(flow)
if err != nil {
panic(err)
}
- f := v1.Flow(data)
+ f := v1.Flow{data}
return &f
}
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 92103f4..5ff9c46 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -23,6 +23,7 @@ import (
"reflect"
"strings"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative"
"github.com/apache/camel-k/pkg/metadata"
@@ -110,8 +111,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
if t.Auto == nil || *t.Auto {
if len(t.ChannelSources) == 0 {
items := make([]string, 0)
-
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeChannel)...)
return true
})
@@ -120,8 +124,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
}
if len(t.ChannelSinks) == 0 {
items := make([]string, 0)
-
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeChannel)...)
return true
})
@@ -130,8 +137,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
}
if len(t.EndpointSources) == 0 {
items := make([]string, 0)
-
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEndpoint)...)
return true
})
@@ -140,8 +150,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
}
if len(t.EndpointSinks) == 0 {
items := make([]string, 0)
-
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEndpoint)...)
return true
})
@@ -150,8 +163,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
}
if len(t.EventSources) == 0 {
items := make([]string, 0)
-
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEvent)...)
return true
})
@@ -160,8 +176,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
}
if len(t.EventSinks) == 0 {
items := make([]string, 0)
-
- metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool {
+ sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+ if err != nil {
+ return false, err
+ }
+ metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEvent)...)
return true
})
diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go
index 6ccd3ed..4b2dfba 100644
--- a/pkg/util/digest/digest.go
+++ b/pkg/util/digest/digest.go
@@ -71,7 +71,7 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) {
// Integration flows
if len(integration.Spec.Flows) > 0 {
- flows, err := flow.Marshal(integration.Spec.Flows)
+ flows, err := flow.ToYamlDSL(integration.Spec.Flows)
if err != nil {
return "", err
}
diff --git a/pkg/util/flow/flow.go b/pkg/util/flow/flow.go
index e495d55..fa963cf 100644
--- a/pkg/util/flow/flow.go
+++ b/pkg/util/flow/flow.go
@@ -31,13 +31,13 @@ import (
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
)
-// UnmarshalString reads flows contained in a string
-func UnmarshalString(flowsString string) ([]v1.Flow, error) {
- return Unmarshal(bytes.NewReader([]byte(flowsString)))
+// FromYamlDSLString creates a slice of flows from a Camel YAML DSL string
+func FromYamlDSLString(flowsString string) ([]v1.Flow, error) {
+ return FromYamlDSL(bytes.NewReader([]byte(flowsString)))
}
-// Unmarshal flows from a stream
-func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
+// FromYamlDSL creates a slice of flows from a Camel YAML DSL stream
+func FromYamlDSL(reader io.Reader) ([]v1.Flow, error) {
buffered, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
@@ -56,8 +56,8 @@ func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
return flows, err
}
-// Marshal flows as byte array
-func Marshal(flows []v1.Flow) ([]byte, error) {
+// ToYamlDSL converts a flow into its Camel YAML DSL equivalent
+func ToYamlDSL(flows []v1.Flow) ([]byte, error) {
data, err := json.Marshal(&flows)
if err != nil {
return nil, err
diff --git a/pkg/util/flow/flow_test.go b/pkg/util/flow/flow_test.go
index c9248cb..731e2ff 100644
--- a/pkg/util/flow/flow_test.go
+++ b/pkg/util/flow/flow_test.go
@@ -34,7 +34,7 @@ func TestReadWriteYaml(t *testing.T) {
`
yamlReader := bytes.NewReader([]byte(yaml))
- flows, err := Unmarshal(yamlReader)
+ flows, err := FromYamlDSL(yamlReader)
assert.NoError(t, err)
assert.NotNil(t, flows)
assert.Len(t, flows, 1)
@@ -46,7 +46,7 @@ func TestReadWriteYaml(t *testing.T) {
assert.NotNil(t, flow["from"])
assert.Nil(t, flow["xx"])
- data, err := Marshal(flows)
+ data, err := ToYamlDSL(flows)
assert.NoError(t, err)
assert.NotNil(t, data)
assert.Equal(t, yaml, string(data))
diff --git a/pkg/util/kubernetes/resolver.go b/pkg/util/kubernetes/resolver.go
index fbb3eff..ae7621a 100644
--- a/pkg/util/kubernetes/resolver.go
+++ b/pkg/util/kubernetes/resolver.go
@@ -22,7 +22,7 @@ import (
"fmt"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
-
+ "github.com/apache/camel-k/pkg/util/gzip"
corev1 "k8s.io/api/core/v1"
controller "sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -71,10 +71,25 @@ func Resolve(data *v1.DataSpec, mapLookup func(string) (*corev1.ConfigMap, error
//
// Replace ref source content with real content
//
- data.Content = cm.Data["content"]
+ key := data.ContentKey
+ if key == "" {
+ key = "content"
+ }
+ data.Content = cm.Data[key]
data.ContentRef = ""
}
+ if data.Compression {
+ cnt := []byte(data.Content)
+ var uncompressed []byte
+ var err error
+ if uncompressed, err = gzip.UncompressBase64(cnt); err != nil {
+ return err
+ }
+ data.Compression = false
+ data.Content = string(uncompressed)
+ }
+
return nil
}