You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/25 05:23:06 UTC

[camel-k] branch main updated: Add preliminary support for the resume API

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e7a96387 Add preliminary support for the resume API
5e7a96387 is described below

commit 5e7a963876c24d788903719bd6b4b568247f9fd1
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue May 17 11:29:01 2022 +0200

    Add preliminary support for the resume API
---
 addons/register_resume.go         |  27 ++++++++
 addons/resume/resume.go           | 126 ++++++++++++++++++++++++++++++++++++++
 pkg/apis/camel/v1/common_types.go |   2 +
 pkg/util/defaults/defaults.go     |   2 +-
 4 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/addons/register_resume.go b/addons/register_resume.go
new file mode 100644
index 000000000..5b5f60947
--- /dev/null
+++ b/addons/register_resume.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package addons
+
+import (
+	"github.com/apache/camel-k/addons/resume"
+	"github.com/apache/camel-k/pkg/trait"
+)
+
+func init() {
+	trait.AddToTraits(resume.NewResumeTrait)
+}
diff --git a/addons/resume/resume.go b/addons/resume/resume.go
new file mode 100644
index 000000000..de590aa65
--- /dev/null
+++ b/addons/resume/resume.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package resume
+
+import (
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	traitv1 "github.com/apache/camel-k/pkg/apis/camel/v1/trait"
+	"github.com/apache/camel-k/pkg/metadata"
+	"github.com/apache/camel-k/pkg/trait"
+	"github.com/apache/camel-k/pkg/util"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+	"github.com/apache/camel-k/pkg/util/log"
+	"k8s.io/utils/pointer"
+)
+
+// The Resume trait can be used to manage and configure resume strategies.
+//
+// This feature is meant to allow quick resume of processing by Camel K instances after they have been restarted. This
+// is an experimental implementation based on the support available on Camel Core:
+// https://camel.apache.org/components/next/eips/resume-strategies.html.
+//
+// The Resume trait is disabled by default.
+//
+// The main different from the implementation on Core is that it's not necessary to bind the strategies to the
+// registry. This step will be done automatically by Camel K, after resolving the options passed to the trait.
+//
+// A sample execution of this trait, using the Kafka backend (the only one supported at the moment), would require
+// the following trait options:
+// -t resume.enabled=true -t resume.resume-path=camel-file-sets -t resume.resume-server="address-of-your-kafka:9092"
+//
+// +camel-k:trait=resume.
+type Trait struct {
+	traitv1.Trait `property:",squash"`
+	// Enables automatic configuration of the trait.
+	Auto *bool `property:"auto" json:"auto,omitempty"`
+	// The type of the resume strategy to use
+	ResumeStrategy string `property:"resume-strategy,omitempty"`
+	// The path used by the resume strategy (this is specific to the resume strategy type)
+	ResumePath string `property:"resume-path,omitempty"`
+	// The address of the resume server to use (protocol / implementation specific)
+	ResumeServer string `property:"resume-server,omitempty"`
+	// The adapter-specific policy to use when filling the cache (use: minimizing / maximizing). Check
+	// the component documentation if unsure
+	CacheFillPolicy string `property:"cache-fill-policy,omitempty"`
+}
+
+type resumeTrait struct {
+	trait.BaseTrait
+	Trait `property:",squash"`
+}
+
+const (
+	KafkaSingle  = "org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy"
+	StrategyPath = "camel-k-offsets"
+)
+
+func NewResumeTrait() trait.Trait {
+	return &resumeTrait{
+		BaseTrait: trait.NewBaseTrait("resume", trait.TraitOrderBeforeControllerCreation),
+	}
+}
+
+func (r *resumeTrait) Configure(environment *trait.Environment) (bool, error) {
+	if !pointer.BoolDeref(r.Enabled, false) {
+		return false, nil
+	}
+
+	if !environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !environment.IntegrationInRunningPhases() {
+		return false, nil
+	}
+
+	if pointer.BoolDeref(r.Auto, true) {
+		// Check which components have been used
+		sources, err := kubernetes.ResolveIntegrationSources(environment.Ctx, r.Client, environment.Integration, environment.Resources)
+		if err != nil {
+			return false, err
+		}
+
+		meta := metadata.ExtractAll(environment.CamelCatalog, sources)
+
+		for _, endpoint := range meta.FromURIs {
+			log.Infof("Processing component %s", endpoint)
+		}
+
+		if r.ResumeStrategy == "" {
+			r.ResumeStrategy = KafkaSingle
+		}
+
+		if r.ResumePath == "" {
+			r.ResumePath = StrategyPath
+		}
+	}
+
+	return r.Enabled != nil && *r.Enabled, nil
+}
+
+func (r *resumeTrait) Apply(environment *trait.Environment) error {
+	if environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+		util.StringSliceUniqueAdd(&environment.Integration.Status.Capabilities, v1.CapabilityResumeKafka)
+	}
+
+	if environment.IntegrationInRunningPhases() {
+		environment.ApplicationProperties["customizer.resume.enabled"] = "true"
+		environment.ApplicationProperties["customizer.resume.resumeStrategy"] = r.ResumeStrategy
+		environment.ApplicationProperties["customizer.resume.resumePath"] = r.ResumePath
+		environment.ApplicationProperties["customizer.resume.resumeServer"] = r.ResumeServer
+		environment.ApplicationProperties["customizer.resume.cacheFillPolicy"] = r.CacheFillPolicy
+	}
+
+	return nil
+}
diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go
index 660ab9a60..4a585256c 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -304,6 +304,8 @@ const (
 	CapabilityTracing = "tracing"
 	// CapabilityMaster defines the master capability
 	CapabilityMaster = "master"
+	// CapabilityResumeKafka defines the resume capability
+	CapabilityResumeKafka = "resume-kafka"
 )
 
 // +kubebuilder:object:generate=false
diff --git a/pkg/util/defaults/defaults.go b/pkg/util/defaults/defaults.go
index 06b6a20b2..0775adc62 100644
--- a/pkg/util/defaults/defaults.go
+++ b/pkg/util/defaults/defaults.go
@@ -47,5 +47,5 @@ const (
 	installDefaultKamelets = true
 )
 
-// GitCommit must be provided during application build
+//GitCommit must be provided during application build
 var GitCommit string