You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/10/28 14:05:34 UTC

[camel-k] branch release-1.2.x updated (7b4a998 -> c8c1fe6)

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a change to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git.


    from 7b4a998  Create 1.2.x release branch
     new 88cab28  Fix #1774: use direct HTTP binding when Knative is not in use
     new d5e491c  Fix #1778: allow pushing to broker via KameletBinding
     new 5ec2dd4  Fix #1785: propagate klb changes to integrations
     new 36a68b8  chore(resources): regen
     new c8c1fe6  chore(release): enable actions on release branches

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/build.yml                        |   2 +
 .github/workflows/builder.yml                      |   2 +
 .github/workflows/knative.yml                      |   2 +
 .github/workflows/kubernetes.yml                   |   2 +
 .github/workflows/openshift.yml                    |   2 +
 .github/workflows/validate.yml                     |   2 +
 addons/strimzi/strimzi_test.go                     |   3 +
 deploy/resources.go                                |   4 +-
 .../files/{knativech2.groovy => display.groovy}    |   3 +-
 e2e/knative/kamelet_test.go                        |  60 +++++++++
 e2e/support/test_support.go                        |  90 ++++++++++++-
 .../common/kamelet-binding-broker/kamelet.feature  |   5 +
 .../logger-sink-binding.yaml                       |   8 +-
 .../logger-sink.kamelet.yaml                       |   0
 .../timer-source-binding.yaml                      |  10 +-
 .../timer-source.kamelet.yaml                      |   0
 .../yaks-config.yaml                               |  11 +-
 pkg/cmd/reset.go                                   |  16 +--
 pkg/controller/kameletbinding/common.go            | 139 +++++++++++++++++++++
 pkg/controller/kameletbinding/initialize.go        |  69 +---------
 .../kameletbinding/kamelet_binding_controller.go   |  10 ++
 pkg/controller/kameletbinding/monitor.go           |  39 +++++-
 pkg/util/bindings/api.go                           |   2 +
 pkg/util/bindings/bindings_test.go                 |  15 +++
 pkg/util/bindings/knative_ref.go                   |  12 +-
 pkg/util/bindings/knative_uri.go                   |   4 +
 pkg/util/knative/apis_test.go                      |   4 +-
 pkg/util/knative/knative.go                        |   2 +-
 pkg/util/knative/uri.go                            |  11 +-
 pkg/util/knative/uri_test.go                       |  10 +-
 30 files changed, 432 insertions(+), 107 deletions(-)
 copy e2e/knative/files/{knativech2.groovy => display.groovy} (92%)
 create mode 100644 e2e/knative/kamelet_test.go
 create mode 100644 e2e/yaks/common/kamelet-binding-broker/kamelet.feature
 copy e2e/yaks/common/{kamelet-binding => kamelet-binding-broker}/logger-sink-binding.yaml (90%)
 copy e2e/yaks/common/{kamelet-binding => kamelet-binding-broker}/logger-sink.kamelet.yaml (100%)
 copy e2e/yaks/common/{kamelet-binding => kamelet-binding-broker}/timer-source-binding.yaml (88%)
 copy e2e/yaks/common/{kamelet => kamelet-binding-broker}/timer-source.kamelet.yaml (100%)
 copy e2e/yaks/common/{kamelet-binding => kamelet-binding-broker}/yaks-config.yaml (81%)
 create mode 100644 pkg/controller/kameletbinding/common.go


[camel-k] 01/05: Fix #1774: use direct HTTP binding when Knative is not in use

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 88cab28a9895ccc2a68f70df61868c089eb0d37f
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Fri Oct 23 12:07:58 2020 +0200

    Fix #1774: use direct HTTP binding when Knative is not in use
---
 addons/strimzi/strimzi_test.go              |  3 +++
 deploy/resources.go                         |  4 +--
 pkg/controller/kameletbinding/initialize.go | 39 +++++++++++++++++++++++++++++
 pkg/util/bindings/api.go                    |  2 ++
 pkg/util/bindings/bindings_test.go          | 15 +++++++++++
 pkg/util/bindings/knative_uri.go            |  4 +++
 6 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/addons/strimzi/strimzi_test.go b/addons/strimzi/strimzi_test.go
index 627283c..13fd4ed 100644
--- a/addons/strimzi/strimzi_test.go
+++ b/addons/strimzi/strimzi_test.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"github.com/apache/camel-k/addons/strimzi/duck/v1beta1"
 	"github.com/apache/camel-k/addons/strimzi/duck/v1beta1/client/internalclientset/fake"
+	camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/util/bindings"
 	"github.com/apache/camel-k/pkg/util/test"
