You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/09/24 10:00:57 UTC
[apisix-ingress-controller] branch master updated: feat: add full
compare when ingress startup (#680)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 957c315 feat: add full compare when ingress startup (#680)
957c315 is described below
commit 957c31522e1b1e5f8ef9cab7eb244473a4e0f675
Author: kv <gx...@163.com>
AuthorDate: Fri Sep 24 18:00:46 2021 +0800
feat: add full compare when ingress startup (#680)
---
pkg/ingress/compare.go | 246 ++++++++++++++++++++++++++++++++++++++++++
pkg/ingress/controller.go | 7 ++
pkg/ingress/pod.go | 3 +-
test/e2e/go.mod | 1 -
test/e2e/ingress/compare.go | 74 +++++++++++++
test/e2e/scaffold/ingress.go | 18 ++++
test/e2e/scaffold/scaffold.go | 6 +-
7 files changed, 350 insertions(+), 5 deletions(-)
diff --git a/pkg/ingress/compare.go b/pkg/ingress/compare.go
new file mode 100644
index 0000000..badad42
--- /dev/null
+++ b/pkg/ingress/compare.go
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+ "context"
+ "sync"
+
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+)
+
+// CompareResources compares the object IDs translated from the Kubernetes
+// resources (ApisixRoute, ApisixTls and ApisixConsumer) of every watched
+// namespace with the IDs of the objects that exist in the default APISIX
+// cluster. Objects found in APISIX but not declared in Kubernetes are only
+// reported through a warning log; nothing is modified in APISIX.
+//
+// Failures while listing or translating Kubernetes resources are logged and
+// skipped, so the comparison is best effort: a skipped resource may show up
+// as a false "redundant" warning. An error is returned only when listing the
+// objects from APISIX fails.
+func (c *Controller) CompareResources(ctx context.Context) error {
+	var (
+		wg                sync.WaitGroup
+		routeMapK8S       = new(sync.Map)
+		streamRouteMapK8S = new(sync.Map)
+		upstreamMapK8S    = new(sync.Map)
+		sslMapK8S         = new(sync.Map)
+		consumerMapK8S    = new(sync.Map)
+
+		routeMapA6       = make(map[string]string)
+		streamRouteMapA6 = make(map[string]string)
+		upstreamMapA6    = make(map[string]string)
+		sslMapA6         = make(map[string]string)
+		consumerMapA6    = make(map[string]string)
+	)
+	// A nil watchingNamespace means every namespace is watched, so take a
+	// snapshot of all the namespaces that exist right now.
+	// NOTE(review): the snapshot is stored back on the controller and stays
+	// in effect after this call returns -- confirm that this is intended.
+	if c.watchingNamespace == nil {
+		nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, v1.ListOptions{})
+		if err != nil {
+			// The failure is only logged; watchingNamespace stays nil and no
+			// namespace is inspected below.
+			log.Error(err.Error())
+		} else {
+			wns := make(map[string]struct{}, len(nsList.Items))
+			for _, v := range nsList.Items {
+				wns[v.Name] = struct{}{}
+			}
+			c.watchingNamespace = wns
+		}
+	}
+	// 1. Collect the IDs declared in Kubernetes, one goroutine per namespace.
+	wg.Add(len(c.watchingNamespace))
+	for ns := range c.watchingNamespace {
+		go func(ns string) {
+			defer wg.Done()
+			opts := v1.ListOptions{}
+
+			// ApisixRoute yields routes, stream routes, upstreams and SSL objects.
+			retRoutes, err := c.kubeClient.APISIXClient.ApisixV2beta1().ApisixRoutes(ns).List(ctx, opts)
+			if err != nil {
+				log.Error(err.Error())
+			} else {
+				for i := range retRoutes.Items {
+					tc, err := c.translator.TranslateRouteV2beta1NotStrictly(&retRoutes.Items[i])
+					if err != nil {
+						log.Error(err.Error())
+						continue
+					}
+					for _, route := range tc.Routes {
+						routeMapK8S.Store(route.ID, route.ID)
+					}
+					for _, stRoute := range tc.StreamRoutes {
+						streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
+					}
+					for _, upstream := range tc.Upstreams {
+						upstreamMapK8S.Store(upstream.ID, upstream.ID)
+					}
+					for _, ssl := range tc.SSL {
+						sslMapK8S.Store(ssl.ID, ssl.ID)
+					}
+				}
+			}
+			// TODO: ApisixUpstream is not listed here; per the original note it
+			// should be synced together with the ApisixRoute resource.
+
+			// ApisixTls
+			retSSL, err := c.kubeClient.APISIXClient.ApisixV1().ApisixTlses(ns).List(ctx, opts)
+			if err != nil {
+				log.Error(err.Error())
+			} else {
+				for i := range retSSL.Items {
+					ssl, err := c.translator.TranslateSSL(&retSSL.Items[i])
+					if err != nil {
+						log.Error(err.Error())
+						continue
+					}
+					sslMapK8S.Store(ssl.ID, ssl.ID)
+				}
+			}
+
+			// ApisixConsumer
+			retConsumer, err := c.kubeClient.APISIXClient.ApisixV2alpha1().ApisixConsumers(ns).List(ctx, opts)
+			if err != nil {
+				log.Error(err.Error())
+				return
+			}
+			for i := range retConsumer.Items {
+				consumer, err := c.translator.TranslateApisixConsumer(&retConsumer.Items[i])
+				if err != nil {
+					log.Error(err.Error())
+					continue
+				}
+				consumerMapK8S.Store(consumer.Username, consumer.Username)
+			}
+		}(ns)
+	}
+	wg.Wait()
+
+	// 2. Collect the IDs of the objects that exist in APISIX.
+	if err := c.listRouteCache(ctx, routeMapA6); err != nil {
+		return err
+	}
+	if err := c.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
+		return err
+	}
+	if err := c.listUpstreamCache(ctx, upstreamMapA6); err != nil {
+		return err
+	}
+	if err := c.listSSLCache(ctx, sslMapA6); err != nil {
+		return err
+	}
+	if err := c.listConsumerCache(ctx, consumerMapA6); err != nil {
+		return err
+	}
+	// 3. Find the objects that exist in APISIX only.
+	routeResult := findRedundant(routeMapA6, routeMapK8S)
+	streamRouteResult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
+	upstreamResult := findRedundant(upstreamMapA6, upstreamMapK8S)
+	sslResult := findRedundant(sslMapA6, sslMapK8S)
+	consumerResult := findRedundant(consumerMapA6, consumerMapK8S)
+	// 4. Warn about each of them.
+	warnRedundantResources(routeResult, "route")
+	warnRedundantResources(streamRouteResult, "streamRoute")
+	warnRedundantResources(upstreamResult, "upstream")
+	warnRedundantResources(sslResult, "ssl")
+	warnRedundantResources(consumerResult, "consumer")
+
+	return nil
+}
+
+// warnRedundantResources logs one warning per key of resources, labelled with the object kind t.
+func warnRedundantResources(resources map[string]string, t string) {
+	for id := range resources {
+		log.Warnf("%s: %s in APISIX but do not in declare yaml", t, id)
+	}
+}
+
+// findRedundant returns a new map holding the entries of src whose keys are
+// absent from dest; neither argument is modified.
+func findRedundant(src map[string]string, dest *sync.Map) map[string]string {
+	redundant := make(map[string]string)
+	for key, value := range src {
+		if _, found := dest.Load(key); !found {
+			redundant[key] = value
+		}
+	}
+	return redundant
+}
+
+// listRouteCache adds the ID of each route listed from the default cluster to routeMapA6.
+func (c *Controller) listRouteCache(ctx context.Context, routeMapA6 map[string]string) error {
+	routes, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().List(ctx)
+	if err != nil {
+		return err
+	}
+	for _, route := range routes {
+		routeMapA6[route.ID] = route.ID
+	}
+	return nil
+}
+
+// listStreamRouteCache adds the ID of each stream route listed from the default cluster to streamRouteMapA6.
+func (c *Controller) listStreamRouteCache(ctx context.Context, streamRouteMapA6 map[string]string) error {
+	streamRoutes, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().List(ctx)
+	if err != nil {
+		return err
+	}
+	for _, streamRoute := range streamRoutes {
+		streamRouteMapA6[streamRoute.ID] = streamRoute.ID
+	}
+	return nil
+}
+
+// listUpstreamCache adds the ID of each upstream listed from the default cluster to upstreamMapA6.
+func (c *Controller) listUpstreamCache(ctx context.Context, upstreamMapA6 map[string]string) error {
+	upstreams, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().List(ctx)
+	if err != nil {
+		return err
+	}
+	for _, upstream := range upstreams {
+		upstreamMapA6[upstream.ID] = upstream.ID
+	}
+	return nil
+}
+
+// listSSLCache adds the ID of each SSL object listed from the default cluster to sslMapA6.
+func (c *Controller) listSSLCache(ctx context.Context, sslMapA6 map[string]string) error {
+	sslObjects, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().List(ctx)
+	if err != nil {
+		return err
+	}
+	for _, sslObject := range sslObjects {
+		sslMapA6[sslObject.ID] = sslObject.ID
+	}
+	return nil
+}
+
+// listConsumerCache adds the username of each consumer listed from the default cluster to consumerMapA6.
+func (c *Controller) listConsumerCache(ctx context.Context, consumerMapA6 map[string]string) error {
+	consumers, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().List(ctx)
+	if err != nil {
+		return err
+	}
+	for _, consumer := range consumers {
+		consumerMapA6[consumer.Username] = consumer.Username
+	}
+	return nil
+}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 0b83a8e..b3312d0 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -402,6 +402,12 @@ func (c *Controller) run(ctx context.Context) {
c.initWhenStartLeading()
+ // compare resources of k8s with objects of APISIX
+ if err = c.CompareResources(ctx); err != nil {
+ ctx.Done()
+ return
+ }
+
c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
@@ -418,6 +424,7 @@ func (c *Controller) run(ctx context.Context) {
c.ingressInformer.Run(ctx.Done())
})
c.goAttach(func() {
+
c.apisixRouteInformer.Run(ctx.Done())
})
c.goAttach(func() {
diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go
index a5b1cd2..efaf881 100644
--- a/pkg/ingress/pod.go
+++ b/pkg/ingress/pod.go
@@ -89,7 +89,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
return
}
log.Debugw("pod update event arrived",
- zap.Any("final state", pod),
+ zap.Any("pod namespace", pod.Namespace),
+ zap.Any("pod name", pod.Name),
)
if pod.DeletionTimestamp != nil {
if err := c.controller.podCache.Delete(pod); err != nil {
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index 7e9fd40..4c48e5e 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -8,7 +8,6 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/gruntwork-io/terratest v0.32.8
github.com/onsi/ginkgo v1.16.4
- github.com/onsi/gomega v1.10.1
github.com/stretchr/testify v1.7.0
k8s.io/api v0.21.1
k8s.io/apimachinery v0.21.1
diff --git a/test/e2e/ingress/compare.go b/test/e2e/ingress/compare.go
new file mode 100644
index 0000000..ce57855
--- /dev/null
+++ b/test/e2e/ingress/compare.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/onsi/ginkgo"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("Testing compare resources", func() {
+ opts := &scaffold.Options{
+ Name: "default",
+ Kubeconfig: scaffold.GetKubeconfig(),
+ APISIXConfigPath: "testdata/apisix-gw-config.yaml",
+ IngressAPISIXReplicas: 1,
+ HTTPBinServicePort: 80,
+ APISIXRouteVersion: "apisix.apache.org/v2beta1",
+ }
+ s := scaffold.NewScaffold(opts)
+ ginkgo.It("Compare and find out the redundant objects in APISIX, and remove them", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.com
+ paths:
+ - /ip
+ backend:
+ serviceName: %s
+ servicePort: %d
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))
+
+ err := s.EnsureNumApisixRoutesCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+ err = s.EnsureNumApisixUpstreamsCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
+ // scale the Ingress Controller down to 0 replicas
+ assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(0), "scaling ingress controller instances = 0")
+ // remove the ApisixRoute resource while no controller is running
+ assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
+ // scale the Ingress Controller back up to 1 replica
+ assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(1), "scaling ingress controller instances = 1")
+ time.Sleep(15 * time.Second)
+ // the controller log should contain the redundant-object warning
+ output := s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
+ fmt.Println(output)
+ assert.Contains(ginkgo.GinkgoT(), output, "in APISIX but do not in declare yaml")
+ })
+})
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 33e732a..714cf3d 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/base64"
"fmt"
+ "time"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
@@ -452,3 +453,20 @@ func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
}
+
+// ScaleIngressController re-applies the ingress controller deployment with
+// desired replicas and waits until that many controller pods are created.
+// The volume mounts are rendered only when webhooks are enabled.
+func (s *Scaffold) ScaleIngressController(desired int) error {
+	volumeMounts := ""
+	if s.opts.EnableWebhooks {
+		volumeMounts = _volumeMounts
+	}
+	manifest := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace,
+		s.opts.APISIXRouteVersion, volumeMounts, _webhookCertSecret)
+	if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, manifest); err != nil {
+		return err
+	}
+	selector := s.labelSelector("app=ingress-apisix-controller-deployment-e2e-test")
+	return k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, selector, desired, 5, 5*time.Second)
+}
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index bde3e8f..865c072 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -339,12 +339,12 @@ func (s *Scaffold) afterEach() {
_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
}
// Get the logs of apisix
- output = s.getDeploymentLogs("apisix-deployment-e2e-test")
+ output = s.GetDeploymentLogs("apisix-deployment-e2e-test")
if output != "" {
_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
}
// Get the logs of ingress
- output = s.getDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
+ output = s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
if output != "" {
_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
}
@@ -362,7 +362,7 @@ func (s *Scaffold) afterEach() {
time.Sleep(3 * time.Second)
}
-func (s *Scaffold) getDeploymentLogs(name string) string {
+func (s *Scaffold) GetDeploymentLogs(name string) string {
cli, err := k8s.GetKubernetesClientE(s.t)
if err != nil {
assert.Nilf(ginkgo.GinkgoT(), err, "get client error: %s", err.Error())