You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2021/04/29 13:12:18 UTC

[camel-k] 04/30: feat(kamelets): error handling kamelets binding support

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 1849ca5223a865eb1cf3b7acab7a78c3540f8ea0
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Tue Apr 13 14:20:45 2021 +0200

    feat(kamelets): error handling kamelets binding support
    
    Mark error-handler kamelet source with errorHandler source type
---
 pkg/apis/camel/v1/common_types.go       |   5 +-
 pkg/controller/kameletbinding/common.go |  52 ++++++++--------
 pkg/trait/kamelets.go                   | 106 ++++++++++++++++++++++++--------
 pkg/util/source/inspector_yaml.go       |   4 --
 pkg/util/source/types.go                |   2 -
 5 files changed, 109 insertions(+), 60 deletions(-)

diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go
index a94d604..e5b327b 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -206,8 +206,9 @@ type SourceSpec struct {
 type SourceType string
 
 const (
-	SourceTypeDefault  SourceType = ""
-	SourceTypeTemplate SourceType = "template"
+	SourceTypeDefault      SourceType = ""
+	SourceTypeTemplate     SourceType = "template"
+	SourceTypeErrorHandler SourceType = "errorHandler"
 )
 
 // DataSpec --
diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go
index 32d7182..fdeab95 100644
--- a/pkg/controller/kameletbinding/common.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"sort"
+	"strings"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -79,12 +80,17 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
 	if err != nil {
 		return nil, errors.Wrap(err, "could not determine sink URI")
 	}
-	var onError *bindings.Binding
+	var errorHandler *bindings.Binding
 	if kameletbinding.Spec.ErrorHandler.Ref != nil || kameletbinding.Spec.ErrorHandler.URI != nil {
-		onError, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler)
+		errorHandler, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler)
 		if err != nil {
 			return nil, errors.Wrap(err, "could not determine error handler URI")
 		}
+
+		err = setErrorHandlerKamelet(errorHandler, kameletbinding.Spec.ErrorHandler)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not set error handler")
+		}
 	}
 
 	steps := make([]*bindings.Binding, 0, len(kameletbinding.Spec.Steps))
@@ -104,8 +110,8 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
 	allBindings = append(allBindings, from)
 	allBindings = append(allBindings, steps...)
 	allBindings = append(allBindings, to)
-	if onError != nil {
-		allBindings = append(allBindings, onError)
+	if errorHandler != nil {
+		allBindings = append(allBindings, errorHandler)
 	}
 
 	propList := make([]string, 0)
@@ -139,27 +145,6 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
 		"to": to.URI,
 	})
 
-	// Append Error Handler flow, if it exists
-	if onError != nil {
-		flowErrorHandler := map[string]interface{}{
-			"error-handler": map[string]interface{}{
-				"dead-letter-channel": map[string]interface{}{
-					"dead-letter-uri":                  onError.URI,
-					"dead-letter-handle-new-exception": true,
-					"async-delayed-redelivery":         false,
-					"use-original-message":             true,
-				},
-			},
-		}
-		encodedErrorHandler, err := json.Marshal(flowErrorHandler)
-
-		if err != nil {
-			return nil, err
-		}
-		it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedErrorHandler})
-	}
-
-	// Append From flow: it must exist
 	flowFrom := map[string]interface{}{
 		"from": map[string]interface{}{
 			"uri":   from.URI,
@@ -175,6 +160,23 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
 	return &it, nil
 }
 
+// setErrorHandlerKamelet records the error handler in the binding's application properties under
+// camel.k.default-error-handler: the URI (which must start with "kamelet") or else the Ref name.
+func setErrorHandlerKamelet(errorHandler *bindings.Binding, kameletSpec v1alpha1.Endpoint) error {
+	if errorHandler.ApplicationProperties == nil {
+		errorHandler.ApplicationProperties = make(map[string]string)
+	}
+	switch uri := kameletSpec.URI; {
+	case uri == nil:
+		errorHandler.ApplicationProperties["camel.k.default-error-handler"] = kameletSpec.Ref.Name
+	case !strings.HasPrefix(*uri, "kamelet"):
+		return fmt.Errorf("Kamelet Binding only supports kamelet as error handler, provided: %s", *uri)
+	default:
+		errorHandler.ApplicationProperties["camel.k.default-error-handler"] = *uri
+	}
+	return nil
+}
+
 func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
 	if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
 		return binding.Spec.Integration.Profile, nil
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 9d55b19..053678d 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -47,6 +47,8 @@ type kameletsTrait struct {
 	Auto *bool `property:"auto"`
 	// Comma separated list of Kamelet names to load into the current integration
 	List string `property:"list"`
+	// Kamelet name used as error handler
+	ErrorHandler string `property:"error-handler"`
 }
 
 type configurationKey struct {
@@ -103,16 +105,37 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) {
 			metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
 				util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs))
 				util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs))