@@ -42,6 +43,7 @@ func TestStrimziDirect(t *testing.T) {
 		Ctx:       ctx,
 		Client:    client,
 		Namespace: "test",
+		Profile:   camelv1.TraitProfileKubernetes,
 	}
 
 	endpoint := v1alpha1.Endpoint{
@@ -102,6 +104,7 @@ func TestStrimziLookup(t *testing.T) {
 	bindingContext := bindings.BindingContext{
 		Ctx:       ctx,
 		Namespace: "test",
+		Profile:   camelv1.TraitProfileKubernetes,
 	}
 
 	endpoint := v1alpha1.Endpoint{
diff --git a/deploy/resources.go b/deploy/resources.go
index bb299ae..cf92514 100644
--- a/deploy/resources.go
+++ b/deploy/resources.go
@@ -161,9 +161,9 @@ var assets = func() http.FileSystem {
 		"/operator-deployment.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "operator-deployment.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 2139,
+			uncompressedSize: 2148,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x54\x41\x6f\xe3\x36\x13\xbd\xeb\x57\x3c\x58\x97\x5d\x20\xb6\xbf\x7c\x47\xf5\xa4\x26\x0e\x56\x68\x2a\x1b\x96\xb7\xc1\x9e\x8a\x09\x35\x92\x88\x50\xa4\x4a\x52\xd1\xea\xdf\x17\x94\xed\xd8\xce\x66\xd3\x1e\x82\xf2\x24\x71\x86\x6f\xde\x9b\x37\x64\x8c\xf9\xc7\xad\x28\xc6\xbd\x14\xac\x1d\x97\xf0\x06\xbe\x61\xa4\x1d\x89\x86\x51\x98\xca\x0f\x64\x19\x77\xa6\xd7\x25\x79\x69\x34\x3e\xa5\xc5\xdd\x67\xf4\xba\x64\x0b\xa3\x19\xc6\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x54\x41\x6f\xe3\x36\x13\xbd\xeb\x57\x3c\x58\x97\x5d\x20\xb6\x37\xdf\x77\x53\x4f\x6a\xe2\x20\x46\x53\xc9\xb0\xbc\x0d\xf6\x54\x4c\xa8\x91\x44\x84\x22\x55\x92\x8a\x56\xff\xbe\xa0\x6c\x27\x76\x36\x9b\xf6\x10\x94\x27\x9b\x33\xf3\xe6\xbd\x79\x23\xc6\x98\x7f\xdc\x89\x62\xdc\x49\xc1\xda\x71\x09\x6f\xe0\x1b\x46\xda\x91\x68\x18\x85\xa9\xfc\x40\x96\x71\x63\x7a\x5d\x92\x97\x46\xe3\x53\x5a\xdc\x7c\x46\xaf\x4b\xb6\x30\x9a\x61\x [...]
 		},
 		"/operator-role-binding-events.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "operator-role-binding-events.yaml",
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go
index d3874d6..8f794d6 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/initialize.go
@@ -24,11 +24,14 @@ import (
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/platform"
 	"github.com/apache/camel-k/pkg/util/bindings"
+	"github.com/apache/camel-k/pkg/util/knative"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/apache/camel-k/pkg/util/patch"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -75,10 +78,17 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 		it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
 	}
 
+	profile, err := action.determineProfile(ctx, kameletbinding)
+	if err != nil {
+		return nil, err
+	}
+	it.Spec.Profile = profile
+
 	bindingContext := bindings.BindingContext{
 		Ctx:       ctx,
 		Client:    action.client,
 		Namespace: it.Namespace,
+		Profile:   profile,
 	}
 
 	from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source)
@@ -183,3 +193,32 @@ func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.
 	}
 	return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
 }
