You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/01 02:34:03 UTC
[incubator-uniffle] branch master updated: [ISSUE-48][FEATURE][FOLLOW UP] Add webhook component (#188)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d931e8bf [ISSUE-48][FEATURE][FOLLOW UP] Add webhook component (#188)
d931e8bf is described below
commit d931e8bf937982dd6c2f9fed84d65f67659c312a
Author: jasonawang <ja...@tencent.com>
AuthorDate: Thu Sep 1 10:33:59 2022 +0800
[ISSUE-48][FEATURE][FOLLOW UP] Add webhook component (#188)
### What changes were proposed in this pull request?
for issue #48
I add webhook module this time, maybe some logic is missing unit test, I will add in the next PR.
### Why are the changes needed?
Support K8S
### Does this PR introduce _any_ user-facing change?
Yes, We will add the doc later..
### How was this patch tested?
UT
---
deploy/kubernetes/operator/PROJECT | 4 +-
deploy/kubernetes/operator/cmd/webhook/main.go | 44 ++++
deploy/kubernetes/operator/go.mod | 10 +-
deploy/kubernetes/operator/go.sum | 49 ++++
deploy/kubernetes/operator/hack/revive.toml | 1 -
deploy/kubernetes/operator/hack/update-codegen.sh | 2 +-
deploy/kubernetes/operator/pkg/.gitkeep | 16 --
.../kubernetes/operator/pkg/constants/constants.go | 60 +++++
deploy/kubernetes/operator/pkg/utils/certs.go | 230 +++++++++++++++++++
deploy/kubernetes/operator/pkg/utils/config.go | 75 +++++++
.../kubernetes/operator/pkg/utils/coordinator.go | 54 +++++
.../operator/pkg/utils/shufflerserver.go | 101 +++++++++
deploy/kubernetes/operator/pkg/utils/util.go | 102 +++++++++
.../operator/pkg/webhook/config/config.go | 123 ++++++++++
.../operator/pkg/webhook/constants/constants.go | 35 +++
.../operator/pkg/webhook/inspector/inspector.go | 118 ++++++++++
.../operator/pkg/webhook/inspector/pod.go | 146 ++++++++++++
.../operator/pkg/webhook/inspector/rss.go | 157 +++++++++++++
deploy/kubernetes/operator/pkg/webhook/manager.go | 222 ++++++++++++++++++
.../operator/pkg/webhook/manager_test.go | 142 ++++++++++++
.../operator/pkg/webhook/syncer/syncer.go | 248 +++++++++++++++++++++
.../kubernetes/operator/pkg/webhook/util/patch.go | 25 +++
.../kubernetes/operator/pkg/webhook/util/util.go | 213 ++++++++++++++++++
23 files changed, 2156 insertions(+), 21 deletions(-)
diff --git a/deploy/kubernetes/operator/PROJECT b/deploy/kubernetes/operator/PROJECT
index 3cb854f1..e62a3a53 100644
--- a/deploy/kubernetes/operator/PROJECT
+++ b/deploy/kubernetes/operator/PROJECT
@@ -2,7 +2,7 @@ domain: apache.org
layout:
- go.kubebuilder.io/v3
projectName: operator
-repo: github.com/apache/incubator-uniffle
+repo: github.com/apache/incubator-uniffle/deploy/kubernetes/operator
resources:
- api:
crdVersion: v1
@@ -10,6 +10,6 @@ resources:
domain: apache.org
group: uniffle
kind: Remoteshuffleservice
- path: github.com/apache/incubator-uniffle/api/v1alpha1
+ path: github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1
version: v1alpha1
version: "3"
diff --git a/deploy/kubernetes/operator/cmd/webhook/main.go b/deploy/kubernetes/operator/cmd/webhook/main.go
new file mode 100644
index 00000000..ef1735c3
--- /dev/null
+++ b/deploy/kubernetes/operator/cmd/webhook/main.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "flag"
+
+ "k8s.io/klog/v2"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+)
+
+func main() {
+ klog.InitFlags(nil)
+ cfg := &config.Config{}
+ cfg.AddFlags()
+ flag.Parse()
+
+ cfg.Complete()
+ klog.Infof("run config: %+v", cfg)
+
+ // create an admission webhook manager.
+ am := webhook.NewAdmissionManager(cfg)
+ // start the admission webhook manager.
+ if err := am.Start(cfg.RunCtx); err != nil {
+ klog.Fatalf("start admission webhook failed: %v", err)
+ }
+}
diff --git a/deploy/kubernetes/operator/go.mod b/deploy/kubernetes/operator/go.mod
index 2845d777..6121ae4c 100644
--- a/deploy/kubernetes/operator/go.mod
+++ b/deploy/kubernetes/operator/go.mod
@@ -1,11 +1,19 @@
-module github.com/apache/incubator-uniffle
+module github.com/apache/incubator-uniffle/deploy/kubernetes/operator
go 1.16
require (
+ github.com/onsi/ginkgo/v2 v2.1.4
+ github.com/onsi/gomega v1.19.0
+ github.com/parnurzeal/gorequest v0.2.16
+ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+ gomodules.xyz/jsonpatch/v2 v2.2.0
k8s.io/api v0.22.1
k8s.io/apimachinery v0.22.1
k8s.io/client-go v0.22.1
k8s.io/code-generator v0.22.1
+ k8s.io/klog/v2 v2.9.0
+ k8s.io/utils v0.0.0-20210802155522-efc7438f0176
+ moul.io/http2curl v1.0.0 // indirect
sigs.k8s.io/controller-runtime v0.10.0
)
diff --git a/deploy/kubernetes/operator/go.sum b/deploy/kubernetes/operator/go.sum
index c1dc279b..a3e5c1ac 100644
--- a/deploy/kubernetes/operator/go.sum
+++ b/deploy/kubernetes/operator/go.sum
@@ -54,6 +54,7 @@ github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
@@ -61,7 +62,9 @@ github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
@@ -124,6 +127,7 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0 h1:K7/B1jt6fIBQVd4Owv2MqGQClcgf0R266+7C/QjRcLc=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
+github.com/go-logr/zapr v0.4.0 h1:uc1uML3hRYL9/ZZPdgHS/n8Nzo+eaYL/Efxkkamf7OM=
github.com/go-logr/zapr v0.4.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
@@ -147,6 +151,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@@ -190,6 +195,7 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
@@ -229,7 +235,9 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
+github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
+github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@@ -268,6 +276,7 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
@@ -303,12 +312,20 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/ginkgo/v2 v2.1.4 h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY=
+github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
+github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
+github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
+github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
+github.com/parnurzeal/gorequest v0.2.16 h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ=
+github.com/parnurzeal/gorequest v0.2.16/go.mod h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
@@ -324,20 +341,24 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
+github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
+github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
+github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -384,6 +405,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
@@ -410,12 +432,15 @@ go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
+go.uber.org/zap v1.19.0 h1:mZQZefskPPCMIBCSEH0v2/iUqqLrYtaeqwD6FUGUnFE=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -426,6 +451,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -459,6 +485,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
+golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -489,10 +517,14 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 h1:ADo5wSpq2gqaCGQWzk7S5vd//0iyyLeAratkEoG5dLE=
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
+golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -507,6 +539,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -553,13 +586,20 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2 h1:c8PlLMqBbOHoqtjteWm5/kbe6rNY2pbRfbIMVnepueo=
golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs=
+golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -568,6 +608,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -616,11 +658,14 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20=
+golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@@ -728,6 +773,7 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.22.1 h1:ISu3tD/jRhYfSW8jI/Q1e+lRxkR7w9UwQEZ7FgslrwY=
k8s.io/api v0.22.1/go.mod h1:bh13rkTp3F1XEaLGykbyRD2QaTTzPm0e/BMd8ptFONY=
+k8s.io/apiextensions-apiserver v0.22.1 h1:YSJYzlFNFSfUle+yeEXX0lSQyLEoxoPJySRupepb0gE=
k8s.io/apiextensions-apiserver v0.22.1/go.mod h1:HeGmorjtRmRLE+Q8dJu6AYRoZccvCMsghwS8XTUYb2c=
k8s.io/apimachinery v0.22.1 h1:DTARnyzmdHMz7bFWFDDm22AM4pLWTQECMpRTFu2d2OM=
k8s.io/apimachinery v0.22.1/go.mod h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
@@ -736,6 +782,7 @@ k8s.io/client-go v0.22.1 h1:jW0ZSHi8wW260FvcXHkIa0NLxFBQszTlhiAVsU5mopw=
k8s.io/client-go v0.22.1/go.mod h1:BquC5A4UOo4qVDUtoc04/+Nxp1MeHcVc1HJm1KmG8kk=
k8s.io/code-generator v0.22.1 h1:zAcKpn+xe9Iyc4qtZlfg4tD0f+SO2h5+e/s4pZPOVhs=
k8s.io/code-generator v0.22.1/go.mod h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
+k8s.io/component-base v0.22.1 h1:SFqIXsEN3v3Kkr1bS6rstrs1wd45StJqbtgbQ4nRQdo=
k8s.io/component-base v0.22.1/go.mod h1:0D+Bl8rrnsPN9v0dyYvkqFfBeAd4u7n77ze+p8CMiPo=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027 h1:Uusb3oh8XcdzDF/ndlI4ToKTYVlkCSJP39SRY2mfRAw=
@@ -749,6 +796,8 @@ k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e/go.mod h1:vHXdDvt9+2spS2R
k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176 h1:Mx0aa+SUAcNRQbs5jUzV8lkDlGFU8laZsY9jrcVX5SY=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8=
+moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
diff --git a/deploy/kubernetes/operator/hack/revive.toml b/deploy/kubernetes/operator/hack/revive.toml
index 56b8ec01..2db9acdb 100644
--- a/deploy/kubernetes/operator/hack/revive.toml
+++ b/deploy/kubernetes/operator/hack/revive.toml
@@ -29,7 +29,6 @@ warningCode = 1
[rule.if-return]
[rule.increment-decrement]
[rule.var-declaration]
-[rule.package-comments]
[rule.range]
[rule.receiver-naming]
[rule.time-naming]
diff --git a/deploy/kubernetes/operator/hack/update-codegen.sh b/deploy/kubernetes/operator/hack/update-codegen.sh
index b97447c4..518fc197 100755
--- a/deploy/kubernetes/operator/hack/update-codegen.sh
+++ b/deploy/kubernetes/operator/hack/update-codegen.sh
@@ -25,7 +25,7 @@ SCRIPT_ROOT=$(dirname "${BASH_SOURCE}")/..
go mod vendor
-MODULE="github.com/apache/incubator-uniffle"
+MODULE="github.com/apache/incubator-uniffle/deploy/kubernetes/operator"
GENERATED_BASE="pkg"
OUTPUT_PACKAGE="${MODULE}/pkg/generated"
APIS_PACKAGE="${MODULE}/api"
diff --git a/deploy/kubernetes/operator/pkg/.gitkeep b/deploy/kubernetes/operator/pkg/.gitkeep
deleted file mode 100644
index ecb1860d..00000000
--- a/deploy/kubernetes/operator/pkg/.gitkeep
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
\ No newline at end of file
diff --git a/deploy/kubernetes/operator/pkg/constants/constants.go b/deploy/kubernetes/operator/pkg/constants/constants.go
new file mode 100644
index 00000000..70a9395e
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/constants/constants.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+const (
+ // LeaderIDSuffix is the suffix of leader id used for components' leader election
+ LeaderIDSuffix = "uniffle.apache.org"
+
+ // PodNamespaceEnv is the name of environment variable indicates pod's namespace.
+ PodNamespaceEnv = "POD_NAMESPACE"
+ // DefaultNamespace is used when environment variable of PodNamespaceEnv is not set.
+ DefaultNamespace = "kube-system"
+
+ // AnnotationMetricsServerPort represents annotation of metrics servers' port.
+ AnnotationMetricsServerPort = "uniffle.apache.org/metrics-server-port"
+ // AnnotationShuffleServerPort represents annotation of port used to identify shuffle servers.
+ AnnotationShuffleServerPort = "uniffle.apache.org/shuffle-server-port"
+ // AnnotationRssName represents annotation of rss object name used by shuffle servers' pods.
+ AnnotationRssName = "uniffle.apache.org/rss-name"
+ // AnnotationRssUID represents annotation of rss object uid used by shuffle servers' pods.
+ AnnotationRssUID = "uniffle.apache.org/rss-uid"
+
+ // LabelCoordinator represents label of coordinators.
+ LabelCoordinator = "uniffle.apache.org/coordinator"
+ // LabelShuffleServer represents label of shuffle servers.
+ LabelShuffleServer = "uniffle.apache.org/shuffle-server"
+
+ // RSSFinalizerName represents finalizer name of rss objects.
+ RSSFinalizerName = "WaitingShuffleServer"
+
+ // RSSCoordinator represents the prefix or identifier of the coordinator.
+ RSSCoordinator = "rss-coordinator"
+ // RSSShuffleServer represents the prefix or identifier of the shuffle server.
+ RSSShuffleServer = "rss-shuffle-server"
+
+ // DefaultInitContainerImage represents default image of init container used to change owner of host paths.
+ DefaultInitContainerImage = "busybox:latest"
+
+ // ShuffleServerConfigKey represents shuffle server configuration key in configMap used by a rss object.
+ ShuffleServerConfigKey = "server.conf"
+ // CoordinatorConfigKey represents coordinator configuration key in configMap used by a rss object.
+ CoordinatorConfigKey = "coordinator.conf"
+ // Log4jPropertiesKey represents log4j properties key in configMap used by a rss object.
+ Log4jPropertiesKey = "log4j.properties"
+)
diff --git a/deploy/kubernetes/operator/pkg/utils/certs.go b/deploy/kubernetes/operator/pkg/utils/certs.go
new file mode 100644
index 00000000..8bd16878
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/certs.go
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "crypto"
+ "crypto/rand"
+ "crypto/rsa"
+ "crypto/x509"
+ "crypto/x509/pkix"
+ "encoding/pem"
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "net"
+ "strings"
+ "time"
+
+ "k8s.io/client-go/util/cert"
+ "k8s.io/client-go/util/keyutil"
+ "k8s.io/klog/v2"
+)
+
+const (
+ certificateBlockType = "CERTIFICATE"
+ rsaKeySize = 2048
+ duration365d = time.Hour * 24 * 365 * 100
+ blockPrivateType = "RSA PRIVATE KEY"
+ blockCertType = "CERTIFICATE"
+)
+
+// newPrivateKey generates a private key.
+func newPrivateKey() (*rsa.PrivateKey, error) {
+ return rsa.GenerateKey(rand.Reader, rsaKeySize)
+}
+
+// SetUpCaKey sets up a new ca private key
+func SetUpCaKey() ([]byte, error) {
+ signingKey, err := newPrivateKey()
+ if err != nil {
+ return nil, fmt.Errorf("failed to create CA private key %v", err)
+ }
+ privateSigningKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(signingKey)
+ if err != nil {
+ klog.Errorf("marshall private key to pem error:%v", err)
+ return nil, err
+ }
+ return privateSigningKeyPEM, nil
+}
+
+// SetUpCaCert sets up a new ca certification.
+func SetUpCaCert(commonName string, pemCAKey []byte) ([]byte, error) {
+ caKey, err := decodePemToRSA(pemCAKey)
+ if err != nil {
+ klog.Errorf("decode pem to rsa private error %v", err)
+ return nil, err
+ }
+ signingCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: commonName}, caKey)
+ if err != nil {
+ klog.Errorf("self sign error %v", err)
+ return nil, fmt.Errorf("failed to create CA cert for apiserver %v", err)
+ }
+ return encodeCertPEM(signingCert), nil
+}
+
+// SetUpSignedCertAndKey uses the ca certificate and ca private key to generate certificates.
+func SetUpSignedCertAndKey(domains []string, ips []net.IP, commonName string,
+ pemCAKey, pemCACert []byte, usage []x509.ExtKeyUsage) (
+ []byte, []byte, error) {
+ // decode ca private key in PEM format.
+ caKey, err := decodePemToRSA(pemCAKey)
+ if err != nil {
+ klog.Errorf("decode pem to rsa private error %v", err)
+ return nil, nil, err
+ }
+ // decode ca certificate in PEM format.
+ caCert, err := decodePemToCert(pemCACert)
+ if err != nil {
+ klog.Errorf("decode pem to cert error %v", err)
+ return nil, nil, err
+ }
+ // create a new private key.
+ signedKey, err := newPrivateKey()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create private key %v", err)
+ }
+ // generate a new certificate by new private key, ca certificate and ca private key.
+ signedCert, err := newSignedCert(
+ &cert.Config{
+ CommonName: commonName,
+ AltNames: cert.AltNames{DNSNames: domains, IPs: ips},
+ Usages: usage,
+ },
+ signedKey, caCert, caKey,
+ )
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create cert %v", err)
+ }
+ privateSigningKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(signedKey)
+ if err != nil {
+ klog.Errorf("marshall private key to pem error:%v", err)
+ return nil, nil, err
+ }
+ // return a new certificate and private key in PEM format.
+ return encodeCertPEM(signedCert), privateSigningKeyPEM, nil
+}
+
+// SetupServerCert sets up the server certificate and private key.
+func SetupServerCert(domain, commonName string) ([]byte, []byte, []byte, error) {
+ signingKey, err := newPrivateKey()
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("failed to create CA private key %v", err)
+ }
+ signingCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: commonName}, signingKey)
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("failed to create CA cert for apiserver %v", err)
+ }
+ key, err := newPrivateKey()
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("failed to create private key for %v", err)
+ }
+
+ signedCert, err := newSignedCert(
+ &cert.Config{
+ CommonName: commonName,
+ AltNames: cert.AltNames{DNSNames: strings.Split(domain, ",")},
+ Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
+ },
+ key, signingCert, signingKey,
+ )
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("failed to create cert %v", err)
+ }
+ privateKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(key)
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("failed to marshal key %v", err)
+ }
+ return encodeCertPEM(signedCert), privateKeyPEM, encodeCertPEM(signingCert), nil
+}
+
+// newSignedCert generates s self-signed cert.
+func newSignedCert(cfg *cert.Config, key crypto.Signer, caCert *x509.Certificate,
+ caKey crypto.Signer) (*x509.Certificate, error) {
+ serial, err := rand.Int(rand.Reader, new(big.Int).SetInt64(math.MaxInt64))
+ if err != nil {
+ return nil, err
+ }
+ if len(cfg.CommonName) == 0 {
+ return nil, errors.New("must specify a CommonName")
+ }
+ if len(cfg.Usages) == 0 {
+ return nil, errors.New("must specify at least one ExtKeyUsage")
+ }
+
+ certTmpl := x509.Certificate{
+ Subject: pkix.Name{
+ CommonName: cfg.CommonName,
+ Organization: cfg.Organization,
+ },
+ DNSNames: cfg.AltNames.DNSNames,
+ IPAddresses: cfg.AltNames.IPs,
+ SerialNumber: serial,
+ NotBefore: caCert.NotBefore,
+ NotAfter: time.Now().Add(duration365d).UTC(),
+ KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
+ ExtKeyUsage: cfg.Usages,
+ }
+ certDERBytes, err := x509.CreateCertificate(rand.Reader, &certTmpl, caCert, key.Public(), caKey)
+ if err != nil {
+ return nil, err
+ }
+ return x509.ParseCertificate(certDERBytes)
+}
+
+// encodeCertPEM encodes a certificate.
+func encodeCertPEM(cert *x509.Certificate) []byte {
+ block := pem.Block{
+ Type: certificateBlockType,
+ Bytes: cert.Raw,
+ }
+ return pem.EncodeToMemory(&block)
+}
+
+// decodePemToRSA decodes pem key to RSA private key.
+func decodePemToRSA(pemKey []byte) (*rsa.PrivateKey, error) {
+ block, _ := pem.Decode(pemKey)
+ if block == nil || block.Type != blockPrivateType {
+ err := errors.New("block is nil or block type is wrong")
+ klog.Errorf("decode pem key error %v", err)
+ return nil, err
+ }
+ pri, err := x509.ParsePKCS1PrivateKey(block.Bytes)
+ if err != nil {
+ klog.Errorf("parse pem key error %v", err)
+ return nil, err
+ }
+ return pri, nil
+}
+
+// decodePemToCert decodes pem cert to x509 certificate.
+func decodePemToCert(pemCert []byte) (*x509.Certificate, error) {
+ block, _ := pem.Decode(pemCert)
+ if block == nil || block.Type != blockCertType {
+ err := errors.New("block is nil or block type is wrong")
+ klog.Errorf("decode pem cert error %v", err)
+ return nil, err
+ }
+ certificate, err := x509.ParseCertificate(block.Bytes)
+ if err != nil {
+ klog.Errorf("parse pem cert error %v", err)
+ return nil, err
+ }
+ return certificate, nil
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/config.go b/deploy/kubernetes/operator/pkg/utils/config.go
new file mode 100644
index 00000000..9fbf2832
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/config.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "context"
+ "flag"
+
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/manager/signals"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+)
+
+const (
+ flagKubeConfig = "kubeconfig"
+)
+
+// GenericConfig stores the basic configuration of admission webhook server.
+type GenericConfig struct {
+ KubeConfig string
+ RESTConfig *rest.Config
+ KubeClient kubernetes.Interface
+ RSSClient versioned.Interface
+ RunCtx context.Context
+}
+
+// AddFlags adds all configurations to the global flags.
+func (c *GenericConfig) AddFlags() {
+ if flag.Lookup(flagKubeConfig) == nil {
+ flag.StringVar(&c.KubeConfig, flagKubeConfig, "",
+ "Paths to a kubeconfig. Only required if out-of-cluster.")
+ }
+}
+
+// Complete is called before the component runs.
+func (c *GenericConfig) Complete() {
+ c.KubeConfig = flag.Lookup(flagKubeConfig).Value.String()
+
+ restConfig, err := clientcmd.BuildConfigFromFlags("", c.KubeConfig)
+ if err != nil {
+ klog.Fatalf("create *rest.Config failed: %v", err)
+ }
+ c.RESTConfig = restConfig
+
+ c.KubeClient, err = kubernetes.NewForConfig(restConfig)
+ if err != nil {
+ klog.Fatalf("create kubeClient failed: %v", err)
+ }
+
+ c.RSSClient, err = versioned.NewForConfig(restConfig)
+ if err != nil {
+ klog.Fatalf("create rssClient failed: %v", err)
+ }
+
+ c.RunCtx = signals.SetupSignalHandler()
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/coordinator.go b/deploy/kubernetes/operator/pkg/utils/coordinator.go
new file mode 100644
index 00000000..b4204694
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/coordinator.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "path/filepath"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+
+ unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// GenerateCoordinatorName returns service account or configMap name of coordinators.
+func GenerateCoordinatorName(rss *unifflev1alpha1.RemoteShuffleService) string {
+ return constants.RSSCoordinator + "-" + rss.Name
+}
+
+// GetExcludeNodesConfigMapKey returns configMap key of excluded nodes.
+func GetExcludeNodesConfigMapKey(rss *unifflev1alpha1.RemoteShuffleService) string {
+ return filepath.Base(rss.Spec.Coordinator.ExcludeNodesFilePath)
+}
+
+// GetExcludeNodesMountPath returns excluded nodes file's directory which is used as configMap
+// volume mouth path.
+func GetExcludeNodesMountPath(rss *unifflev1alpha1.RemoteShuffleService) string {
+ return filepath.Dir(rss.Spec.Coordinator.ExcludeNodesFilePath)
+}
+
+// BuildCoordinatorInformerFactory builds informer factory for objects related to coordinators.
+func BuildCoordinatorInformerFactory(kubeClient kubernetes.Interface) informers.SharedInformerFactory {
+ option := func(options *metav1.ListOptions) {
+ options.LabelSelector = constants.LabelCoordinator + "=true"
+ }
+ return informers.NewSharedInformerFactoryWithOptions(
+ kubeClient, 0, informers.WithTweakListOptions(option))
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/shufflerserver.go b/deploy/kubernetes/operator/pkg/utils/shufflerserver.go
new file mode 100644
index 00000000..4e427181
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/shufflerserver.go
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "strings"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+
+ unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// GetShuffleServerNode returns shuffle server node name by pod.
+func GetShuffleServerNode(pod *corev1.Pod) string {
+ return pod.Status.PodIP + ":" + GetShuffleServerPort(pod)
+}
+
+// GetShuffleServerPort returns shuffle server port by pod.
+func GetShuffleServerPort(pod *corev1.Pod) string {
+ return pod.Annotations[constants.AnnotationShuffleServerPort]
+}
+
+// GetMetricsServerPort returns metrics server port by pod.
+func GetMetricsServerPort(pod *corev1.Pod) string {
+ return pod.Annotations[constants.AnnotationMetricsServerPort]
+}
+
+// BuildShuffleServerKey returns shuffler server key used in rss object's status.
+func BuildShuffleServerKey(pod *corev1.Pod) string {
+ return GetRevisionFromPod(pod) + "/" + pod.Name + "/" + GetShuffleServerNode(pod)
+}
+
+// ParseShuffleServerKey parses shuffler server key used in rss object's status.
+func ParseShuffleServerKey(key string) (revision, podName, node string) {
+ values := strings.Split(key, "/")
+ if len(values) == 3 {
+ revision = values[0]
+ podName = values[1]
+ node = values[2]
+ }
+ return
+}
+
+// ConvertShuffleServerKeysToNodes converts shuffle server keys to nodes.
+func ConvertShuffleServerKeysToNodes(keys sets.String) sets.String {
+ values := keys.List()
+ nodes := sets.NewString()
+ for _, v := range values {
+ _, _, node := ParseShuffleServerKey(v)
+ nodes.Insert(node)
+ }
+ return nodes
+}
+
+// GetRevisionFromPod returns revision of the pod belongs to a statefulSet.
+func GetRevisionFromPod(pod *corev1.Pod) string {
+ return pod.Labels[appsv1.ControllerRevisionHashLabelKey]
+}
+
+// GenerateShuffleServerName returns workload or nodePort service name of shuffle servers.
+func GenerateShuffleServerName(rss *unifflev1alpha1.RemoteShuffleService) string {
+ return constants.RSSShuffleServer + "-" + rss.Name
+}
+
+// GenerateShuffleServerLabels returns labels used by statefulSets or pods of shuffle servers.
+func GenerateShuffleServerLabels(rss *unifflev1alpha1.RemoteShuffleService) map[string]string {
+ return map[string]string{
+ "app": GenerateShuffleServerName(rss),
+ constants.LabelShuffleServer: "true",
+ }
+}
+
+// BuildShuffleServerInformerFactory builds an informer factory for shuffle servers.
+func BuildShuffleServerInformerFactory(kubeClient kubernetes.Interface) informers.SharedInformerFactory {
+ option := func(options *metav1.ListOptions) {
+ options.LabelSelector = constants.LabelShuffleServer + "=true"
+ }
+ return informers.NewSharedInformerFactoryWithOptions(
+ kubeClient, 0, informers.WithTweakListOptions(option))
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/util.go b/deploy/kubernetes/operator/pkg/utils/util.go
new file mode 100644
index 00000000..bd336f48
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/util.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sort"
+
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/util/retry"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/strings/slices"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// CreatePodInformerFactory creates pod informer factory by label selector.
+func CreatePodInformerFactory(kubeClient kubernetes.Interface,
+ key, value string) informers.SharedInformerFactory {
+ option := func(options *metav1.ListOptions) {
+ if len(value) > 0 {
+ options.LabelSelector = key + "=" + value
+ } else {
+ options.LabelSelector = key
+ }
+ }
+ return informers.NewSharedInformerFactoryWithOptions(kubeClient, 0,
+ informers.WithTweakListOptions(option))
+}
+
+// GetCurrentNamespace returns current namespace.
+func GetCurrentNamespace() string {
+ namespace := os.Getenv(constants.PodNamespaceEnv)
+ if namespace == "" {
+ namespace = constants.DefaultNamespace
+ }
+ return namespace
+}
+
+// UpdateSecret updates Secret and retries when conflicts are encountered.
+func UpdateSecret(kubeClient kubernetes.Interface, namespace, secretName string,
+ updateFunc func(secret *corev1.Secret)) error {
+ return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+ var err error
+ var secret *corev1.Secret
+ secret, err = kubeClient.CoreV1().Secrets(namespace).
+ Get(context.Background(), secretName, metav1.GetOptions{})
+ if err != nil {
+ klog.Errorf("get secret %v/%v failed: %v", namespace, secretName, err)
+ return err
+ }
+ updateFunc(secret)
+ _, err = kubeClient.CoreV1().Secrets(namespace).Update(context.Background(), secret,
+ metav1.UpdateOptions{})
+ if err != nil {
+ klog.Errorf("update configMap %v/%v failed: %v", namespace, secretName, err)
+ }
+ return err
+ })
+}
+
+// UniqueName returns unique name of an object.
+func UniqueName(object metav1.Object) string {
+ return fmt.Sprintf("%v/%v/%v", object.GetNamespace(), object.GetName(), object.GetUID())
+}
+
+// GetRssNameByPod returns rss object name from a pod.
+func GetRssNameByPod(pod *corev1.Pod) string {
+ return pod.Annotations[constants.AnnotationRssName]
+}
+
+// GetSortedList returns sorted slice.
+func GetSortedList(values sets.String) []string {
+ sorted := slices.Filter(nil, values.List(), func(v string) bool {
+ return len(v) > 0
+ })
+ sort.Slice(sorted, func(i, j int) bool {
+ return sorted[i] < sorted[j]
+ })
+ return sorted
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/config/config.go b/deploy/kubernetes/operator/pkg/webhook/config/config.go
new file mode 100644
index 00000000..438680f5
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/config/config.go
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "crypto/tls"
+ "flag"
+ "fmt"
+ "io/ioutil"
+
+ "k8s.io/klog/v2"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+)
+
+const (
+ flagIgnoreLastApps = "ignore-last-apps"
+ flagIgnoreRSS = "ignore-rss"
+ flagPort = "port"
+ flagExternalService = "external-service"
+ flagServerCertFile = "server-cert-file"
+ flagServerPrivateKeyFile = "server-private-key-file"
+ flagCACertFile = "ca-cert-file"
+)
+
+// Config contains all configurations.
+type Config struct {
+ IgnoreLastApps bool
+ IgnoreRSS bool
+ HTTPConfig
+ utils.GenericConfig
+}
+
+// HTTPConfig stores all the http-related configurations.
+type HTTPConfig struct {
+ Port int
+ ExternalService string
+ ServerCertFile string
+ ServerKeyFile string
+ CACertFile string
+}
+
+// AddFlags stores http-related configurations.
+func (c *HTTPConfig) AddFlags() {
+ flag.IntVar(&c.Port, flagPort, 9876, "Listening port of admission webhook server.")
+ flag.StringVar(&c.ExternalService, flagExternalService, webhookconstants.ComponentName,
+ "Service name which provides external access to admission webhook server.")
+ flag.StringVar(&c.ServerCertFile, flagServerCertFile, "",
+ "File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). "+
+ "If HTTPS serving is enabled, and --server-cert-file and --server-private-key-file are not provided, "+
+ "a self-signed certificate and key will be generated.")
+ flag.StringVar(&c.ServerKeyFile, flagServerPrivateKeyFile, "",
+ "File containing the default x509 private key matching --server-cert-file.")
+ flag.StringVar(&c.CACertFile, flagCACertFile, "", "File containing the ca certificate.")
+}
+
+// NeedLoadCertsFromSecret returns whether we need to load certs from specify secret.
+func (c *HTTPConfig) NeedLoadCertsFromSecret() bool {
+ return len(c.CACertFile) == 0 || len(c.ServerCertFile) == 0 || len(c.ServerKeyFile) == 0
+}
+
+// Addr returns the webhook server's listening address.
+func (c *HTTPConfig) Addr() string {
+ return fmt.Sprintf(":%v", c.Port)
+}
+
+// TLSConfig returns the TLS config.
+func (c *HTTPConfig) TLSConfig() *tls.Config {
+ cert, err := tls.LoadX509KeyPair(c.ServerCertFile, c.ServerKeyFile)
+ if err != nil {
+ klog.Fatal(err)
+ }
+ tlsConfig := &tls.Config{
+ Certificates: []tls.Certificate{cert},
+ }
+ return tlsConfig
+}
+
+// GetCaCert return contents of ca cert.
+func (c *HTTPConfig) GetCaCert() []byte {
+ caCertBody, err := ioutil.ReadFile(c.CACertFile)
+ if err != nil {
+ klog.Fatalf("read ca cert file %v failed: %v", c.CACertFile, err)
+ }
+ return caCertBody
+}
+
+// LeaderElectionID returns leader election ID.
+func (c *Config) LeaderElectionID() string {
+ return "rss-webhook-" + constants.LeaderIDSuffix
+}
+
+// AddFlags adds all configurations to the global flags.
+func (c *Config) AddFlags() {
+ flag.BoolVar(&c.IgnoreLastApps, flagIgnoreLastApps, false,
+ "Used when debugging, it means we will ignore checking last apps.")
+ flag.BoolVar(&c.IgnoreRSS, flagIgnoreRSS, false,
+ "Used when debugging, it means we will ignore checking rss objects.")
+ c.HTTPConfig.AddFlags()
+ c.GenericConfig.AddFlags()
+}
+
+// Complete is called before rss-webhook runs.
+func (c *Config) Complete() {
+ c.GenericConfig.Complete()
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/constants/constants.go b/deploy/kubernetes/operator/pkg/webhook/constants/constants.go
new file mode 100644
index 00000000..4c94865c
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/constants/constants.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+const (
+ validatingPrefix = "/validate"
+ mutatingPrefix = "/mutate"
+ rssPath = "/rss"
+ podPath = "/pod"
+
+ // ValidatingPodPath represents pod objects' validating handler's path
+ ValidatingPodPath = validatingPrefix + podPath
+ // ValidatingRssPath represents rss objects' validating handler's path
+ ValidatingRssPath = validatingPrefix + rssPath
+ // MutatingRssPath represents rss objects' mutating handler's path
+ MutatingRssPath = mutatingPrefix + rssPath
+
+ // ComponentName represents name of component.
+ ComponentName = "rss-webhook"
+)
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/inspector.go b/deploy/kubernetes/operator/pkg/webhook/inspector/inspector.go
new file mode 100644
index 00000000..175c4df8
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/inspector.go
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inspector
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "net/http"
+
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ corelisters "k8s.io/client-go/listers/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/informers/externalversions"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/listers/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+ webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/util"
+)
+
+var _ Inspector = &inspector{}
+
+// Inspector intercepts the request and checks whether the pod can be deleted.
+type Inspector interface {
+ manager.Runnable
+}
+
+// NewInspector creates an Inspector.
+func NewInspector(cfg *config.Config, tlsConfig *tls.Config) Inspector {
+ return newInspector(cfg, tlsConfig)
+}
+
+// newInspector creates an inspector.
+func newInspector(cfg *config.Config, tlsConfig *tls.Config) *inspector {
+ rssInformerFactory := externalversions.NewSharedInformerFactory(cfg.RSSClient, 0)
+ cmInformerFactory := utils.BuildCoordinatorInformerFactory(cfg.KubeClient)
+ i := &inspector{
+ ignoreLastApps: cfg.IgnoreLastApps,
+ ignoreRSS: cfg.IgnoreRSS,
+ tlsConfig: tlsConfig,
+ kubeClient: cfg.KubeClient,
+ rssClient: cfg.RSSClient,
+ rssInformerFactory: rssInformerFactory,
+ rssInformer: rssInformerFactory.Uniffle().V1alpha1().RemoteShuffleServices().Informer(),
+ rssLister: rssInformerFactory.Uniffle().V1alpha1().RemoteShuffleServices().Lister(),
+ cmInformerFactory: cmInformerFactory,
+ cmInformer: cmInformerFactory.Core().V1().ConfigMaps().Informer(),
+ cmLister: cmInformerFactory.Core().V1().ConfigMaps().Lister(),
+ }
+
+ // register handler functions for admission webhook server.
+ mux := http.NewServeMux()
+ mux.HandleFunc(webhookconstants.ValidatingPodPath,
+ util.WithAdmissionReviewHandler(i.validateDeletingShuffleServer))
+ mux.HandleFunc(webhookconstants.ValidatingRssPath,
+ util.WithAdmissionReviewHandler(i.validateRSS))
+ mux.HandleFunc(webhookconstants.MutatingRssPath,
+ util.WithAdmissionReviewHandler(i.mutateRSS))
+ i.server = &http.Server{
+ Addr: cfg.Addr(),
+ Handler: mux,
+ TLSConfig: tlsConfig,
+ }
+
+ return i
+}
+
+// inspector implements the Inspector interface.
+type inspector struct {
+ ignoreLastApps bool
+ ignoreRSS bool
+ tlsConfig *tls.Config
+ server *http.Server
+ kubeClient kubernetes.Interface
+ rssClient versioned.Interface
+ rssInformerFactory externalversions.SharedInformerFactory
+ rssInformer cache.SharedIndexInformer
+ rssLister v1alpha1.RemoteShuffleServiceLister
+ cmInformerFactory informers.SharedInformerFactory
+ cmInformer cache.SharedIndexInformer
+ cmLister corelisters.ConfigMapLister
+}
+
+// Start starts the Inspector.
+func (i *inspector) Start(ctx context.Context) error {
+ i.rssInformerFactory.Start(ctx.Done())
+ i.cmInformerFactory.Start(ctx.Done())
+ if !cache.WaitForCacheSync(ctx.Done(), i.rssInformer.HasSynced, i.cmInformer.HasSynced) {
+ return fmt.Errorf("wait for cache synced failed")
+ }
+ klog.V(2).Info("inspector started")
+ // set up the http server for listening pods and rss objects' validating and mutating requests.
+ if err := i.server.ListenAndServeTLS("", ""); err != nil {
+ return fmt.Errorf("listen error: %v", err)
+ }
+ return nil
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/pod.go b/deploy/kubernetes/operator/pkg/webhook/inspector/pod.go
new file mode 100644
index 00000000..14b7c1ef
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/pod.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inspector
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ admissionv1 "k8s.io/api/admission/v1"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/klog/v2"
+
+ unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/util"
+)
+
+// validateDeletingShuffleServer validates the delete operation towards shuffle server pods,
+// and update exclude nodes in configMap.
+func (i *inspector) validateDeletingShuffleServer(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
+ pod := &corev1.Pod{}
+ if err := json.Unmarshal(ar.Request.OldObject.Raw, pod); err != nil {
+ klog.Errorf("unmarshal object of AdmissionReview (%v) failed: %v",
+ string(ar.Request.Object.Raw), err)
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ rssName := utils.GetRssNameByPod(pod)
+ // allow pods which are not shuffle servers or have been set deletion timestamp.
+ if rssName == "" || !util.NeedInspectPod(pod) {
+ klog.V(4).Infof("ignored non shuffle server or deleting pod: %v->(%+v/%+v/%v)",
+ utils.UniqueName(pod), pod.Labels, pod.Annotations, pod.DeletionTimestamp)
+ return util.AdmissionReviewAllow(ar)
+ }
+ klog.V(4).Infof("check shuffle server pod: %v", utils.BuildShuffleServerKey(pod))
+ rss, err := i.rssLister.RemoteShuffleServices(pod.Namespace).Get(rssName)
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ return util.AdmissionReviewAllow(ar)
+ }
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ // we can only delete shuffle server pods when rss is in upgrading phase.
+ if rss.Status.Phase != unifflev1alpha1.RSSUpgrading && rss.Status.Phase != unifflev1alpha1.RSSTerminating {
+ message := fmt.Sprintf("can not delete the shuffle server pod (%v) directly",
+ utils.UniqueName(pod))
+ klog.V(4).Info(message)
+ return util.AdmissionReviewForbidden(ar, message)
+ }
+
+ // when the rss uses specific upgrade mode, we need to check whether the pod is specific.
+ if rss.Spec.ShuffleServer.UpgradeStrategy.Type == unifflev1alpha1.SpecificUpgrade {
+ specificNames := rss.Spec.ShuffleServer.UpgradeStrategy.SpecificNames
+ isSpecific := false
+ for _, name := range specificNames {
+ if name == pod.Name {
+ isSpecific = true
+ break
+ }
+ }
+ if !isSpecific {
+ message := fmt.Sprintf("can not delete the shuffle server pod (%v) which is not specific",
+ utils.UniqueName(pod))
+ klog.V(4).Info(message)
+ return util.AdmissionReviewForbidden(ar, message)
+ }
+ }
+
+ // update targetKeys field in status of rss and exclude nodes in configMap used by coordinators.
+ if err = i.updateTargetKeysAndExcludeNodes(rss, pod); err != nil {
+ return util.AdmissionReviewFailed(ar, err)
+ }
+
+ // check whether the shuffle server pod can be deleted.
+ if i.ignoreLastApps || util.HasZeroApps(pod) {
+ klog.V(3).Infof("shuffle server pod (%v) will be deleted", utils.BuildShuffleServerKey(pod))
+ return util.AdmissionReviewAllow(ar)
+ }
+ message := "there are some apps still running in shuffle server: " + utils.GetShuffleServerNode(pod)
+ return util.AdmissionReviewForbidden(ar, message)
+}
+
+// updateTargetKeysAndExcludeNodes updates targetKeys field in status of rss and exclude nodes in
+// configMap used by coordinators.
+func (i *inspector) updateTargetKeysAndExcludeNodes(rss *unifflev1alpha1.RemoteShuffleService,
+ pod *corev1.Pod) error {
+ targetKeys := sets.NewString(rss.Status.TargetKeys...)
+ deletedKeys := sets.NewString(rss.Status.DeletedKeys...)
+ currentKey := utils.BuildShuffleServerKey(pod)
+ if deletedKeys.Has(currentKey) {
+ klog.V(4).Infof("pod (%v) has been deleted", currentKey)
+ return nil
+ }
+
+ namespace := rss.Namespace
+ targetKeys.Insert(currentKey)
+ rssCopy := rss.DeepCopy()
+ rssCopy.Status.TargetKeys = utils.GetSortedList(targetKeys)
+ if _, err := i.rssClient.UniffleV1alpha1().RemoteShuffleServices(namespace).
+ UpdateStatus(context.Background(), rssCopy, metav1.UpdateOptions{}); err != nil {
+ klog.Errorf("update target keys in status of rss (%v) failed: %v",
+ utils.UniqueName(rss), err)
+ return err
+ }
+
+ cmName := utils.GenerateCoordinatorName(rss)
+ cm, err := i.cmLister.ConfigMaps(namespace).Get(cmName)
+ if err != nil {
+ klog.Errorf("get configMap (%v/%v) of excluded nodes for rss (%v) failed: %v",
+ namespace, cmName, utils.UniqueName(rss), err)
+ return err
+ }
+
+ excludeNodesFileKey := utils.GetExcludeNodesConfigMapKey(rss)
+ oldNodes := sets.NewString(strings.Split(cm.Data[excludeNodesFileKey], "\n")...)
+ newNodes := oldNodes.Difference(utils.ConvertShuffleServerKeysToNodes(deletedKeys))
+ newNodes.Insert(utils.GetShuffleServerNode(pod))
+ cmCopy := cm.DeepCopy()
+ cmCopy.Data[excludeNodesFileKey] = strings.Join(utils.GetSortedList(newNodes), "\n")
+ if _, err = i.kubeClient.CoreV1().ConfigMaps(namespace).
+ Update(context.Background(), cmCopy, metav1.UpdateOptions{}); err != nil {
+ klog.Errorf("updated exclude nodes in configMap (%v) for rss (%v) failed: %v",
+ utils.UniqueName(cmCopy), utils.UniqueName(rss), err)
+ return err
+ }
+ return nil
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
new file mode 100644
index 00000000..31bdff1d
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inspector
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "gomodules.xyz/jsonpatch/v2"
+ admissionv1 "k8s.io/api/admission/v1"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/pointer"
+
+ unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/util"
+)
+
+// validateRSS validates the create and update operation towards rss objects.
+func (i *inspector) validateRSS(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
+ if !i.ignoreRSS && ar.Request.Operation == admissionv1.Update {
+ oldRSS := &unifflev1alpha1.RemoteShuffleService{}
+ if err := json.Unmarshal(ar.Request.OldObject.Raw, oldRSS); err != nil {
+ klog.Errorf("unmarshal old object of rss (%v) failed: %v",
+ string(ar.Request.OldObject.Raw), err)
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ // for security purposes, we forbid updating rss objects when they are in upgrading phase.
+ if oldRSS.Status.Phase == unifflev1alpha1.RSSUpgrading {
+ message := "can not update upgrading rss object: " + utils.UniqueName(oldRSS)
+ return util.AdmissionReviewForbidden(ar, message)
+ }
+ }
+ newRSS := &unifflev1alpha1.RemoteShuffleService{}
+ if err := json.Unmarshal(ar.Request.Object.Raw, newRSS); err != nil {
+ klog.Errorf("unmarshal object of rss (%v) failed: %v",
+ string(ar.Request.Object.Raw), err)
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ // validate configurations for coordinators.
+ coordinator := newRSS.Spec.Coordinator
+ if len(coordinator.RPCNodePort) != int(*coordinator.Count) ||
+ len(coordinator.HTTPNodePort) != int(*coordinator.Count) {
+ return util.AdmissionReviewFailed(ar,
+ fmt.Errorf("invalid number of http or rpc node ports (%v/%v) <> (%v)",
+ len(coordinator.RPCNodePort), len(coordinator.HTTPNodePort), coordinator.Count))
+ }
+ if len(coordinator.ExcludeNodesFilePath) == 0 {
+ return util.AdmissionReviewFailed(ar,
+ fmt.Errorf("empty exclude nodes file path for coordinators"))
+ }
+ // validate configurations of logHostPath for coordinators.
+ coordinatorLogPath := newRSS.Spec.Coordinator.LogHostPath
+ if len(coordinatorLogPath) > 0 && len(newRSS.Spec.Coordinator.HostPathMounts[coordinatorLogPath]) == 0 {
+ return util.AdmissionReviewFailed(ar, fmt.Errorf("empty log volume mount path for coordinators"))
+ }
+ // validate configurations of logHostPath for shuffle servers.
+ shuffleServerLogPath := newRSS.Spec.ShuffleServer.LogHostPath
+ if len(shuffleServerLogPath) > 0 && len(newRSS.Spec.ShuffleServer.HostPathMounts[shuffleServerLogPath]) == 0 {
+ return util.AdmissionReviewFailed(ar, fmt.Errorf("empty log volume mount path for shuffle servers"))
+ }
+ // validate configurations of different upgrade modes for shuffle servers.
+ upgradeStrategy := newRSS.Spec.ShuffleServer.UpgradeStrategy
+ switch upgradeStrategy.Type {
+ case unifflev1alpha1.FullUpgrade:
+ case unifflev1alpha1.PartitionUpgrade:
+ var err error
+ if upgradeStrategy.Partition == nil {
+ err = fmt.Errorf("empty partition for %v", upgradeStrategy.Type)
+ } else if *upgradeStrategy.Partition <= 0 {
+ err = fmt.Errorf("invalid partition (%v) for %v", *upgradeStrategy.Partition,
+ upgradeStrategy.Type)
+ }
+ if err != nil {
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ case unifflev1alpha1.SpecificUpgrade:
+ if len(upgradeStrategy.SpecificNames) == 0 {
+ return util.AdmissionReviewFailed(ar,
+ fmt.Errorf("empty specific copies for %v", upgradeStrategy.Type))
+ }
+ case unifflev1alpha1.FullRestart:
+ default:
+ return util.AdmissionReviewFailed(ar,
+ fmt.Errorf("invalid upgrade stragety type (%v)", upgradeStrategy.Type))
+ }
+ return util.AdmissionReviewAllow(ar)
+}
+
+// mutateNmg mutates the rss object according to our needs.
+func (i *inspector) mutateRSS(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
+ rss := &unifflev1alpha1.RemoteShuffleService{}
+ if err := json.Unmarshal(ar.Request.Object.Raw, rss); err != nil {
+ klog.Errorf("unmarshal object of rss (%v) failed: %v",
+ string(ar.Request.Object.Raw), err)
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ patches, err := generateRSSPatches(ar, rss)
+ if err != nil {
+ klog.Errorf("generate patches for rss (%v) failed: %v", utils.UniqueName(rss), err)
+ return util.AdmissionReviewFailed(ar, err)
+ }
+ // if payload is not empty, we need set patch operations in response.
+ if len(patches) > 0 {
+ return util.AdmissionReviewWithPatches(ar, patches)
+ }
+ return util.AdmissionReviewAllow(ar)
+}
+
+// generateRSSPatches generates patch payloads for mutating rss objects.
+func generateRSSPatches(ar *admissionv1.AdmissionReview,
+ rss *unifflev1alpha1.RemoteShuffleService) ([]byte, error) {
+ // TODO: add default values for RSS objects.
+ if ar.Request.Operation == admissionv1.Create {
+ rss.SetFinalizers([]string{constants.RSSFinalizerName})
+ rss.Spec.ShuffleServer.Sync = pointer.Bool(false)
+ }
+
+ original := ar.Request.Object.Raw
+ current, err := json.Marshal(rss)
+ if err != nil {
+ klog.Errorf("marshal rss (%+v) failed: %v", rss, err)
+ return nil, err
+ }
+ var patches []jsonpatch.Operation
+ // build patch payload form mutating rss objects.
+ patches, err = jsonpatch.CreatePatch(original, current)
+ if err != nil {
+ klog.Errorf("create patches for rss (%v) failed: %v", string(current), err)
+ return nil, err
+ }
+ var patchBody []byte
+ patchBody, err = json.Marshal(patches)
+ if err != nil {
+ klog.Errorf("marshal patches (%+v) for rss (%v) failed: %v",
+ patches, string(current), err)
+ return nil, err
+ }
+ klog.V(4).Infof("patch body (%v) for rss (%v)", string(patchBody), utils.UniqueName(rss))
+ return patchBody, nil
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/manager.go b/deploy/kubernetes/operator/pkg/webhook/manager.go
new file mode 100644
index 00000000..1bcacd41
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/manager.go
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package webhook
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "net"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+ webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/inspector"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/syncer"
+)
+
+const (
+ certsSecretName = "rss-webhook-certs"
+
+ serverCert = "server.crt"
+ serverKey = "server.key"
+ caCert = "ca.crt"
+)
+
+// AdmissionManager manages admission webhook server for shuffle servers.
+type AdmissionManager interface {
+ manager.Runnable
+}
+
+// NewAdmissionManager creates an AdmissionManager.
+func NewAdmissionManager(cfg *config.Config) AdmissionManager {
+ return newAdmissionManager(cfg)
+}
+
+// newAdmissionManager creates an admissionManager.
+func newAdmissionManager(cfg *config.Config) *admissionManager {
+ am := &admissionManager{
+ externalService: cfg.ExternalService,
+ kubeClient: cfg.KubeClient,
+ }
+ if !cfg.NeedLoadCertsFromSecret() {
+ am.caCertBody = cfg.GetCaCert()
+ am.tlsConfig = cfg.TLSConfig()
+ } else {
+ am.loadCertsFromSecret()
+ }
+ mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
+ LeaderElection: true,
+ LeaderElectionID: cfg.LeaderElectionID(),
+ LeaderElectionNamespace: utils.GetCurrentNamespace(),
+ })
+ if err != nil {
+ klog.Fatalf("build manager for admission manager failed: %v", err)
+ }
+ am.mgr = mgr
+ am.syncer = syncer.NewConfigSyncer(am.caCertBody, cfg.ExternalService, cfg.KubeClient)
+ am.inspector = inspector.NewInspector(cfg, am.tlsConfig)
+ return am
+}
+
+// admissionManager implements the AdmissionManager interface.
+type admissionManager struct {
+ externalService string
+ caCertBody []byte
+ tlsConfig *tls.Config
+
+ kubeClient kubernetes.Interface
+
+ mgr manager.Manager
+ syncer syncer.ConfigSyncer
+ inspector inspector.Inspector
+}
+
+// Start starts the AdmissionManager.
+func (am *admissionManager) Start(ctx context.Context) error {
+ stopCh := ctx.Done()
+ if err := am.mgr.Add(am.syncer); err != nil {
+ klog.Errorf("add syncer to mgr of admission manager failed: %v", err)
+ return err
+ }
+ go func() {
+ if err := am.mgr.Start(ctx); err != nil {
+ klog.Fatalf("mgr of admission manager started failed: %v", err)
+ }
+ }()
+ go func() {
+ if err := am.inspector.Start(ctx); err != nil {
+ klog.Fatalf("start webhook server failed: %v", err)
+ }
+ }()
+ klog.V(2).Info("admission manager started")
+ <-stopCh
+ return nil
+}
+
+// generateCerts generates certificate and privateKey and ca certificate for admission webhook
+// server, and they will be saved in a secret.
+func (am *admissionManager) generateCerts(create bool) (
+ serverCertificate, serverPrivateKey, caCertificate []byte,
+ err error) {
+ var caPrivateKey []byte
+ caPrivateKey, err = utils.SetUpCaKey()
+ if err != nil {
+ klog.Errorf("set up ca key failed %v", err)
+ return nil, nil, nil, err
+ }
+ caCertificate, err = utils.SetUpCaCert(webhookconstants.ComponentName, caPrivateKey)
+ if err != nil {
+ klog.Errorf("set up ca cert failed %v", err)
+ return nil, nil, nil, err
+ }
+ namespace := utils.GetCurrentNamespace()
+ domains, ips := subjectAltNames(namespace, am.externalService)
+ serverCertificate, serverPrivateKey, err = utils.SetUpSignedCertAndKey(domains, ips,
+ webhookconstants.ComponentName,
+ caPrivateKey, caCertificate, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth})
+ if err != nil {
+ klog.Errorf("set up server cert error %v", err)
+ return nil, nil, nil, err
+ }
+ if create {
+ // try to create a new secret to save certificate and privateKey and ca certificate.
+ _, err = am.kubeClient.CoreV1().Secrets(namespace).Create(context.Background(),
+ &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: certsSecretName,
+ Namespace: namespace,
+ },
+ Data: map[string][]byte{
+ serverCert: serverCertificate,
+ serverKey: serverPrivateKey,
+ caCert: caCertificate,
+ },
+ }, metav1.CreateOptions{})
+ if err != nil {
+ klog.Errorf("create new certificate secret error %v", err)
+ return nil, nil, nil, err
+ }
+ } else {
+ // try to update an old secret to save certificate and privateKey and ca certificate.
+ if err = utils.UpdateSecret(am.kubeClient, namespace, certsSecretName,
+ func(secret *corev1.Secret) {
+ secret.Data = map[string][]byte{
+ serverCert: serverCertificate,
+ serverKey: serverPrivateKey,
+ caCert: caCertificate,
+ }
+ }); err != nil {
+ return nil, nil, nil, err
+ }
+ }
+ return caCertificate, serverCertificate, serverPrivateKey, nil
+}
+
+// loadCertsFromSecret loads certificate and privateKey and ca certificate from the secret.
+func (am *admissionManager) loadCertsFromSecret() {
+ namespace := utils.GetCurrentNamespace()
+ create := false
+ secret, err := am.kubeClient.CoreV1().Secrets(namespace).Get(context.Background(),
+ certsSecretName, metav1.GetOptions{})
+ if err != nil {
+ if !errors.IsNotFound(err) {
+ klog.Fatalf("get secret of %v/%v failed: %v", namespace, certsSecretName, err)
+ }
+ create = true
+ }
+ var serverCertBody, serverKeyBody, caCertBody []byte
+ if secret == nil || secret.Data == nil || len(secret.Data[serverCert]) == 0 || len(secret.Data[serverKey]) == 0 ||
+ len(secret.Data[caCert]) == 0 {
+ caCertBody, serverCertBody, serverKeyBody, err = am.generateCerts(create)
+ if err != nil {
+ klog.Fatalf("generate certs failed: %v", err)
+ }
+ } else {
+ caCertBody = secret.Data[caCert]
+ serverCertBody = secret.Data[serverCert]
+ serverKeyBody = secret.Data[serverKey]
+ }
+ am.caCertBody = caCertBody
+ cert, err := tls.X509KeyPair(serverCertBody, serverKeyBody)
+ if err != nil {
+ klog.Fatalf("generate key pair error :%v", err)
+ }
+ am.tlsConfig = &tls.Config{
+ Certificates: []tls.Certificate{cert},
+ }
+}
+
+// subjectAltNames builds subject alt names by namespace and service name.
+func subjectAltNames(namespace, svcName string) ([]string, []net.IP) {
+ return []string{
+ "localhost",
+ svcName,
+ fmt.Sprintf("%v.%v.svc", svcName, namespace),
+ fmt.Sprintf("%v.%v.svc.cluster.local", svcName, namespace),
+ }, []net.IP{net.ParseIP("127.0.0.1")}
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/manager_test.go b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
new file mode 100644
index 00000000..55d75230
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package webhook
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ admissionv1 "k8s.io/api/admissionregistration/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "sigs.k8s.io/controller-runtime/pkg/envtest"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
+ "sigs.k8s.io/controller-runtime/pkg/log/zap"
+ "sigs.k8s.io/controller-runtime/pkg/manager/signals"
+
+ unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+ webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+)
+
+var (
+ testEnv *envtest.Environment
+ kubeClient kubernetes.Interface
+ rssClient versioned.Interface
+)
+
+func TestAdmissionManager(t *testing.T) {
+ _ = os.Setenv(constants.PodNamespaceEnv, constants.DefaultNamespace)
+ RegisterFailHandler(Fail)
+ suiteCfg, reporterCfg := GinkgoConfiguration()
+ reporterCfg.VeryVerbose = true
+ reporterCfg.FullTrace = true
+ RunSpecs(t, "admission manager suite", suiteCfg, reporterCfg)
+}
+
+var _ = BeforeSuite(
+ func() {
+ logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
+ By("bootstrapping test environment")
+ testEnv = &envtest.Environment{
+ CRDDirectoryPaths: []string{filepath.Join("../..", "config", "crd", "bases")},
+ }
+ restConfig, err := testEnv.Start()
+ Expect(err).To(BeNil())
+ Expect(restConfig).ToNot(BeNil())
+
+ kubeClient, err = kubernetes.NewForConfig(restConfig)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(kubeClient).ToNot(BeNil())
+
+ rssClient, err = versioned.NewForConfig(restConfig)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(rssClient).ToNot(BeNil())
+
+ err = unifflev1alpha1.AddToScheme(scheme.Scheme)
+ Expect(err).NotTo(HaveOccurred())
+
+ // +kubebuilder:scaffold:scheme
+
+ cfg := &config.Config{
+ HTTPConfig: config.HTTPConfig{
+ Port: 9876,
+ ExternalService: webhookconstants.ComponentName,
+ },
+ GenericConfig: utils.GenericConfig{
+ RESTConfig: restConfig,
+ KubeClient: kubeClient,
+ RSSClient: rssClient,
+ },
+ }
+ am := newAdmissionManager(cfg)
+ stopCtx := signals.SetupSignalHandler()
+ go func() {
+ err = am.Start(stopCtx)
+ Expect(err).ToNot(HaveOccurred())
+ }()
+ },
+)
+
+var _ = AfterSuite(func() {
+ Expect(testEnv.Stop()).To(Succeed())
+})
+
+var _ = Describe("AdmissionManager", func() {
+ Context("Setup syncer", func() {
+ It("Generate validation and webhook configurations", func() {
+ By("Wait validation configurations synced")
+ var vwc *admissionv1.ValidatingWebhookConfiguration
+ err := wait.Poll(time.Second, time.Second*5, func() (bool, error) {
+ var getErr error
+ vwc, getErr = kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+ Get(context.TODO(), webhookconstants.ComponentName, metav1.GetOptions{})
+ if getErr != nil {
+ return false, getErr
+ }
+ return true, nil
+ })
+ Expect(err).ToNot(HaveOccurred())
+ Expect(vwc).ToNot(BeNil())
+
+ By("Wait mutating configurations synced")
+ var mwc *admissionv1.MutatingWebhookConfiguration
+ err = wait.Poll(time.Second, time.Second*5, func() (bool, error) {
+ var getErr error
+ mwc, getErr = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+ Get(context.TODO(), webhookconstants.ComponentName, metav1.GetOptions{})
+ if getErr != nil {
+ return false, getErr
+ }
+ return true, nil
+ })
+ Expect(err).ToNot(HaveOccurred())
+ Expect(mwc).ToNot(BeNil())
+ })
+ })
+})
diff --git a/deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go b/deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go
new file mode 100644
index 00000000..86380257
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package syncer
+
+import (
+ "context"
+ "reflect"
+ "time"
+
+ "golang.org/x/sync/errgroup"
+ admissionv1 "k8s.io/api/admissionregistration/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/pointer"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+
+ unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+ webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+)
+
+var _ ConfigSyncer = &configSyncer{}
+
+// ConfigSyncer syncs ValidatingWebhookConfigurations and MutatingWebhookConfigurations.
+type ConfigSyncer interface {
+ manager.Runnable
+}
+
+// NewConfigSyncer creates a ConfigSyncer.
+func NewConfigSyncer(caCert []byte, externalService string,
+ kubeClient kubernetes.Interface) ConfigSyncer {
+ return newConfigSyncer(caCert, externalService, kubeClient)
+}
+
+// newConfigSyncer creates a configSyncer.
+func newConfigSyncer(caCert []byte, externalService string,
+ kubeClient kubernetes.Interface) *configSyncer {
+ return &configSyncer{
+ caCert: caCert,
+ externalService: externalService,
+ kubeClient: kubeClient,
+ }
+}
+
+// configSyncer implements the ConfigSyncer interface.
+type configSyncer struct {
+ caCert []byte
+ externalService string
+ kubeClient kubernetes.Interface
+}
+
+// Start starts the ConfigSyncer.
+func (cs *configSyncer) Start(ctx context.Context) error {
+ klog.V(2).Info("config syncer started")
+ for {
+ select {
+ case <-ctx.Done():
+ klog.V(3).Info("stop syncing webhook configurations")
+ return nil
+ default:
+ }
+ if err := cs.syncWebhookCfg(); err != nil {
+ klog.Errorf("sync webhook configuration failed: %v", err)
+ }
+ time.Sleep(time.Minute)
+ }
+}
+
+// syncWebhookCfg synchronizes the validatingWebhookConfiguration and mutatingWebhookConfiguration objects.
+func (cs *configSyncer) syncWebhookCfg() error {
+ currentVWC, currentMWC := cs.generateWebhookCfg()
+ eg := errgroup.Group{}
+ eg.Go(func() error {
+ return cs.syncValidatingWebhookCfg(currentVWC)
+ })
+ eg.Go(func() error {
+ return cs.syncMutatingWebhookCfg(currentMWC)
+ })
+ return eg.Wait()
+}
+
+// syncValidatingWebhookCfg synchronizes the validatingWebhookConfiguration object.
+func (cs *configSyncer) syncValidatingWebhookCfg(
+ currentVWC *admissionv1.ValidatingWebhookConfiguration) error {
+ vwc, err := cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+ Get(context.Background(), webhookconstants.ComponentName, metav1.GetOptions{})
+ if err != nil {
+ if errors.IsNotFound(err) {
+ _, err = cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+ Create(context.Background(), currentVWC, metav1.CreateOptions{})
+ }
+ return err
+ }
+ if reflect.DeepEqual(vwc.Webhooks, currentVWC.Webhooks) {
+ return nil
+ }
+ vwc.Webhooks = currentVWC.Webhooks
+ _, err = cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+ Update(context.Background(), vwc, metav1.UpdateOptions{})
+ return err
+}
+
+// syncMutatingWebhookCfg synchronizes the mutatingWebhookConfiguration object.
+func (cs *configSyncer) syncMutatingWebhookCfg(
+ currentMWC *admissionv1.MutatingWebhookConfiguration) error {
+ vwc, err := cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+ Get(context.Background(), webhookconstants.ComponentName, metav1.GetOptions{})
+ if err != nil {
+ if errors.IsNotFound(err) {
+ _, err = cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+ Create(context.Background(), currentMWC, metav1.CreateOptions{})
+ }
+ return err
+ }
+ if reflect.DeepEqual(vwc.Webhooks, currentMWC.Webhooks) {
+ return nil
+ }
+ vwc.Webhooks = currentMWC.Webhooks
+ _, err = cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+ Update(context.Background(), vwc, metav1.UpdateOptions{})
+ return err
+}
+
+// generateWebhookCfg generates the validatingWebhookConfiguration and mutatingWebhookConfiguration objects.
+func (cs *configSyncer) generateWebhookCfg() (
+ *admissionv1.ValidatingWebhookConfiguration, *admissionv1.MutatingWebhookConfiguration) {
+ validatingWebhooks := cs.generateValidatingWebhooks()
+ mutatingWebhooks := cs.generateMutatingWebhooks()
+ return &admissionv1.ValidatingWebhookConfiguration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: webhookconstants.ComponentName,
+ },
+ Webhooks: validatingWebhooks,
+ }, &admissionv1.MutatingWebhookConfiguration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: webhookconstants.ComponentName,
+ },
+ Webhooks: mutatingWebhooks,
+ }
+}
+
+// generateValidatingWebhooks generates validatingWebhooks of validatingWebhookConfiguration.
+func (cs *configSyncer) generateValidatingWebhooks() []admissionv1.ValidatingWebhook {
+ failurePolicy := admissionv1.Fail
+ sideEffects := admissionv1.SideEffectClassNone
+ return []admissionv1.ValidatingWebhook{
+ {
+ Name: "webhook.for.shuffle.server",
+ Rules: []admissionv1.RuleWithOperations{
+ {
+ Operations: []admissionv1.OperationType{admissionv1.Delete},
+ Rule: admissionv1.Rule{
+ APIGroups: []string{corev1.GroupName},
+ APIVersions: []string{"v1"},
+ Resources: []string{"pods"},
+ },
+ },
+ },
+ FailurePolicy: &failurePolicy,
+ ClientConfig: admissionv1.WebhookClientConfig{
+ CABundle: cs.caCert,
+ Service: &admissionv1.ServiceReference{
+ Name: cs.externalService,
+ Namespace: utils.GetCurrentNamespace(),
+ Path: pointer.StringPtr(webhookconstants.ValidatingPodPath),
+ },
+ },
+ ObjectSelector: &metav1.LabelSelector{
+ MatchLabels: map[string]string{
+ constants.LabelShuffleServer: "true",
+ },
+ },
+ SideEffects: &sideEffects,
+ AdmissionReviewVersions: []string{"v1", "v1beta1"},
+ TimeoutSeconds: pointer.Int32(30),
+ },
+ {
+ Name: "webhook.for.rss",
+ Rules: buildRules(),
+ FailurePolicy: &failurePolicy,
+ ClientConfig: admissionv1.WebhookClientConfig{
+ CABundle: cs.caCert,
+ Service: &admissionv1.ServiceReference{
+ Name: cs.externalService,
+ Namespace: utils.GetCurrentNamespace(),
+ Path: pointer.StringPtr(webhookconstants.ValidatingRssPath),
+ },
+ },
+ SideEffects: &sideEffects,
+ AdmissionReviewVersions: []string{"v1", "v1beta1"},
+ },
+ }
+}
+
+// generateMutatingWebhooks generates mutatingWebhooks of mutatingWebhookConfiguration.
+func (cs *configSyncer) generateMutatingWebhooks() []admissionv1.MutatingWebhook {
+ failurePolicy := admissionv1.Fail
+ sideEffects := admissionv1.SideEffectClassNone
+ return []admissionv1.MutatingWebhook{
+ {
+ Name: "webhook.for.rss",
+ Rules: buildRules(),
+ FailurePolicy: &failurePolicy,
+ ClientConfig: admissionv1.WebhookClientConfig{
+ CABundle: cs.caCert,
+ Service: &admissionv1.ServiceReference{
+ Name: cs.externalService,
+ Namespace: utils.GetCurrentNamespace(),
+ Path: pointer.StringPtr(webhookconstants.MutatingRssPath),
+ },
+ },
+ SideEffects: &sideEffects,
+ AdmissionReviewVersions: []string{"v1", "v1beta1"},
+ },
+ }
+}
+
+func buildRules() []admissionv1.RuleWithOperations {
+ return []admissionv1.RuleWithOperations{
+ {
+ Operations: []admissionv1.OperationType{admissionv1.Create, admissionv1.Update},
+ Rule: admissionv1.Rule{
+ APIGroups: []string{unifflev1alpha1.SchemeGroupVersion.Group},
+ APIVersions: []string{unifflev1alpha1.SchemeGroupVersion.Version},
+ Resources: []string{"remoteshuffleservices"},
+ },
+ },
+ }
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/util/patch.go b/deploy/kubernetes/operator/pkg/webhook/util/patch.go
new file mode 100644
index 00000000..57f18abb
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/util/patch.go
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+// PatchOperation defines the information of a patch's operation.
+type PatchOperation struct {
+ Op string `json:"op"`
+ Path string `json:"path"`
+ Value interface{} `json:"value,omitempty"`
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/util/util.go b/deploy/kubernetes/operator/pkg/webhook/util/util.go
new file mode 100644
index 00000000..f1425809
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/util/util.go
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "time"
+
+ "github.com/parnurzeal/gorequest"
+ admissionv1 "k8s.io/api/admission/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/serializer"
+ "k8s.io/klog/v2"
+
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+var (
+ // RuntimeScheme defines methods for serializing and deserializing API objects
+ runtimeScheme = runtime.NewScheme()
+ // Codecs serializers for specific versions and content types
+ codecs = serializer.NewCodecFactory(runtimeScheme)
+ // Deserializer attempts to load an object from data
+ deserializer = codecs.UniversalDeserializer()
+)
+
+// AdmissionReviewHandler handles AdmissionReviews and set response in them.
+type AdmissionReviewHandler func(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview
+
+// handleResponse write message to http response.
+func handleResponse(w http.ResponseWriter, status int, message string) {
+ w.WriteHeader(status)
+ if _, err := w.Write([]byte(message)); err != nil {
+ klog.Errorf("write message (%v) failed: %v", message, err)
+ }
+}
+
+// AdmissionReviewFailed returns error for the AdmissionReview.
+func AdmissionReviewFailed(ar *admissionv1.AdmissionReview,
+ err error) *admissionv1.AdmissionReview {
+ ar.Response = &admissionv1.AdmissionResponse{
+ UID: ar.Request.UID,
+ Result: &metav1.Status{
+ Message: fmt.Sprintf("handle admission review failed: %v", err),
+ },
+ }
+ return ar
+}
+
+// AdmissionReviewAllow allows the AdmissionReview.
+func AdmissionReviewAllow(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
+ ar.Response = &admissionv1.AdmissionResponse{
+ UID: ar.Request.UID,
+ Allowed: true,
+ }
+ return ar
+}
+
+// AdmissionReviewForbidden forbids the AdmissionReview with delete operation.
+func AdmissionReviewForbidden(ar *admissionv1.AdmissionReview,
+ message string) *admissionv1.AdmissionReview {
+ ar.Response = &admissionv1.AdmissionResponse{
+ UID: ar.Request.UID,
+ Result: &metav1.Status{
+ Message: message,
+ },
+ }
+ return ar
+}
+
+// AdmissionReviewWithPatches returns the AdmissionReview with patches in response.
+func AdmissionReviewWithPatches(ar *admissionv1.AdmissionReview,
+ patches []byte) *admissionv1.AdmissionReview {
+ ar.Response = &admissionv1.AdmissionResponse{
+ UID: ar.Request.UID,
+ Allowed: true,
+ Patch: patches,
+ PatchType: func() *admissionv1.PatchType {
+ pt := admissionv1.PatchTypeJSONPatch
+ return &pt
+ }(),
+ }
+ return ar
+}
+
+// WithAdmissionReviewHandler checks before InspectorFunc executes and creates a handleFunc.
+func WithAdmissionReviewHandler(handler AdmissionReviewHandler) http.HandlerFunc {
+ return func(w http.ResponseWriter, req *http.Request) {
+ if req.Body == nil {
+ klog.Error("Receive an invalid ar, body is empty")
+ handleResponse(w, http.StatusBadRequest, "ar body required")
+ return
+ }
+
+ data, err := ioutil.ReadAll(req.Body)
+ if err != nil {
+ klog.Errorf("Read ar body failed: %v", err)
+ handleResponse(w, http.StatusInternalServerError,
+ fmt.Sprintf("read ar body failed: %v", err))
+ return
+ }
+
+ ar := &admissionv1.AdmissionReview{}
+ if _, _, err = deserializer.Decode(data, nil, ar); err != nil {
+ klog.Errorf("Parse ar body failed: %s, %v", string(data), err)
+ handleResponse(w, http.StatusBadRequest, fmt.Sprintf("parse ar failed: %v", err))
+ return
+ }
+ klog.V(4).Infof("receive request: %v/%v/%v from %+v, verb: %+v",
+ ar.Request.Namespace, ar.Request.Name, ar.Request.UID, ar.Request.UserInfo,
+ ar.Request.Operation)
+ var respBytes []byte
+ respBytes, err = json.Marshal(handler(ar))
+ if err != nil {
+ handleResponse(w, http.StatusInternalServerError,
+ fmt.Sprintf("marshal response failed: %v", err))
+ return
+ }
+ if _, err := w.Write(respBytes); err != nil {
+ klog.Errorf("Send response failed: %v", err)
+ }
+ }
+}
+
+// NeedInspectPod returns whether we need to inspect the pod.
+func NeedInspectPod(pod *corev1.Pod) bool {
+ if pod.DeletionTimestamp != nil || pod.Labels == nil {
+ return false
+ }
+ if val, ok := pod.Labels[constants.LabelShuffleServer]; ok && val == "true" {
+ return true
+ }
+ return false
+}
+
+// MetricItem records an item of metric information of shuffle servers.
+type MetricItem struct {
+ Name string `json:"name"`
+ LabelNames []string `json:"labelNames"`
+ LabelValues []string `json:"labelValues"`
+ Value float32 `json:"value"`
+}
+
+// MetricList records all items of metric information of shuffle servers.
+type MetricList struct {
+ Metrics []*MetricItem `json:"metrics"`
+ TimeStamp int64 `json:"timestamp"`
+}
+
+func getLastAppNum(body []byte) (int, error) {
+ resp := &MetricList{}
+ if err := json.Unmarshal(body, resp); err != nil {
+ klog.Errorf("unmarshal body (%v) failed: %v", string(body), err)
+ return 0, err
+ }
+ for i := range resp.Metrics {
+ if resp.Metrics[i].Name == "app_num_with_node" {
+ return int(resp.Metrics[i].Value), nil
+ }
+ }
+ return 0, nil
+}
+
+// HasZeroApps returns whether there are zero apps in the shuffle server pod.
+func HasZeroApps(pod *corev1.Pod) bool {
+ port := utils.GetMetricsServerPort(pod)
+ if len(port) == 0 {
+ return true
+ }
+ if pod.Status.Phase != corev1.PodRunning {
+ return true
+ }
+ url := fmt.Sprintf("http://%v:%v/metrics/server", pod.Status.PodIP, port)
+ req := gorequest.New().Timeout(time.Second * 15).Get(url).Type("json")
+ resp, body, errs := req.EndBytes()
+ if len(errs) > 0 {
+ klog.Errorf("send metrics server request failed: %v->%+v", url, errs)
+ return true
+ }
+ if resp.StatusCode != http.StatusOK {
+ klog.Errorf("heartbeat response failed: invalid status (%v->%v)", url, resp.Status)
+ return false
+ }
+ if num, err := getLastAppNum(body); err != nil {
+ klog.Errorf("get last app number of (%v) failed: %v", pod.Spec.NodeName, err)
+ return false
+ } else if num > 0 {
+ klog.V(4).Infof("last %v apps in node %v", num, pod.Spec.NodeName)
+ return false
+ }
+ return true
+}