-				util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ErrorHandlerURIs))
 				return true
 			})
 			sort.Strings(kamelets)
 			t.List = strings.Join(kamelets, ",")
 		}
+		if t.ErrorHandler == "" {
+			t.ErrorHandler = maybeKameletAsDefaultErrorHandler(e.Integration.Configurations())
+		}
+	}
+
+	return t.declareKamelets(), nil
+}
 
+// maybeKameletAsDefaultErrorHandler returns the value of the first camel.k.default-error-handler property (passed through extractKamelet when it starts with "kamelet:"), or "" when no such property exists.
+func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) string {
+	for _, property := range properties {
+		value := strings.TrimPrefix(property.Value, "camel.k.default-error-handler=")
+		switch {
+		case value == property.Value: // prefix absent: not the error handler property
+		case strings.HasPrefix(value, "kamelet:"):
+			return extractKamelet(value)
+		default: // the whole value is kept, even when it contains further '=' characters
+			return value
+		}
 	}
 
-	return len(t.getKameletKeys()) > 0, nil
+	return ""
+}
+
+func (t *kameletsTrait) declareKamelets() bool {
+	return !(len(t.getKameletKeys()) == 0 && t.ErrorHandler == "") // true when any kamelet key or an error handler is set
+}
 
 func (t *kameletsTrait) Apply(e *Environment) error {
@@ -133,44 +156,62 @@ func (t *kameletsTrait) Apply(e *Environment) error {
 }
 
 func (t *kameletsTrait) addKamelets(e *Environment) error {
-	kameletKeys := t.getKameletKeys()
-	if len(kameletKeys) > 0 {
+	if t.declareKamelets() {
 		repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
 		if err != nil {
 			return err
 		}
+
+		// Declared kamelets
 		for _, k := range t.getKameletKeys() {
-			kamelet, err := repo.Get(e.C, k)
+			err := initializeKamelet(repo, e, k, t, v1.SourceTypeTemplate)
 			if err != nil {
 				return err
 			}
-			if kamelet == nil {
-				return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String())
-			}
-
-			// Initialize remote kamelets
-			kamelet, err = kameletutils.Initialize(kamelet)
+		}
+		if t.ErrorHandler != "" {
+			// Possible error handler
+			err = initializeKamelet(repo, e, t.ErrorHandler, t, v1.SourceTypeErrorHandler)
 			if err != nil {
 				return err
 			}
-
-			if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
-				return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
-			}
-
-			if err := t.addKameletAsSource(e, kamelet); err != nil {
-				return err
-			}
-
-			// Adding dependencies from Kamelets
-			util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, kamelet.Spec.Dependencies)
 		}
+
 		// resort dependencies
 		sort.Strings(e.Integration.Status.Dependencies)
 	}
 	return nil
 }
 