+
+func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
+	if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
+		return binding.Spec.Integration.Profile, nil
+	}
+	pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
+	if err != nil && !k8serrors.IsNotFound(err) {
+		return "", errors.Wrap(err, "error while retrieving the integration platform")
+	}
+	if pl != nil {
+		if pl.Status.Profile != "" {
+			return pl.Status.Profile, nil
+		}
+		if pl.Spec.Profile != "" {
+			return pl.Spec.Profile, nil
+		}
+	}
+	if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
+		return v1.TraitProfileKnative, nil
+	}
+	if pl != nil {
+		// Determine profile from cluster type
+		plProfile := platform.GetProfile(pl)
+		if plProfile != "" {
+			return plProfile, nil
+		}
+	}
+	return v1.DefaultTraitProfile, nil
+}
diff --git a/pkg/util/bindings/api.go b/pkg/util/bindings/api.go
index f07d8cc..05ea123 100644
--- a/pkg/util/bindings/api.go
+++ b/pkg/util/bindings/api.go
@@ -20,6 +20,7 @@ package bindings
 
 import (
 	"context"
+
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/client"
@@ -53,4 +54,5 @@ type BindingContext struct {
 	Ctx       context.Context
 	Client    client.Client
 	Namespace string
+	Profile   v1.TraitProfile
 }
diff --git a/pkg/util/bindings/bindings_test.go b/pkg/util/bindings/bindings_test.go
index 385febf..266402a 100644
--- a/pkg/util/bindings/bindings_test.go
+++ b/pkg/util/bindings/bindings_test.go
@@ -36,6 +36,7 @@ func TestBindings(t *testing.T) {
 	testcases := []struct {
 		endpointType v1alpha1.EndpointType
 		endpoint     v1alpha1.Endpoint
+		profile      camelv1.TraitProfile
 		uri          string
 		traits       map[string]camelv1.TraitSpec
 	}{
@@ -160,6 +161,14 @@ func TestBindings(t *testing.T) {
 		{
 			endpointType: v1alpha1.EndpointTypeSink,
 			endpoint: v1alpha1.Endpoint{
+				URI: asStringPointer("https://myurl/hey"),
+			},
+			profile: camelv1.TraitProfileKubernetes,
+			uri:     "https://myurl/hey",
+		},
+		{
+			endpointType: v1alpha1.EndpointTypeSink,
+			endpoint: v1alpha1.Endpoint{
 				URI: asStringPointer("docker://xxx"),
 			},
 			uri: "docker://xxx",
@@ -174,10 +183,16 @@ func TestBindings(t *testing.T) {
 			client, err := test.NewFakeClient()
 			assert.NoError(t, err)
 
+			profile := tc.profile
+			if profile == "" {
+				profile = camelv1.TraitProfileKnative
+			}
+
 			bindingContext := BindingContext{
 				Ctx:       ctx,
 				Client:    client,
 				Namespace: "test",
+				Profile:   profile,
 			}
 
 			binding, err := Translate(bindingContext, tc.endpointType, tc.endpoint)
diff --git a/pkg/util/bindings/knative_uri.go b/pkg/util/bindings/knative_uri.go
index 88bdd0c..01cafa5 100644
--- a/pkg/util/bindings/knative_uri.go
+++ b/pkg/util/bindings/knative_uri.go
@@ -40,6 +40,10 @@ func (k KnativeURIBindingProvider) Translate(ctx BindingContext, endpointType v1
 		// works only on uris
 		return nil, nil
 	}
+	if ctx.Profile != v1.TraitProfileKnative {
+		// use cloudevent binding only in Knative trait profile
+		return nil, nil
+	}
 	if !strings.HasPrefix(*e.URI, "http:") && !strings.HasPrefix(*e.URI, "https:") {
 		// only translates http/https uri to Knative calls
 		return nil, nil


[camel-k] 04/05: chore(resources): regen

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 36a68b80c3f9075b46fde8261a8d2340d0ac9ee5
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Oct 26 16:12:42 2020 +0100

    chore(resources): regen
---
 deploy/resources.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/deploy/resources.go b/deploy/resources.go
index cf92514..aa22285 100644
--- a/deploy/resources.go
+++ b/deploy/resources.go
@@ -163,7 +163,7 @@ var assets = func() http.FileSystem {
 			modTime:          time.Time{},
 			uncompressedSize: 2148,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x54\x41\x6f\xe3\x36\x13\xbd\xeb\x57\x3c\x58\x97\x5d\x20\xb6\x37\xdf\x77\x53\x4f\x6a\xe2\x20\x46\x53\xc9\xb0\xbc\x0d\xf6\x54\x4c\xa8\x91\x44\x84\x22\x55\x92\x8a\x56\xff\xbe\xa0\x6c\x27\x76\x36\x9b\xf6\x10\x94\x27\x9b\x33\xf3\xe6\xbd\x79\x23\xc6\x98\x7f\xdc\x89\x62\xdc\x49\xc1\xda\x71\x09\x6f\xe0\x1b\x46\xda\x91\x68\x18\x85\xa9\xfc\x40\x96\x71\x63\x7a\x5d\x92\x97\x46\xe3\x53\x5a\xdc\x7c\x46\xaf\x4b\xb6\x30\x9a\x61\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x54\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x2e\x10\xdb\xcd\x1e\xd5\x93\x9a\x38\x88\xd1\x54\x32\x2c\x6f\x83\x3d\x15\x13\x6a\x24\x11\xa1\x48\x95\xa4\xa2\xd5\xdf\x17\x94\xed\xc4\xce\x66\xd3\x1e\x82\xf2\x64\x73\x66\xde\xbc\x37\x6f\xc4\x18\xf3\x8f\x3b\x51\x8c\x3b\x29\x58\x3b\x2e\xe1\x0d\x7c\xc3\x48\x3b\x12\x0d\xa3\x30\x95\x1f\xc8\x32\x6e\x4c\xaf\x4b\xf2\xd2\x68\x7c\x4a\x8b\x9b\xcf\xe8\x75\xc9\x16\x46\x33\x8c\x45\x [...]
 		},
 		"/operator-role-binding-events.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "operator-role-binding-events.yaml",


[camel-k] 05/05: chore(release): enable actions on release branches

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c8c1fe682227f04a8e2232335b65cf92bcb33ccd
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Oct 26 16:27:11 2020 +0100

    chore(release): enable actions on release branches
---
 .github/workflows/build.yml      | 2 ++
 .github/workflows/builder.yml    | 2 ++
 .github/workflows/knative.yml    | 2 ++
 .github/workflows/kubernetes.yml | 2 ++
 .github/workflows/openshift.yml  | 2 ++
 .github/workflows/validate.yml   | 2 ++
 6 files changed, 12 insertions(+)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 3174314..b0d087f 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -21,6 +21,7 @@ on:
   pull_request:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
@@ -29,6 +30,7 @@ on:
   push:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
diff --git a/.github/workflows/builder.yml b/.github/workflows/builder.yml
index bce238b..3bcd7e9 100644
--- a/.github/workflows/builder.yml
+++ b/.github/workflows/builder.yml
@@ -21,6 +21,7 @@ on:
   pull_request:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
@@ -29,6 +30,7 @@ on:
   push:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
diff --git a/.github/workflows/knative.yml b/.github/workflows/knative.yml
index 4e01d19..fb93c3a 100644
--- a/.github/workflows/knative.yml
+++ b/.github/workflows/knative.yml
@@ -21,6 +21,7 @@ on:
   pull_request:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
@@ -29,6 +30,7 @@ on:
   push:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
diff --git a/.github/workflows/kubernetes.yml b/.github/workflows/kubernetes.yml
index 78e6885..b138abc 100644
--- a/.github/workflows/kubernetes.yml
+++ b/.github/workflows/kubernetes.yml
@@ -21,6 +21,7 @@ on:
   pull_request:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
@@ -29,6 +30,7 @@ on:
   push:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
diff --git a/.github/workflows/openshift.yml b/.github/workflows/openshift.yml
index e41a078..7502f0a 100644
--- a/.github/workflows/openshift.yml
+++ b/.github/workflows/openshift.yml
@@ -21,6 +21,7 @@ on:
   pull_request:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
@@ -29,6 +30,7 @@ on:
   push:
     branches:
       - master
+      - "release-*"
     paths-ignore:
       - '**.adoc'
       - 'KEYS'
diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml
index f8c2be7..5decec1 100644
--- a/.github/workflows/validate.yml
+++ b/.github/workflows/validate.yml
@@ -21,9 +21,11 @@ on:
   pull_request:
     branches: 
       - master
+      - "release-*"
   push:
     branches:
       - master
+      - "release-*"
 jobs:
   build:
     runs-on: ubuntu-latest


[camel-k] 03/05: Fix #1785: propagate klb changes to integrations

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 5ec2dd4db12dffab14da857ab7df3d2450299e1d
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Oct 26 15:11:41 2020 +0100

    Fix #1785: propagate klb changes to integrations
---
 e2e/knative/files/display.groovy                   |  20 ++++
 e2e/knative/kamelet_test.go                        |  60 ++++++++++++
 e2e/support/test_support.go                        |  90 ++++++++++++++++-
 pkg/cmd/reset.go                                   |  16 +--
 .../kameletbinding/{initialize.go => common.go}    | 101 ++-----------------
 pkg/controller/kameletbinding/initialize.go        | 108 +--------------------
 .../kameletbinding/kamelet_binding_controller.go   |  10 ++
 pkg/controller/kameletbinding/monitor.go           |  39 +++++++-
 8 files changed, 233 insertions(+), 211 deletions(-)

diff --git a/e2e/knative/files/display.groovy b/e2e/knative/files/display.groovy
new file mode 100644
index 0000000..43a595f
--- /dev/null
+++ b/e2e/knative/files/display.groovy
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('knative:channel/messages')
+    .convertBodyTo(String.class)
+    .to('log:info?showAll=false')
diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go
new file mode 100644
index 0000000..cfc90f3
--- /dev/null
+++ b/e2e/knative/kamelet_test.go
@@ -0,0 +1,60 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package knative
+
+import (
+	"testing"
+
+	. "github.com/apache/camel-k/e2e/support"
+	camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	. "github.com/onsi/gomega"
+	v1 "k8s.io/api/core/v1"
+	messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+)
+
+// Test that kamelet binding can be changed and changes propagated to integrations
+func TestKameletChange(t *testing.T) {
+
+	WithNewTestNamespace(t, func(ns string) {
+		RegisterTestingT(t)
+
+		Expect(Kamel("install", "-n", ns).Execute()).Should(BeNil())
+		Expect(CreateTimerKamelet(ns, "timer-source")()).Should(BeNil())
+		Expect(CreateKnativeChannelv1Beta1(ns, "messages")()).Should(BeNil())
+		Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).Should(BeNil())
+		ref := v1.ObjectReference{
+			Kind:       "InMemoryChannel",
+			Name:       "messages",
+			APIVersion: messaging.SchemeGroupVersion.String(),
+		}
+		Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).Should(BeNil())
+		Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+		Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+		Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello"))
+
+		Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).Should(BeNil())
+		Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+		Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+		Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi"))
+	})
+
+}
diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go
index c070c87..e572602 100644
--- a/e2e/support/test_support.go
+++ b/e2e/support/test_support.go
@@ -23,6 +23,7 @@ package support
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -33,9 +34,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/google/uuid"
 	"github.com/onsi/gomega"
-
 	"github.com/spf13/cobra"
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/api/batch/v1beta1"
@@ -891,6 +893,92 @@ func CreateKnativeChannelv1Beta1(ns string, name string) func() error {
 }
 
 /*
+	Kamelets
+*/
+
+func CreateTimerKamelet(ns string, name string) func() error {
+	return func() error {
+		kamelet := v1alpha1.Kamelet{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+			Spec: v1alpha1.KameletSpec{
+				Definition: v1alpha1.JSONSchemaProps{
+					Properties: map[string]v1alpha1.JSONSchemaProps{
+						"message": {
+							Type: "string",
+						},
+					},
+				},
+				Flow: asFlow(map[string]interface{}{
+					"from": map[string]interface{}{
+						"uri": "timer:tick",
+						"steps": []map[string]interface{}{
+							{
+								"set-body": map[string]interface{}{
+									"constant": "{{message}}",
+								},
+							},
+							{
+								"to": "kamelet:sink",
+							},
+						},
+					},
+				}),
+			},
+		}
+		return TestClient.Create(TestContext, &kamelet)
+	}
+}
+
+func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error {
+	return func() error {
+		kb := v1alpha1.KameletBinding{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+			Spec: v1alpha1.KameletBindingSpec{
+				Source: v1alpha1.Endpoint{
+					Ref: &corev1.ObjectReference{
+						Kind:       "Kamelet",
+						APIVersion: v1alpha1.SchemeGroupVersion.String(),
+						Name:       from,
+					},
+					Properties: asEndpointProperties(properties),
+				},
+				Sink: v1alpha1.Endpoint{
+					Ref:        &to,
+					Properties: asEndpointProperties(map[string]string{}),
+				},
+			},
+		}
+		return kubernetes.ReplaceResource(TestContext, TestClient, &kb)
+	}
+}
+
+func asFlow(source map[string]interface{}) *v1.Flow {
+	bytes, err := json.Marshal(source)
+	if err != nil {
+		panic(err)
+	}
+	return &v1.Flow{
+		RawMessage: bytes,
+	}
+}
+
+func asEndpointProperties(props map[string]string) v1alpha1.EndpointProperties {
+	bytes, err := json.Marshal(props)
+	if err != nil {
+		panic(err)
+	}
+	return v1alpha1.EndpointProperties{
+		RawMessage: bytes,
+	}
+}
+
+/*
 	Namespace testing functions
 */
 
diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go
index f6fa6f6..5831d85 100644
--- a/pkg/cmd/reset.go
+++ b/pkg/cmd/reset.go
@@ -61,6 +61,14 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
 	}
 
 	var n int
+	if !o.SkipKameletBindings {
+		if n, err = o.deleteAllKameletBindings(c); err != nil {
+			fmt.Print(err)
+			return
+		}
+		fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
+	}
+
 	if !o.SkipIntegrations {
 		if n, err = o.deleteAllIntegrations(c); err != nil {
 			fmt.Print(err)
@@ -77,14 +85,6 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
 		fmt.Printf("%d integration kits deleted from namespace %s\n", n, o.Namespace)
 	}
 
-	if !o.SkipKameletBindings {
-		if n, err = o.deleteAllKameletBindings(c); err != nil {
-			fmt.Print(err)
-			return
-		}
-		fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
-	}
-
 	if err = o.resetIntegrationPlatform(c); err != nil {
 		fmt.Println(err)
 		return
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/common.go
similarity index 51%
copy from pkg/controller/kameletbinding/initialize.go
copy to pkg/controller/kameletbinding/common.go
index 8f794d6..a50b202 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -20,41 +20,19 @@ package kameletbinding
 import (
 	"context"
 	"encoding/json"
-	"strings"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/client"
 	"github.com/apache/camel-k/pkg/platform"
 	"github.com/apache/camel-k/pkg/util/bindings"
 	"github.com/apache/camel-k/pkg/util/knative"
-	"github.com/apache/camel-k/pkg/util/kubernetes"
-	"github.com/apache/camel-k/pkg/util/patch"
 	"github.com/pkg/errors"
-	corev1 "k8s.io/api/core/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-// NewInitializeAction returns a action that initializes the kamelet binding configuration when not provided by the user
-func NewInitializeAction() Action {
-	return &initializeAction{}
-}
-
-type initializeAction struct {
-	baseAction
-}
-
-func (action *initializeAction) Name() string {
-	return "initialize"
-}
-
-func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBinding) bool {
-	return kameletbinding.Status.Phase == v1alpha1.KameletBindingPhaseNone
-}
-
-func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
+func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) {
 	controller := true
 	blockOwnerDeletion := true
 	it := v1.Integration{
@@ -78,7 +56,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 		it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
 	}
 
-	profile, err := action.determineProfile(ctx, kameletbinding)
+	profile, err := determineProfile(ctx, c, kameletbinding)
 	if err != nil {
 		return nil, err
 	}
@@ -86,7 +64,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 
 	bindingContext := bindings.BindingContext{
 		Ctx:       ctx,
-		Client:    action.client,
+		Client:    c,
 		Namespace: it.Namespace,
 		Profile:   profile,
 	}
@@ -128,77 +106,14 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 	}
 	it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
 
-	if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
-		return nil, errors.Wrap(err, "could not create integration for kamelet binding")
-	}
-
-	// propagate Kamelet icon (best effort)
-	action.propagateIcon(ctx, kameletbinding)
-
-	target := kameletbinding.DeepCopy()
-	target.Status.Phase = v1alpha1.KameletBindingPhaseCreating
-	return target, nil
-}
-
-func (action *initializeAction) propagateIcon(ctx context.Context, binding *v1alpha1.KameletBinding) {
-	icon, err := action.findIcon(ctx, binding)
-	if err != nil {
-		action.L.Errorf(err, "cannot find icon for kamelet binding %q", binding.Name)
-		return
-	}
-	if icon == "" {
-		return
-	}
-	// compute patch
-	clone := binding.DeepCopy()
-	clone.Annotations = make(map[string]string)
-	for k, v := range binding.Annotations {
-		clone.Annotations[k] = v
-	}
-	if _, ok := clone.Annotations[v1alpha1.AnnotationIcon]; !ok {
-		clone.Annotations[v1alpha1.AnnotationIcon] = icon
-	}
-	p, err := patch.PositiveMergePatch(binding, clone)
-	if err != nil {
-		action.L.Errorf(err, "cannot compute patch to update icon for kamelet binding %q", binding.Name)
-		return
-	}
-	if len(p) > 0 {
-		if err := action.client.Patch(ctx, clone, client.RawPatch(types.MergePatchType, p)); err != nil {
-			action.L.Errorf(err, "cannot apply merge patch to update icon for kamelet binding %q", binding.Name)
-			return
-		}
-	}
-}
-
-func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.KameletBinding) (string, error) {
-	var kameletRef *corev1.ObjectReference
-	if binding.Spec.Source.Ref != nil && binding.Spec.Source.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Source.Ref.APIVersion, "camel.apache.org/") {
-		kameletRef = binding.Spec.Source.Ref
-	} else if binding.Spec.Sink.Ref != nil && binding.Spec.Sink.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Sink.Ref.APIVersion, "camel.apache.org/") {
-		kameletRef = binding.Spec.Sink.Ref
-	}
-
-	if kameletRef == nil {
-		return "", nil
-	}
-
-	key := client.ObjectKey{
-		Namespace: binding.Namespace,
-		Name:      kameletRef.Name,
-	}
-	var kamelet v1alpha1.Kamelet
-	if err := action.client.Get(ctx, key, &kamelet); err != nil {
-		return "", err
-	}
-	return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
+	return &it, nil
 }
 
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
+func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
 	if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
 		return binding.Spec.Integration.Profile, nil
 	}
