You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2021/03/05 11:46:51 UTC
[camel-k] 04/05: fix: Do not sink events when the operator lacks Event permissions
This is an automated email from the ASF dual-hosted git repository.
astefanutti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 3a3b0bcf227130c57d17704edc65e56bd908a10b
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Thu Mar 4 18:47:31 2021 +0100
fix: Do not sink events when the operator lacks Event permissions
---
go.mod | 1 +
pkg/cmd/operator/operator.go | 23 +++++++++-------
pkg/event/broadcaster.go | 62 ++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 76 insertions(+), 10 deletions(-)
diff --git a/go.mod b/go.mod
index a8ded7a..69f7bea 100644
--- a/go.mod
+++ b/go.mod
@@ -48,6 +48,7 @@ require (
k8s.io/apimachinery v0.19.8
k8s.io/client-go v12.0.0+incompatible
k8s.io/gengo v0.0.0-20200428234225-8167cfdcfc14
+ k8s.io/klog/v2 v2.2.0
knative.dev/eventing v0.18.0
knative.dev/pkg v0.0.0-20200922164940-4bf40ad82aab
knative.dev/serving v0.18.0
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 7b07ef6..1938d28 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -43,6 +43,7 @@ import (
"github.com/apache/camel-k/pkg/apis"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/controller"
+ "github.com/apache/camel-k/pkg/event"
"github.com/apache/camel-k/pkg/install"
"github.com/apache/camel-k/pkg/platform"
"github.com/apache/camel-k/pkg/util/defaults"
@@ -115,20 +116,23 @@ func Run(healthPort, monitoringPort int32) {
// so that we can check the operator has been granted permission to create
// Events. This is required for the operator to be installable by standard
// admin users, that are not granted create permission on Events by default.
- eventBroadcaster := record.NewBroadcaster()
+ broadcaster := record.NewBroadcaster()
// nolint: gocritic
- if ok, err := kubernetes.CheckPermission(context.TODO(), c, corev1.GroupName, "events", namespace, "", "create"); err != nil {
- log.Error(err, "cannot check permissions for configuring event broadcaster")
- } else if !ok {
- log.Info("Event broadcasting to Kubernetes is disabled because of missing permissions to create events")
+ if ok, err := kubernetes.CheckPermission(context.TODO(), c, corev1.GroupName, "events", namespace, "", "create"); err != nil || !ok {
+ // Do not sink Events to the server as they'll be rejected
+ broadcaster = event.NewSinkLessBroadcaster(broadcaster)
+ if err != nil {
+ log.Error(err, "cannot check permissions for configuring event broadcaster")
+ } else if !ok {
+ log.Info("Event broadcasting to Kubernetes is disabled because of missing permissions to create events")
+ }
} else {
- eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)})
+ broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)})
}
- // Create a new Cmd to provide shared dependencies and start components
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Namespace: namespace,
- EventBroadcaster: eventBroadcaster,
+ EventBroadcaster: broadcaster,
HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)),
MetricsBindAddress: ":" + strconv.Itoa(int(monitoringPort)),
})
@@ -165,13 +169,12 @@ func Run(healthPort, monitoringPort int32) {
log.Info("Starting the Cmd.")
- // Start the Cmd
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
log.Error(err, "manager exited non-zero")
os.Exit(1)
}
- eventBroadcaster.Shutdown()
+ broadcaster.Shutdown()
}
// getWatchNamespace returns the Namespace the operator should be watching for changes
diff --git a/pkg/event/broadcaster.go b/pkg/event/broadcaster.go
new file mode 100644
index 0000000..fd8f5be
--- /dev/null
+++ b/pkg/event/broadcaster.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package event
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/klog/v2"
+)
+
+type sinkLessBroadcaster struct {
+ broadcaster record.EventBroadcaster
+}
+
+func (s sinkLessBroadcaster) StartEventWatcher(eventHandler func(*corev1.Event)) watch.Interface {
+ return s.broadcaster.StartEventWatcher(eventHandler)
+}
+
+func (s sinkLessBroadcaster) StartRecordingToSink(sink record.EventSink) watch.Interface {
+ return watch.NewEmptyWatch()
+}
+
+func (s sinkLessBroadcaster) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
+ return s.broadcaster.StartLogging(logf)
+}
+
+func (s sinkLessBroadcaster) StartStructuredLogging(verbosity klog.Level) watch.Interface {
+ return s.broadcaster.StartStructuredLogging(verbosity)
+}
+
+func (s sinkLessBroadcaster) NewRecorder(scheme *runtime.Scheme, source corev1.EventSource) record.EventRecorder {
+ return s.broadcaster.NewRecorder(scheme, source)
+}
+
+func (s sinkLessBroadcaster) Shutdown() {
+ s.broadcaster.Shutdown()
+}
+
+var _ record.EventBroadcaster = &sinkLessBroadcaster{}
+
+func NewSinkLessBroadcaster(broadcaster record.EventBroadcaster) record.EventBroadcaster {
+ return &sinkLessBroadcaster{
+ broadcaster: broadcaster,
+ }
+}