You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/09/17 09:10:04 UTC
[camel-k] 09/21: kamelets: add schema to sources and map them in application.properties
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 11a6500a8af8e63af823dc8356d214a91d8f3d99
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Jul 3 18:06:57 2020 +0200
kamelets: add schema to sources and map them in application.properties
---
pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go | 3 +-
pkg/trait/container.go | 6 +-
pkg/trait/kamelets.go | 33 ++++--
pkg/trait/kamelets_test.go | 52 +++++----
pkg/trait/trait_types.go | 142 ++++++++++++++++++-----
pkg/util/test/client.go | 12 +-
6 files changed, 181 insertions(+), 67 deletions(-)
diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
index 61e8aa0..1292dc3 100644
--- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
@@ -476,7 +476,8 @@ func (in *KameletSpec) DeepCopyInto(out *KameletSpec) {
}
if in.Flow != nil {
in, out := &in.Flow, &out.Flow
- *out = (*in).DeepCopy()
+ *out = new(v1.Flow)
+ **out = **in
}
out.Authorization = in.Authorization
if in.Types != nil {
diff --git a/pkg/trait/container.go b/pkg/trait/container.go
index be1ea20..9c19e5e 100644
--- a/pkg/trait/container.go
+++ b/pkg/trait/container.go
@@ -24,7 +24,6 @@ import (
"strings"
"github.com/apache/camel-k/pkg/util"
-
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
@@ -193,10 +192,13 @@ func (t *containerTrait) configureContainer(e *Environment) error {
}
envvar.SetVal(&container.Env, "CAMEL_K_DIGEST", e.Integration.Status.Digest)
- envvar.SetVal(&container.Env, "CAMEL_K_ROUTES", strings.Join(e.ComputeSourcesURI(), ","))
envvar.SetVal(&container.Env, "CAMEL_K_CONF", "/etc/camel/conf/application.properties")
envvar.SetVal(&container.Env, "CAMEL_K_CONF_D", "/etc/camel/conf.d")
+ // Configure sources
+ envvar.SetVal(&container.Env, "CAMEL_K_ROUTES", strings.Join(e.ComputeSourcesURI(), ","))
+ e.AddSourcesProperties()
+
t.configureResources(e, &container)
if t.Expose != nil && *t.Expose {
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 2a98a3a..80d4bc2 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -18,20 +18,21 @@ limitations under the License.
package trait
import (
+ "encoding/json"
"fmt"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/digest"
- "github.com/apache/camel-k/pkg/util/flows"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "regexp"
"sigs.k8s.io/controller-runtime/pkg/client"
- "sort"
- "strconv"
- "strings"
)
// The kamelets trait is a platform trait used to inject Kamelets into the integration runtime.
@@ -136,19 +137,21 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet v1alpha1.Kame
var sources []v1.SourceSpec
if kamelet.Spec.Flow != nil {
- flowData, err := flows.Marshal([]v1.Flow{*kamelet.Spec.Flow})
- if err != nil {
- return err
- }
+ // TODO fixme removed for changes to Flow
+ //flowData, err := flows.Marshal([]v1.Flow{*kamelet.Spec.Flow})
+ //if err != nil {
+ // return err
+ //}
flowSource := v1.SourceSpec{
DataSpec: v1.DataSpec{
- Name: fmt.Sprintf("%s.yaml", kamelet.Name),
- Content: string(flowData),
+ Name: fmt.Sprintf("%s.yaml", kamelet.Name),
+ //Content: string(flowData),
+ Content: string(*kamelet.Spec.Flow),
},
Language: v1.LanguageYaml,
Type: v1.SourceTypeKamelet,
}
- flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
+ flowSource, err := integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name))
if err != nil {
return err
}
@@ -272,6 +275,11 @@ func integrationSourceFromKameletSource(e *Environment, kamelet v1alpha1.Kamelet
// Create configmaps to avoid storing kamelet definitions in the integration CR
+ schema, err := json.Marshal(kamelet.Spec.Definition)
+ if err != nil {
+ return v1.SourceSpec{}, err
+ }
+
// Compute the input digest and store it along with the configmap
hash, err := digest.ComputeForSource(source)
if err != nil {
@@ -301,6 +309,7 @@ func integrationSourceFromKameletSource(e *Environment, kamelet v1alpha1.Kamelet
},
Data: map[string]string{
"content": source.Content,
+ "schema": string(schema),
},
}
diff --git a/pkg/trait/kamelets_test.go b/pkg/trait/kamelets_test.go
index 4916046..59657be 100644
--- a/pkg/trait/kamelets_test.go
+++ b/pkg/trait/kamelets_test.go
@@ -19,19 +19,18 @@ package trait
import (
"context"
- "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
- corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
+ "gopkg.in/yaml.v2"
"testing"
- "github.com/apache/camel-k/pkg/util/camel"
-
- "github.com/stretchr/testify/assert"
-
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/pkg/util/camel"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/test"
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
)
func TestConfigurationNoKameletsUsed(t *testing.T) {
@@ -89,11 +88,11 @@ func TestKameletLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
- },
+ }),
Dependencies: []string{
"camel:timer",
"camel:log",
@@ -132,11 +131,11 @@ func TestKameletSecondarySourcesLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
- },
+ }),
Sources: []v1.SourceSpec{
{
DataSpec: v1.DataSpec{
@@ -237,11 +236,11 @@ func TestErrorMultipleKameletSources(t *testing.T) {
Type: v1.SourceTypeKamelet,
},
},
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
- },
+ }),
},
})
enabled, err := trait.Configure(environment)
@@ -266,11 +265,11 @@ func TestMultipleKamelets(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
- },
+ }),
Sources: []v1.SourceSpec{
{
DataSpec: v1.DataSpec{
@@ -291,7 +290,7 @@ func TestMultipleKamelets(t *testing.T) {
Name: "logger",
},
Spec: v1alpha1.KameletSpec{
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "tbd:endpoint",
"steps": []interface{}{
@@ -302,7 +301,7 @@ func TestMultipleKamelets(t *testing.T) {
},
},
},
- },
+ }),
Dependencies: []string{
"camel:log",
"camel:tbd",
@@ -359,11 +358,11 @@ func TestKameletConfigLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
- },
+ }),
Dependencies: []string{
"camel:timer",
"camel:log",
@@ -421,11 +420,11 @@ func TestKameletNamedConfigLookup(t *testing.T) {
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
- Flow: &v1.Flow{
+ Flow: deleteMeAtSomePoint(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
- },
+ }),
Dependencies: []string{
"camel:timer",
"camel:log",
@@ -511,3 +510,12 @@ func createKameletsTestEnvironment(flow string, objects ...runtime.Object) (*kam
return trait, environment
}
+
+func deleteMeAtSomePoint(flow map[string]interface{}) *v1.Flow {
+ data, err := yaml.Marshal(flow)
+ if err != nil {
+ panic(err)
+ }
+ f := v1.Flow(data)
+ return &f
+}
diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go
index e815469..f1666f7 100644
--- a/pkg/trait/trait_types.go
+++ b/pkg/trait/trait_types.go
@@ -484,6 +484,10 @@ func (e *Environment) ComputeSourcesURI() []string {
paths := make([]string, 0, len(sources))
for i, s := range sources {
+ if s.Type == v1.SourceTypeKamelet {
+ // Kamelet information is added to application.properties
+ continue
+ }
root := path.Join(SourcesMountPath, fmt.Sprintf("i-source-%03d", i))
srcName := strings.TrimPrefix(s.Name, "/")
@@ -522,6 +526,64 @@ func (e *Environment) ComputeSourcesURI() []string {
return paths
}
+// AddSourcesProperties --
+func (e *Environment) AddSourcesProperties() {
+ sources := e.Integration.Sources()
+ properties := make(map[string]string)
+
+ for _, s := range sources {
+ if s.Type == v1.SourceTypeKamelet {
+ // We compute properties only for Kamelets
+ // TODO move other sources types to this format instead of ENV_VAR
+
+ srcName := strings.TrimPrefix(s.Name, "/")
+ kameletName := srcName
+ if strings.Contains(kameletName, ".") {
+ kameletName = kameletName[0:strings.LastIndex(kameletName, ".")]
+ }
+ root := path.Join(KameletsMountPath, kameletName)
+
+ src := path.Join(root, srcName)
+ src = "file:" + src
+ properties[fmt.Sprintf("camel.k.kamelets[%s].location", kameletName)] = src
+
+ schemaName := path.Join(root, fmt.Sprintf("%s-schema.json", kameletName))
+ properties[fmt.Sprintf("camel.k.kamelets[%s].schema", kameletName)] = schemaName
+
+ if s.InferLanguage() != "" {
+ properties[fmt.Sprintf("camel.k.kamelets[%s].language", kameletName)] = string(s.InferLanguage())
+ }
+ if s.Loader != "" {
+ properties[fmt.Sprintf("camel.k.kamelets[%s].loader", kameletName)] = s.Loader
+ }
+ if s.Compression {
+ properties[fmt.Sprintf("camel.k.kamelets[%s].compression", kameletName)] = "true"
+ }
+
+ interceptors := make([]string, 0, len(s.Interceptors))
+ if s.Interceptors != nil {
+ interceptors = append(interceptors, s.Interceptors...)
+ }
+ if e.Interceptors != nil {
+ interceptors = append(interceptors, e.Interceptors...)
+ }
+ for i, interceptor := range interceptors {
+ properties[fmt.Sprintf("camel.k.kamelets[%s].interceptors[%d]", kameletName, i)] = interceptor
+ }
+ }
+ }
+
+ if len(properties) > 0 {
+ if e.ApplicationProperties == nil {
+ e.ApplicationProperties = properties
+ } else {
+ for k, v := range properties {
+ e.ApplicationProperties[k] = v
+ }
+ }
+ }
+}
+
// ConfigureVolumesAndMounts --
func (e *Environment) ConfigureVolumesAndMounts(vols *[]corev1.Volume, mnts *[]corev1.VolumeMount) {
//
@@ -533,40 +595,68 @@ func (e *Environment) ConfigureVolumesAndMounts(vols *[]corev1.Volume, mnts *[]c
if s.ContentRef != "" {
cmName = s.ContentRef
}
-
- refName := fmt.Sprintf("i-source-%03d", i)
- if s.Type == v1.SourceTypeKamelet {
- refName = fmt.Sprintf("i-kamelet-source-%03d", i)
- }
-
resName := strings.TrimPrefix(s.Name, "/")
- resPath := path.Join(SourcesMountPath, refName)
if s.Type == v1.SourceTypeKamelet {
- resPath = KameletsMountPath
- }
-
- *vols = append(*vols, corev1.Volume{
- Name: refName,
- VolumeSource: corev1.VolumeSource{
- ConfigMap: &corev1.ConfigMapVolumeSource{
- LocalObjectReference: corev1.LocalObjectReference{
- Name: cmName,
+ kameletName := resName
+ if strings.Contains(kameletName, ".") {
+ kameletName = kameletName[0:strings.LastIndex(kameletName, ".")]
+ }
+ refName := fmt.Sprintf("i-kamelet-source-%03d", i)
+ resPath := path.Join(KameletsMountPath, kameletName)
+ schemaResName := fmt.Sprintf("%s-schema.json", kameletName)
+
+ *vols = append(*vols, corev1.Volume{
+ Name: refName,
+ VolumeSource: corev1.VolumeSource{
+ ConfigMap: &corev1.ConfigMapVolumeSource{
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: cmName,
+ },
+ Items: []corev1.KeyToPath{
+ {
+ Key: "content",
+ Path: resName,
+ },
+ {
+ Key: "schema",
+ Path: schemaResName,
+ },
+ },
},
- Items: []corev1.KeyToPath{
- {
- Key: "content",
- Path: resName,
+ },
+ })
+
+ *mnts = append(*mnts, corev1.VolumeMount{
+ Name: refName,
+ MountPath: resPath,
+ })
+ } else {
+ refName := fmt.Sprintf("i-source-%03d", i)
+ resPath := path.Join(SourcesMountPath, refName)
+
+ *vols = append(*vols, corev1.Volume{
+ Name: refName,
+ VolumeSource: corev1.VolumeSource{
+ ConfigMap: &corev1.ConfigMapVolumeSource{
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: cmName,
+ },
+ Items: []corev1.KeyToPath{
+ {
+ Key: "content",
+ Path: resName,
+ },
},
},
},
- },
- })
+ })
- *mnts = append(*mnts, corev1.VolumeMount{
- Name: refName,
- MountPath: resPath,
- })
+ *mnts = append(*mnts, corev1.VolumeMount{
+ Name: refName,
+ MountPath: resPath,
+ })
+ }
}
for i, r := range e.Integration.Resources() {
diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go
index b223f95..22e7d0b 100644
--- a/pkg/util/test/client.go
+++ b/pkg/util/test/client.go
@@ -18,6 +18,8 @@ limitations under the License.
package test
import (
+ "strings"
+
"github.com/apache/camel-k/pkg/apis"
"github.com/apache/camel-k/pkg/client"
"k8s.io/apimachinery/pkg/runtime"
@@ -25,7 +27,6 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
clientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
-
controller "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
@@ -41,13 +42,16 @@ func NewFakeClient(initObjs ...runtime.Object) (client.Client, error) {
c := fake.NewFakeClientWithScheme(scheme, initObjs...)
filtered := make([]runtime.Object, 0, len(initObjs))
+ skipList := []string{"camel", "knative"}
for _, o := range initObjs {
kinds, _, _ := scheme.ObjectKinds(o)
allow := true
for _, k := range kinds {
- if k.Group == "camel.apache.org" {
- allow = false
- break
+ for _, skip := range skipList {
+ if strings.Contains(k.Group, skip) {
+ allow = false
+ break
+ }
}
}
if allow {