-	pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
+	pl, err := platform.GetCurrentPlatform(ctx, c, binding.Namespace)
 	if err != nil && !k8serrors.IsNotFound(err) {
 		return "", errors.Wrap(err, "error while retrieving the integration platform")
 	}
@@ -210,7 +125,7 @@ func (action *initializeAction) determineProfile(ctx context.Context, binding *v
 			return pl.Spec.Profile, nil
 		}
 	}
-	if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
+	if knative.IsEnabledInNamespace(ctx, c, binding.Namespace) {
 		return v1.TraitProfileKnative, nil
 	}
 	if pl != nil {
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go
index 8f794d6..cc9c38f 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/initialize.go
@@ -19,20 +19,13 @@ package kameletbinding
 
 import (
 	"context"
-	"encoding/json"
 	"strings"
 
-	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
-	"github.com/apache/camel-k/pkg/platform"
-	"github.com/apache/camel-k/pkg/util/bindings"
-	"github.com/apache/camel-k/pkg/util/knative"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/apache/camel-k/pkg/util/patch"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -55,80 +48,12 @@ func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBindin
 }
 
 func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
-	controller := true
-	blockOwnerDeletion := true
-	it := v1.Integration{
-		ObjectMeta: metav1.ObjectMeta{
-			Namespace: kameletbinding.Namespace,
-			Name:      kameletbinding.Name,
-			OwnerReferences: []metav1.OwnerReference{
-				{
-					APIVersion:         kameletbinding.APIVersion,
-					Kind:               kameletbinding.Kind,
-					Name:               kameletbinding.Name,
-					UID:                kameletbinding.UID,
-					Controller:         &controller,
-					BlockOwnerDeletion: &blockOwnerDeletion,
-				},
-			},
-		},
-	}
-	// start from the integration spec defined in the binding
-	if kameletbinding.Spec.Integration != nil {
-		it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
-	}
-
-	profile, err := action.determineProfile(ctx, kameletbinding)
-	if err != nil {
-		return nil, err
-	}
-	it.Spec.Profile = profile
-
-	bindingContext := bindings.BindingContext{
-		Ctx:       ctx,
-		Client:    action.client,
-		Namespace: it.Namespace,
-		Profile:   profile,
-	}
-
-	from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source)
-	if err != nil {
-		return nil, errors.Wrap(err, "could not determine source URI")
-	}
-	to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink)
-	if err != nil {
-		return nil, errors.Wrap(err, "could not determine sink URI")
-	}
-
-	if len(from.Traits) > 0 || len(to.Traits) > 0 {
-		if it.Spec.Traits == nil {
-			it.Spec.Traits = make(map[string]v1.TraitSpec)
-		}
-		for k, v := range from.Traits {
-			it.Spec.Traits[k] = v
-		}
-		for k, v := range to.Traits {
-			it.Spec.Traits[k] = v
-		}
-	}
-
-	flow := map[string]interface{}{
-		"from": map[string]interface{}{
-			"uri": from.URI,
-			"steps": []map[string]interface{}{
-				{
-					"to": to.URI,
-				},
-			},
-		},
-	}
-	encodedFlow, err := json.Marshal(flow)
+	it, err := createIntegrationFor(ctx, action.client, kameletbinding)
 	if err != nil {
 		return nil, err
 	}