+// initializeKamelet fetches kamelet k from repo, initializes it, requires it to be in the Ready phase,
+// then adds its sources (tagged with sourceType) and its dependencies to the integration held by e.
+func initializeKamelet(repo repository.KameletRepository, e *Environment, k string, t *kameletsTrait, sourceType v1.SourceType) error {
+	kml, err := repo.Get(e.C, k)
+	switch {
+	case err != nil:
+		return err
+	case kml == nil:
+		return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String())
+	}
+
+	// Remote kamelets need to be initialized before their status can be checked
+	kml, err = kameletutils.Initialize(kml)
+	if err != nil {
+		return err
+	}
+	if phase := kml.Status.Phase; phase != v1alpha1.KameletPhaseReady {
+		return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, phase)
+	}
+
+	if err = t.addKameletAsSource(e, kml, sourceType); err != nil {
+		return err
+	}
+
+	// Merge the kamelet dependencies into the integration ones, without duplicates
+	util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, kml.Spec.Dependencies)
+	return nil
+}
+
 func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
 	if len(t.getKameletKeys()) > 0 {
 		repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
@@ -215,7 +256,7 @@ func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
 	return nil
 }
 
-func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kamelet) error {
+func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kamelet, sourceType v1.SourceType) error {
 	sources := make([]v1.SourceSpec, 0)
 
 	if kamelet.Spec.Flow != nil {
@@ -236,7 +277,7 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam
 				Content: string(flowData),
 			},
 			Language:      v1.LanguageYaml,
-			Type:          v1.SourceTypeTemplate,
+			Type:          sourceType,
 			PropertyNames: propertyNames,
 		}
 		flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
@@ -247,6 +288,9 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam
 	}
 
 	for idx, s := range kamelet.Spec.Sources {
+		if sourceType == v1.SourceTypeErrorHandler {
+			s.Type = sourceType
+		}
 		intSource, err := integrationSourceFromKameletSource(e, kamelet, s, fmt.Sprintf("%s-kamelet-%s-%03d", e.Integration.Name, kamelet.Name, idx))
 		if err != nil {
 			return err
@@ -405,10 +449,18 @@ func integrationSourceFromKameletSource(e *Environment, kamelet *v1alpha1.Kamele
 
 func extractKamelets(uris []string) (kamelets []string) {
 	for _, uri := range uris {
-		matches := kameletNameRegexp.FindStringSubmatch(uri)
-		if len(matches) > 1 {
-			kamelets = append(kamelets, matches[1])
+		kamelet := extractKamelet(uri)
+		if kamelet != "" {
+			kamelets = append(kamelets, kamelet)
 		}
 	}
 	return
 }
+
+// extractKamelet returns the first capture group of kameletNameRegexp matched in uri, or "" when there is none.
+func extractKamelet(uri string) (kamelet string) {
+	if groups := kameletNameRegexp.FindStringSubmatch(uri); len(groups) > 1 {
+		kamelet = groups[1]
+	}
+	return kamelet
+}
diff --git a/pkg/util/source/inspector_yaml.go b/pkg/util/source/inspector_yaml.go
index 4c47dc2..1f96ae8 100644
--- a/pkg/util/source/inspector_yaml.go
+++ b/pkg/util/source/inspector_yaml.go
@@ -78,10 +78,6 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata
 				}
 			}
 		}
-	case "error-handler":
-		deadLetterChannel := content.(map[interface{}]interface{})
-		deadLetterURI := deadLetterChannel["dead-letter-channel"].(map[interface{}]interface{})
-		meta.ErrorHandlerURIs = append(meta.ErrorHandlerURIs, deadLetterURI["dead-letter-uri"].(string))
 	}
 
 	var maybeURI string
diff --git a/pkg/util/source/types.go b/pkg/util/source/types.go
index 46074f8..f6f15e0 100644
--- a/pkg/util/source/types.go
+++ b/pkg/util/source/types.go
@@ -25,8 +25,6 @@ type Metadata struct {
 	FromURIs []string
 	// All end URIs of defined routes
 	ToURIs []string
-	// All error handlers URIs of defined routes
-	ErrorHandlerURIs []string
 	// All inferred dependencies required to run the integration
 	Dependencies *strset.Set
 	// ExposesHTTPServices indicates if a route defined by the source is exposed