You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/25 05:23:06 UTC
[camel-k] branch main updated: Add preliminary support for the resume API
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push:
new 5e7a96387 Add preliminary support for the resume API
5e7a96387 is described below
commit 5e7a963876c24d788903719bd6b4b568247f9fd1
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue May 17 11:29:01 2022 +0200
Add preliminary support for the resume API
---
addons/register_resume.go | 27 ++++++++
addons/resume/resume.go | 126 ++++++++++++++++++++++++++++++++++++++
pkg/apis/camel/v1/common_types.go | 2 +
pkg/util/defaults/defaults.go | 2 +-
4 files changed, 156 insertions(+), 1 deletion(-)
diff --git a/addons/register_resume.go b/addons/register_resume.go
new file mode 100644
index 000000000..5b5f60947
--- /dev/null
+++ b/addons/register_resume.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package addons
+
+import (
+ "github.com/apache/camel-k/addons/resume"
+ "github.com/apache/camel-k/pkg/trait"
+)
+
+func init() {
+ trait.AddToTraits(resume.NewResumeTrait)
+}
diff --git a/addons/resume/resume.go b/addons/resume/resume.go
new file mode 100644
index 000000000..de590aa65
--- /dev/null
+++ b/addons/resume/resume.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package resume
+
+import (
+ v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+ traitv1 "github.com/apache/camel-k/pkg/apis/camel/v1/trait"
+ "github.com/apache/camel-k/pkg/metadata"
+ "github.com/apache/camel-k/pkg/trait"
+ "github.com/apache/camel-k/pkg/util"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
+ "github.com/apache/camel-k/pkg/util/log"
+ "k8s.io/utils/pointer"
+)
+
+// The Resume trait can be used to manage and configure resume strategies.
+//
+// This feature is meant to allow quick resume of processing by Camel K instances after they have been restarted. This
+// is an experimental implementation based on the support available on Camel Core:
+// https://camel.apache.org/components/next/eips/resume-strategies.html.
+//
+// The Resume trait is disabled by default.
+//
+// The main different from the implementation on Core is that it's not necessary to bind the strategies to the
+// registry. This step will be done automatically by Camel K, after resolving the options passed to the trait.
+//
+// A sample execution of this trait, using the Kafka backend (the only one supported at the moment), would require
+// the following trait options:
+// -t resume.enabled=true -t resume.resume-path=camel-file-sets -t resume.resume-server="address-of-your-kafka:9092"
+//
+// +camel-k:trait=resume.
+type Trait struct {
+ traitv1.Trait `property:",squash"`
+ // Enables automatic configuration of the trait.
+ Auto *bool `property:"auto" json:"auto,omitempty"`
+ // The type of the resume strategy to use
+ ResumeStrategy string `property:"resume-strategy,omitempty"`
+ // The path used by the resume strategy (this is specific to the resume strategy type)
+ ResumePath string `property:"resume-path,omitempty"`
+ // The address of the resume server to use (protocol / implementation specific)
+ ResumeServer string `property:"resume-server,omitempty"`
+ // The adapter-specific policy to use when filling the cache (use: minimizing / maximizing). Check
+ // the component documentation if unsure
+ CacheFillPolicy string `property:"cache-fill-policy,omitempty"`
+}
+
+type resumeTrait struct {
+ trait.BaseTrait
+ Trait `property:",squash"`
+}
+
+const (
+ KafkaSingle = "org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy"
+ StrategyPath = "camel-k-offsets"
+)
+
+func NewResumeTrait() trait.Trait {
+ return &resumeTrait{
+ BaseTrait: trait.NewBaseTrait("resume", trait.TraitOrderBeforeControllerCreation),
+ }
+}
+
+func (r *resumeTrait) Configure(environment *trait.Environment) (bool, error) {
+ if !pointer.BoolDeref(r.Enabled, false) {
+ return false, nil
+ }
+
+ if !environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !environment.IntegrationInRunningPhases() {
+ return false, nil
+ }
+
+ if pointer.BoolDeref(r.Auto, true) {
+ // Check which components have been used
+ sources, err := kubernetes.ResolveIntegrationSources(environment.Ctx, r.Client, environment.Integration, environment.Resources)
+ if err != nil {
+ return false, err
+ }
+
+ meta := metadata.ExtractAll(environment.CamelCatalog, sources)
+
+ for _, endpoint := range meta.FromURIs {
+ log.Infof("Processing component %s", endpoint)
+ }
+
+ if r.ResumeStrategy == "" {
+ r.ResumeStrategy = KafkaSingle
+ }
+
+ if r.ResumePath == "" {
+ r.ResumePath = StrategyPath
+ }
+ }
+
+ return r.Enabled != nil && *r.Enabled, nil
+}
+
+func (r *resumeTrait) Apply(environment *trait.Environment) error {
+ if environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+ util.StringSliceUniqueAdd(&environment.Integration.Status.Capabilities, v1.CapabilityResumeKafka)
+ }
+
+ if environment.IntegrationInRunningPhases() {
+ environment.ApplicationProperties["customizer.resume.enabled"] = "true"
+ environment.ApplicationProperties["customizer.resume.resumeStrategy"] = r.ResumeStrategy
+ environment.ApplicationProperties["customizer.resume.resumePath"] = r.ResumePath
+ environment.ApplicationProperties["customizer.resume.resumeServer"] = r.ResumeServer
+ environment.ApplicationProperties["customizer.resume.cacheFillPolicy"] = r.CacheFillPolicy
+ }
+
+ return nil
+}
diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go
index 660ab9a60..4a585256c 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -304,6 +304,8 @@ const (
CapabilityTracing = "tracing"
// CapabilityMaster defines the master capability
CapabilityMaster = "master"
+ // CapabilityResumeKafka defines the resume capability
+ CapabilityResumeKafka = "resume-kafka"
)
// +kubebuilder:object:generate=false
diff --git a/pkg/util/defaults/defaults.go b/pkg/util/defaults/defaults.go
index 06b6a20b2..0775adc62 100644
--- a/pkg/util/defaults/defaults.go
+++ b/pkg/util/defaults/defaults.go
@@ -47,5 +47,5 @@ const (
installDefaultKamelets = true
)
-// GitCommit must be provided during application build
+//GitCommit must be provided during application build
var GitCommit string