-	it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
 
-	if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
+	if err := kubernetes.ReplaceResource(ctx, action.client, it); err != nil {
 		return nil, errors.Wrap(err, "could not create integration for kamelet binding")
 	}
 
@@ -193,32 +118,3 @@ func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.
 	}
 	return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
 }
-
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
-	if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
-		return binding.Spec.Integration.Profile, nil
-	}
-	pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
-	if err != nil && !k8serrors.IsNotFound(err) {
-		return "", errors.Wrap(err, "error while retrieving the integration platform")
-	}
-	if pl != nil {
-		if pl.Status.Profile != "" {
-			return pl.Status.Profile, nil
-		}
-		if pl.Spec.Profile != "" {
-			return pl.Spec.Profile, nil
-		}
-	}
-	if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
-		return v1.TraitProfileKnative, nil
-	}
-	if pl != nil {
-		// Determine profile from cluster type
-		plProfile := platform.GetProfile(pl)
-		if plProfile != "" {
-			return plProfile, nil
-		}
-	}
-	return v1.DefaultTraitProfile, nil
-}
diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go
index 459c69c..dbebfc3 100644
--- a/pkg/controller/kameletbinding/kamelet_binding_controller.go
+++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/client"
 	camelevent "github.com/apache/camel-k/pkg/event"
