You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2021/04/29 13:12:18 UTC
[camel-k] 04/30: feat(kamelets): error handling kamelets binding
support
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 1849ca5223a865eb1cf3b7acab7a78c3540f8ea0
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Tue Apr 13 14:20:45 2021 +0200
feat(kamelets): error handling kamelets binding support
Mark error-handler kamelet source with errorHandler source type
---
pkg/apis/camel/v1/common_types.go | 5 +-
pkg/controller/kameletbinding/common.go | 52 ++++++++--------
pkg/trait/kamelets.go | 106 ++++++++++++++++++++++++--------
pkg/util/source/inspector_yaml.go | 4 --
pkg/util/source/types.go | 2 -
5 files changed, 109 insertions(+), 60 deletions(-)
diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go
index a94d604..e5b327b 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -206,8 +206,9 @@ type SourceSpec struct {
type SourceType string
const (
- SourceTypeDefault SourceType = ""
- SourceTypeTemplate SourceType = "template"
+ SourceTypeDefault SourceType = ""
+ SourceTypeTemplate SourceType = "template"
+ SourceTypeErrorHandler SourceType = "errorHandler"
)
// DataSpec --
diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go
index 32d7182..fdeab95 100644
--- a/pkg/controller/kameletbinding/common.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"sort"
+ "strings"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -79,12 +80,17 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
if err != nil {
return nil, errors.Wrap(err, "could not determine sink URI")
}
- var onError *bindings.Binding
+ var errorHandler *bindings.Binding
if kameletbinding.Spec.ErrorHandler.Ref != nil || kameletbinding.Spec.ErrorHandler.URI != nil {
- onError, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler)
+ errorHandler, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler)
if err != nil {
return nil, errors.Wrap(err, "could not determine error handler URI")
}
+
+ err = setErrorHandlerKamelet(errorHandler, kameletbinding.Spec.ErrorHandler)
+ if err != nil {
+ return nil, errors.Wrap(err, "could not set error handler")
+ }
}
steps := make([]*bindings.Binding, 0, len(kameletbinding.Spec.Steps))
@@ -104,8 +110,8 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
allBindings = append(allBindings, from)
allBindings = append(allBindings, steps...)
allBindings = append(allBindings, to)
- if onError != nil {
- allBindings = append(allBindings, onError)
+ if errorHandler != nil {
+ allBindings = append(allBindings, errorHandler)
}
propList := make([]string, 0)
@@ -139,27 +145,6 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
"to": to.URI,
})
- // Append Error Handler flow, if it exists
- if onError != nil {
- flowErrorHandler := map[string]interface{}{
- "error-handler": map[string]interface{}{
- "dead-letter-channel": map[string]interface{}{
- "dead-letter-uri": onError.URI,
- "dead-letter-handle-new-exception": true,
- "async-delayed-redelivery": false,
- "use-original-message": true,
- },
- },
- }
- encodedErrorHandler, err := json.Marshal(flowErrorHandler)
-
- if err != nil {
- return nil, err
- }
- it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedErrorHandler})
- }
-
- // Append From flow: it must exist
flowFrom := map[string]interface{}{
"from": map[string]interface{}{
"uri": from.URI,
@@ -175,6 +160,23 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
return &it, nil
}
+// setErrorHandlerKamelet records the error handler declared on a KameletBinding
+// in the application properties of the given binding, under the
+// "camel.k.default-error-handler" key.
+//
+// When kameletSpec.URI is set it must use the "kamelet:" scheme and is stored
+// verbatim; otherwise the name of kameletSpec.Ref is stored.
+//
+// An error is returned when errorHandler is nil, when the URI is not a kamelet
+// URI, or when kameletSpec carries neither a URI nor a Ref.
+func setErrorHandlerKamelet(errorHandler *bindings.Binding, kameletSpec v1alpha1.Endpoint) error {
+	if errorHandler == nil {
+		return fmt.Errorf("no error handler binding available to configure")
+	}
+	if errorHandler.ApplicationProperties == nil {
+		// Writing to a nil map panics, so make sure it exists first.
+		errorHandler.ApplicationProperties = make(map[string]string)
+	}
+
+	switch {
+	case kameletSpec.URI != nil:
+		// Require the full "kamelet:" scheme (colon included): the kamelets
+		// trait only extracts a Kamelet name from values with that exact
+		// prefix, so a looser check here would accept unusable URIs.
+		if !strings.HasPrefix(*kameletSpec.URI, "kamelet:") {
+			return fmt.Errorf("Kamelet Binding only supports kamelet as error handler, provided: %s", *kameletSpec.URI)
+		}
+		errorHandler.ApplicationProperties["camel.k.default-error-handler"] = *kameletSpec.URI
+		return nil
+	case kameletSpec.Ref != nil:
+		errorHandler.ApplicationProperties["camel.k.default-error-handler"] = kameletSpec.Ref.Name
+		return nil
+	default:
+		// Neither URI nor Ref: report it instead of dereferencing a nil Ref.
+		return fmt.Errorf("error handler must declare either a uri or a ref")
+	}
+}
+
+
func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
return binding.Spec.Integration.Profile, nil
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 9d55b19..053678d 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -47,6 +47,8 @@ type kameletsTrait struct {
Auto *bool `property:"auto"`
// Comma separated list of Kamelet names to load into the current integration
List string `property:"list"`
+ // Kamelet name used as error handler
+ ErrorHandler string `property:"error-handler"`
}
type configurationKey struct {
@@ -103,16 +105,37 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) {
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs))
util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs))
- util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ErrorHandlerURIs))
return true
})
sort.Strings(kamelets)
t.List = strings.Join(kamelets, ",")
}
+ if t.ErrorHandler == "" {
+ t.ErrorHandler = maybeKameletAsDefaultErrorHandler(e.Integration.Configurations())
+ }
+ }
+
+ return t.declareKamelets(), nil
+}
+func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) string {
+ for _, property := range properties {
+ if strings.HasPrefix(property.Value, "camel.k.default-error-handler=") {
+ split := strings.Split(property.Value, "=")
+ if len(split) > 0 {
+ if strings.HasPrefix(split[1], "kamelet:") {
+ return extractKamelet(split[1])
+ }
+ return split[1]
+ }
+ }
}
- return len(t.getKameletKeys()) > 0, nil
+ return ""
+}
+
+// declareKamelets reports whether the trait has anything to load: at least one
+// Kamelet key, or a configured error handler.
+func (t *kameletsTrait) declareKamelets() bool {
+	if len(t.getKameletKeys()) > 0 {
+		return true
+	}
+	return t.ErrorHandler != ""
+}
func (t *kameletsTrait) Apply(e *Environment) error {
@@ -133,44 +156,62 @@ func (t *kameletsTrait) Apply(e *Environment) error {
}
func (t *kameletsTrait) addKamelets(e *Environment) error {
- kameletKeys := t.getKameletKeys()
- if len(kameletKeys) > 0 {
+ if t.declareKamelets() {
repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
if err != nil {
return err
}
+
+ // Declared kamelets
for _, k := range t.getKameletKeys() {
- kamelet, err := repo.Get(e.C, k)
+ err := initializeKamelet(repo, e, k, t, v1.SourceTypeTemplate)
if err != nil {
return err
}
- if kamelet == nil {
- return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String())
- }
-
- // Initialize remote kamelets
- kamelet, err = kameletutils.Initialize(kamelet)
+ }
+ if t.ErrorHandler != "" {
+ // Possible error handler
+ err = initializeKamelet(repo, e, t.ErrorHandler, t, v1.SourceTypeErrorHandler)
if err != nil {
return err
}
-
- if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
- return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
- }
-
- if err := t.addKameletAsSource(e, kamelet); err != nil {
- return err
- }
-
- // Adding dependencies from Kamelets
- util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, kamelet.Spec.Dependencies)
}
+
// resort dependencies
sort.Strings(e.Integration.Status.Dependencies)
}
return nil
}
+// initializeKamelet fetches the Kamelet named k from repo, initializes it and
+// adds it to the integration as a source of the given sourceType, then merges
+// the Kamelet's dependencies into the integration status.
+//
+// It fails when the lookup or initialization fails, when the Kamelet is not
+// found in the repository, when it is not in the ready phase, or when it
+// cannot be added as a source.
+func initializeKamelet(repo repository.KameletRepository, e *Environment, k string, t *kameletsTrait, sourceType v1.SourceType) error {
+	found, err := repo.Get(e.C, k)
+	switch {
+	case err != nil:
+		return err
+	case found == nil:
+		return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String())
+	}
+
+	// Initialize remote kamelets
+	ready, err := kameletutils.Initialize(found)
+	if err != nil {
+		return err
+	}
+
+	if phase := ready.Status.Phase; phase != v1alpha1.KameletPhaseReady {
+		return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, phase)
+	}
+
+	if err = t.addKameletAsSource(e, ready, sourceType); err != nil {
+		return err
+	}
+
+	// Carry over the dependencies declared by the Kamelet
+	util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, ready.Spec.Dependencies)
+
+	return nil
+}
+
+
func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
if len(t.getKameletKeys()) > 0 {
repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
@@ -215,7 +256,7 @@ func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
return nil
}
-func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kamelet) error {
+func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kamelet, sourceType v1.SourceType) error {
sources := make([]v1.SourceSpec, 0)
if kamelet.Spec.Flow != nil {
@@ -236,7 +277,7 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam
Content: string(flowData),
},
Language: v1.LanguageYaml,
- Type: v1.SourceTypeTemplate,
+ Type: sourceType,
PropertyNames: propertyNames,
}
flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
@@ -247,6 +288,9 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam
}
for idx, s := range kamelet.Spec.Sources {
+ if sourceType == v1.SourceTypeErrorHandler {
+ s.Type = sourceType
+ }
intSource, err := integrationSourceFromKameletSource(e, kamelet, s, fmt.Sprintf("%s-kamelet-%s-%03d", e.Integration.Name, kamelet.Name, idx))
if err != nil {
return err
@@ -405,10 +449,18 @@ func integrationSourceFromKameletSource(e *Environment, kamelet *v1alpha1.Kamele
func extractKamelets(uris []string) (kamelets []string) {
for _, uri := range uris {
- matches := kameletNameRegexp.FindStringSubmatch(uri)
- if len(matches) > 1 {
- kamelets = append(kamelets, matches[1])
+ kamelet := extractKamelet(uri)
+ if kamelet != "" {
+ kamelets = append(kamelets, kamelet)
}
}
return
}
+
+// extractKamelet returns the first capture group of kameletNameRegexp matched
+// against uri, or an empty string when the expression does not match.
+func extractKamelet(uri string) (kamelet string) {
+	if groups := kameletNameRegexp.FindStringSubmatch(uri); len(groups) > 1 {
+		kamelet = groups[1]
+	}
+	return kamelet
+}
diff --git a/pkg/util/source/inspector_yaml.go b/pkg/util/source/inspector_yaml.go
index 4c47dc2..1f96ae8 100644
--- a/pkg/util/source/inspector_yaml.go
+++ b/pkg/util/source/inspector_yaml.go
@@ -78,10 +78,6 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata
}
}
}
- case "error-handler":
- deadLetterChannel := content.(map[interface{}]interface{})
- deadLetterURI := deadLetterChannel["dead-letter-channel"].(map[interface{}]interface{})
- meta.ErrorHandlerURIs = append(meta.ErrorHandlerURIs, deadLetterURI["dead-letter-uri"].(string))
}
var maybeURI string
diff --git a/pkg/util/source/types.go b/pkg/util/source/types.go
index 46074f8..f6f15e0 100644
--- a/pkg/util/source/types.go
+++ b/pkg/util/source/types.go
@@ -25,8 +25,6 @@ type Metadata struct {
FromURIs []string
// All end URIs of defined routes
ToURIs []string
- // All error handlers URIs of defined routes
- ErrorHandlerURIs []string
// All inferred dependencies required to run the integration
Dependencies *strset.Set
// ExposesHTTPServices indicates if a route defined by the source is exposed