@@ -84,6 +85,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}
 
+	// Watch Integration to propagate changes downstream
+	err = c.Watch(&source.Kind{Type: &v1.Integration{}}, &handler.EnqueueRequestForOwner{
+		OwnerType:    &v1alpha1.KameletBinding{},
+		IsController: false,
+	})
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
diff --git a/pkg/controller/kameletbinding/monitor.go b/pkg/controller/kameletbinding/monitor.go
index 9980dc5..9cd768a 100644
--- a/pkg/controller/kameletbinding/monitor.go
+++ b/pkg/controller/kameletbinding/monitor.go
@@ -19,12 +19,14 @@ package kameletbinding
 
 import (
 	"context"
+
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
 // NewMonitorAction returns an action that monitors the kamelet binding after it's fully initialized
@@ -52,10 +54,41 @@ func (action *monitorAction) Handle(ctx context.Context, kameletbinding *v1alpha
 		Name:      kameletbinding.Name,
 	}
 	it := v1.Integration{}
-	if err := action.client.Get(ctx, key, &it); err != nil {
+	if err := action.client.Get(ctx, key, &it); err != nil && k8serrors.IsNotFound(err) {
+		target := kameletbinding.DeepCopy()
+		// Rebuild the integration
+		target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+		target.Status.SetCondition(
+			v1alpha1.KameletBindingConditionReady,
+			corev1.ConditionFalse,
+			"",
+			"",
+		)
+		return target, nil
+	} else if err != nil {
 		return nil, errors.Wrapf(err, "could not load integration for KameletBinding %q", kameletbinding.Name)
 	}
 
+	// Check if the integration needs to be changed
+	expected, err := createIntegrationFor(ctx, action.client, kameletbinding)
+	if err != nil {
+		return nil, err
+	}
+
+	if !equality.Semantic.DeepDerivative(expected.Spec, it.Spec) {
+		// KameletBinding has changed and needs rebuild
+		target := kameletbinding.DeepCopy()
+		// Rebuild the integration
+		target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+		target.Status.SetCondition(
+			v1alpha1.KameletBindingConditionReady,
+			corev1.ConditionFalse,
+			"",
+			"",
+		)
+		return target, nil
+	}
+
 	// Map integration phases to KameletBinding phases
 	target := kameletbinding.DeepCopy()
 	if it.Status.Phase == v1.IntegrationPhaseRunning {


[camel-k] 02/05: Fix #1778: allow pushing to broker via KameletBinding

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit d5e491c8da271aa074a563d4410b44ee878a7f3d
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Fri Oct 23 14:19:59 2020 +0200

    Fix #1778: allow pushing to broker via KameletBinding
---
 .../common/kamelet-binding-broker/kamelet.feature  |  5 ++
 .../logger-sink-binding.yaml                       | 34 ++++++++++++++
 .../logger-sink.kamelet.yaml                       | 43 +++++++++++++++++
 .../timer-source-binding.yaml                      | 37 +++++++++++++++
 .../timer-source.kamelet.yaml                      | 54 ++++++++++++++++++++++
 .../common/kamelet-binding-broker/yaks-config.yaml | 36 +++++++++++++++
 pkg/util/bindings/knative_ref.go                   | 12 ++++-
 pkg/util/knative/apis_test.go                      |  4 +-
 pkg/util/knative/knative.go                        |  2 +-
 pkg/util/knative/uri.go                            | 11 ++---
 pkg/util/knative/uri_test.go                       | 10 +++-
 11 files changed, 237 insertions(+), 11 deletions(-)

diff --git a/e2e/yaks/common/kamelet-binding-broker/kamelet.feature b/e2e/yaks/common/kamelet-binding-broker/kamelet.feature
new file mode 100644
index 0000000..4fb1de3
--- /dev/null
+++ b/e2e/yaks/common/kamelet-binding-broker/kamelet.feature
@@ -0,0 +1,5 @@
+Feature: Camel K can bind Kamelets to the broker
+
+  Scenario: Sending event to the broker with KameletBinding
+    Given integration logger-sink-binding is running
+    Then integration logger-sink-binding should print message: Hello Custom Event
diff --git a/e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml b/e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml
new file mode 100644
index 0000000..6b0c0a0
--- /dev/null
+++ b/e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml
@@ -0,0 +1,34 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: logger-sink-binding
+spec:
+  source:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1beta1
+      name: default
+    properties:
+      type: custom-type
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: logger-sink
diff --git a/e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml b/e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml
new file mode 100644
index 0000000..edc710f
--- /dev/null
+++ b/e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml
@@ -0,0 +1,43 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: logger-sink
+  label:
+    camel.apache.org/kamelet.type: "sink"
+spec:
+  definition:
+    title: "Logger"
+    description: "Logs the received payload of each incoming event"
+    properties:
+      prefix:
+        title: Prefix
+        description: The prefix to prepend to the logged message
+        type: string
+        default: "message: "
+  types:
+    in:
+      mediaType: text/plain
+    out:
+      mediaType: text/plain
+  flow:
+    from:
+      uri: "kamelet:source"
+      steps:
+        - log: "{{prefix}}${body}"
diff --git a/e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml b/e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml
new file mode 100644
index 0000000..317ab02
--- /dev/null
+++ b/e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml
@@ -0,0 +1,37 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: timer-source-binding
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: timer-source
+    properties:
+      message: Hello Custom Event
+      period: 1000
+  sink:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1beta1
+      name: default
+    properties:
+      type: custom-type
diff --git a/e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml b/e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml
new file mode 100644
index 0000000..dc64b57
--- /dev/null
+++ b/e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml
@@ -0,0 +1,54 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: timer-source
+  label:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "Timer"
+    description: "Produces periodic events with a custom payload"
+    required:
+      - message
+    properties:
+      period:
+        title: Period
+        description: The time interval between two events
+        type: integer
+        default: 1000
+      message:
+        title: Message
+        description: The message to generate
+        type: string
+  types:
+    out:
+      mediaType: application/json
+      schema:
+        id: text.camel.apache.org
+        type: string
+  flow:
+    from:
+      uri: timer:tick
+      parameters:
+        period: "{{period}}"
+      steps:
+        - set-body:
+            constant: "{{message}}"
+        - to: "kamelet:sink"
diff --git a/e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml b/e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml
new file mode 100644
index 0000000..e8414f8
--- /dev/null
+++ b/e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml
@@ -0,0 +1,36 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+config:
+  namespace:
+    temporary: true
+pre:
+- name: installation
+  run: |
+    # One of the two labels should work
+    oc label namespace $YAKS_NAMESPACE eventing.knative.dev/injection=enabled
+    oc label namespace $YAKS_NAMESPACE knative-eventing-injection=enabled
+
+    kamel install -n $YAKS_NAMESPACE
+
+    kubectl apply -f timer-source.kamelet.yaml -n $YAKS_NAMESPACE
+    kubectl apply -f logger-sink.kamelet.yaml -n $YAKS_NAMESPACE
+
+    kubectl apply -f timer-source-binding.yaml -n $YAKS_NAMESPACE
+    kubectl apply -f logger-sink-binding.yaml -n $YAKS_NAMESPACE
+    kubectl wait kameletbinding timer-source-binding --for=condition=Ready --timeout=10m -n $YAKS_NAMESPACE
+    kubectl wait kameletbinding logger-sink-binding --for=condition=Ready --timeout=10m -n $YAKS_NAMESPACE
diff --git a/pkg/util/bindings/knative_ref.go b/pkg/util/bindings/knative_ref.go
index e3ad420..07e9454 100644
--- a/pkg/util/bindings/knative_ref.go
+++ b/pkg/util/bindings/knative_ref.go
@@ -19,6 +19,7 @@ package bindings
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net/url"
 
@@ -69,12 +70,21 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointType v1
 
 	var serviceURI string
 	if *serviceType == knativeapis.CamelServiceTypeEvent {
+		// TODO enable this when the runtime will support changing the broker name (https://github.com/apache/camel-k-runtime/issues/535)
+		//if props["name"] == "" {
+		//	props["name"] = e.Ref.Name
+		//}
 		if eventType, ok := props["type"]; ok {
 			// consume prop
 			delete(props, "type")
 			serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, eventType)
 		} else {
-			serviceURI = fmt.Sprintf("knative:%s", *serviceType)
+			if endpointType == v1alpha1.EndpointTypeSink {
+				// Allowing no event type, but it can fail. See https://github.com/apache/camel-k-runtime/issues/536
+				serviceURI = fmt.Sprintf("knative:%s", *serviceType)
+			} else {
+				return nil, errors.New(`property "type" must be provided when reading from the Broker`)
+			}
 		}
 	} else {
 		serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, url.PathEscape(e.Ref.Name))
diff --git a/pkg/util/knative/apis_test.go b/pkg/util/knative/apis_test.go
index 3f1730f..fd1331d 100644
--- a/pkg/util/knative/apis_test.go
+++ b/pkg/util/knative/apis_test.go
@@ -134,7 +134,7 @@ func TestAPIs(t *testing.T) {
 		Name:       "default",
 	}, refs[0])
 
-	ref, err = ExtractObjectReference("knative:event/ciao?brokerApiVersion=xxx")
+	ref, err = ExtractObjectReference("knative:event/ciao?apiVersion=xxx")
 	assert.Nil(t, err)
 	refs = FillMissingReferenceData(knative.CamelServiceTypeEvent, ref)
 	checkValidRefs(t, refs)
@@ -144,7 +144,7 @@ func TestAPIs(t *testing.T) {
 		Name:       "default",
 	}, refs[0])
 
-	ref, err = ExtractObjectReference("knative:event/ciao?brokerName=aaa")
+	ref, err = ExtractObjectReference("knative:event/ciao?name=aaa")
 	assert.Nil(t, err)
 	refs = FillMissingReferenceData(knative.CamelServiceTypeEvent, ref)
 	checkValidRefs(t, refs)
diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go
index 2a45fb1..4f506ea 100644
--- a/pkg/util/knative/knative.go
+++ b/pkg/util/knative/knative.go
@@ -80,7 +80,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e
 		},
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: brokerReference.Namespace,
-			Name:      brokerReference.Name + "-" + serviceName + "-" + eventType,
+			Name:      brokerReference.Name + "-" + serviceName + "-" + kubernetesutils.SanitizeLabel(eventType),
 		},
 		Spec: eventing.TriggerSpec{
 			Filter: &eventing.TriggerFilter{
diff --git a/pkg/util/knative/uri.go b/pkg/util/knative/uri.go
index 3df11d6..5dd6e69 100644
--- a/pkg/util/knative/uri.go
+++ b/pkg/util/knative/uri.go
@@ -26,14 +26,13 @@ import (
 	v1 "k8s.io/api/core/v1"
 )
 
-var uriRegexp = regexp.MustCompile(`^knative:[/]*(channel|endpoint|event)(?:$|/([A-Za-z0-9.-]+)(?:[/?].*|$))`)
+var uriRegexp = regexp.MustCompile(`^knative:[/]*(channel|endpoint|event)(?:[?].*|$|/([A-Za-z0-9.-]+)(?:[/?].*|$))`)
 var plainNameRegexp = regexp.MustCompile(`^[A-Za-z0-9.-]+$`)
 
 const (
-	paramAPIVersion       = "apiVersion"
-	paramKind             = "kind"
-	paramBrokerName       = "brokerName"
-	paramBrokerAPIVersion = "brokerApiVersion"
+	paramAPIVersion = "apiVersion"
+	paramKind       = "kind"
+	paramBrokerName = "name"
 )
 
 // FilterURIs returns all Knative URIs of the given type from a slice
@@ -62,7 +61,7 @@ func ExtractObjectReference(uri string) (v1.ObjectReference, error) {
 		if name == "" {
 			name = "default"
 		}
-		apiVersion := uriutils.GetQueryParameter(uri, paramBrokerAPIVersion)
+		apiVersion := uriutils.GetQueryParameter(uri, paramAPIVersion)
 		return v1.ObjectReference{
 			Name:       name,
 			APIVersion: apiVersion,
diff --git a/pkg/util/knative/uri_test.go b/pkg/util/knative/uri_test.go
index ece603f..9951278 100644
--- a/pkg/util/knative/uri_test.go
+++ b/pkg/util/knative/uri_test.go
@@ -74,7 +74,7 @@ func TestChannelUri(t *testing.T) {
 		Name:       "ciao",
 	}, ref)
 
-	ref, err = ExtractObjectReference("knative://event/chuck?&brokerApiVersion=eventing.knative.dev/v1beta1&brokerName=broker2")
+	ref, err = ExtractObjectReference("knative://event/chuck?&apiVersion=eventing.knative.dev/v1beta1&name=broker2")
 	assert.Nil(t, err)
 	assert.Equal(t, v1.ObjectReference{
 		APIVersion: "eventing.knative.dev/v1beta1",
@@ -88,6 +88,14 @@ func TestChannelUri(t *testing.T) {
 		Name: "default",
 		Kind: "Broker",
 	}, ref)
+
+	ref, err = ExtractObjectReference("knative://event?&apiVersion=eventing.knative.dev/v1beta13&brokxerName=broker2")
+	assert.Nil(t, err)
+	assert.Equal(t, v1.ObjectReference{
+		APIVersion: "eventing.knative.dev/v1beta13",
+		Name:       "default",
+		Kind:       "Broker",
+	}, ref)
 }
 
 func TestNormalizeToUri(t *testing.T) {