You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/09/24 10:00:57 UTC

[apisix-ingress-controller] branch master updated: feat: add full compare when ingress startup (#680)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 957c315  feat: add full compare when ingress startup (#680)
957c315 is described below

commit 957c31522e1b1e5f8ef9cab7eb244473a4e0f675
Author: kv <gx...@163.com>
AuthorDate: Fri Sep 24 18:00:46 2021 +0800

    feat: add full compare when ingress startup (#680)
---
 pkg/ingress/compare.go        | 246 ++++++++++++++++++++++++++++++++++++++++++
 pkg/ingress/controller.go     |   7 ++
 pkg/ingress/pod.go            |   3 +-
 test/e2e/go.mod               |   1 -
 test/e2e/ingress/compare.go   |  74 +++++++++++++
 test/e2e/scaffold/ingress.go  |  18 ++++
 test/e2e/scaffold/scaffold.go |   6 +-
 7 files changed, 350 insertions(+), 5 deletions(-)

diff --git a/pkg/ingress/compare.go b/pkg/ingress/compare.go
new file mode 100644
index 0000000..badad42
--- /dev/null
+++ b/pkg/ingress/compare.go
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+	"context"
+	"sync"
+
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/apache/apisix-ingress-controller/pkg/log"
+)
+
+// CompareResources used to compare the object IDs in resources and APISIX
+// Find out the rest of objects in APISIX
+// AND warn them in log.
+func (c *Controller) CompareResources(ctx context.Context) error {
+	var (
+		wg                sync.WaitGroup
+		routeMapK8S       = new(sync.Map)
+		streamRouteMapK8S = new(sync.Map)
+		upstreamMapK8S    = new(sync.Map)
+		sslMapK8S         = new(sync.Map)
+		consumerMapK8S    = new(sync.Map)
+
+		routeMapA6       = make(map[string]string)
+		streamRouteMapA6 = make(map[string]string)
+		upstreamMapA6    = make(map[string]string)
+		sslMapA6         = make(map[string]string)
+		consumerMapA6    = make(map[string]string)
+	)
+	// watchingNamespace == nil means to monitor all namespaces
+	if c.watchingNamespace == nil {
+		opts := v1.ListOptions{}
+		// list all namespaces
+		nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
+		if err != nil {
+			log.Error(err.Error())
+			ctx.Done()
+		} else {
+			wns := make(map[string]struct{}, len(nsList.Items))
+			for _, v := range nsList.Items {
+				wns[v.Name] = struct{}{}
+			}
+			c.watchingNamespace = wns
+		}
+	}
+	if len(c.watchingNamespace) > 0 {
+		wg.Add(len(c.watchingNamespace))
+	}
+	for ns := range c.watchingNamespace {
+		go func(ns string) {
+			defer wg.Done()
+			// ApisixRoute
+			opts := v1.ListOptions{}
+			retRoutes, err := c.kubeClient.APISIXClient.ApisixV2beta1().ApisixRoutes(ns).List(ctx, opts)
+			if err != nil {
+				log.Error(err.Error())
+				ctx.Done()
+			} else {
+				for _, r := range retRoutes.Items {
+					tc, err := c.translator.TranslateRouteV2beta1NotStrictly(&r)
+					if err != nil {
+						log.Error(err.Error())
+						ctx.Done()
+					} else {
+						// routes
+						for _, route := range tc.Routes {
+							routeMapK8S.Store(route.ID, route.ID)
+						}
+						// streamRoutes
+						for _, stRoute := range tc.StreamRoutes {
+							streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
+						}
+						// upstreams
+						for _, upstream := range tc.Upstreams {
+							upstreamMapK8S.Store(upstream.ID, upstream.ID)
+						}
+						// ssl
+						for _, ssl := range tc.SSL {
+							sslMapK8S.Store(ssl.ID, ssl.ID)
+						}
+					}
+				}
+			}
+			// todo ApisixUpstream
+			// ApisixUpstream should be synced with ApisixRoute resource
+
+			// ApisixSSL
+			retSSL, err := c.kubeClient.APISIXClient.ApisixV1().ApisixTlses(ns).List(ctx, opts)
+			if err != nil {
+				log.Error(err.Error())
+				ctx.Done()
+			} else {
+				for _, s := range retSSL.Items {
+					ssl, err := c.translator.TranslateSSL(&s)
+					if err != nil {
+						log.Error(err.Error())
+						ctx.Done()
+					} else {
+						sslMapK8S.Store(ssl.ID, ssl.ID)
+					}
+				}
+			}
+			// ApisixConsumer
+			retConsumer, err := c.kubeClient.APISIXClient.ApisixV2alpha1().ApisixConsumers(ns).List(ctx, opts)
+			if err != nil {
+				log.Error(err.Error())
+				ctx.Done()
+			} else {
+				for _, con := range retConsumer.Items {
+					consumer, err := c.translator.TranslateApisixConsumer(&con)
+					if err != nil {
+						log.Error(err.Error())
+						ctx.Done()
+					} else {
+						consumerMapK8S.Store(consumer.Username, consumer.Username)
+					}
+				}
+			}
+		}(ns)
+	}
+	wg.Wait()
+
+	// 2.get all cache routes
+	if err := c.listRouteCache(ctx, routeMapA6); err != nil {
+		return err
+	}
+	if err := c.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
+		return err
+	}
+	if err := c.listUpstreamCache(ctx, upstreamMapA6); err != nil {
+		return err
+	}
+	if err := c.listSSLCache(ctx, sslMapA6); err != nil {
+		return err
+	}
+	if err := c.listConsumerCache(ctx, consumerMapA6); err != nil {
+		return err
+	}
+	// 3.compare
+	routeReult := findRedundant(routeMapA6, routeMapK8S)
+	streamRouteReult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
+	upstreamReult := findRedundant(upstreamMapA6, upstreamMapK8S)
+	sslReult := findRedundant(sslMapA6, sslMapK8S)
+	consuemrReult := findRedundant(consumerMapA6, consumerMapK8S)
+	// 4.warn
+	warnRedundantResources(routeReult, "route")
+	warnRedundantResources(streamRouteReult, "streamRoute")
+	warnRedundantResources(upstreamReult, "upstream")
+	warnRedundantResources(sslReult, "ssl")
+	warnRedundantResources(consuemrReult, "consumer")
+
+	return nil
+}
+
+// log warn
+func warnRedundantResources(resources map[string]string, t string) {
+	for k := range resources {
+		log.Warnf("%s: %s in APISIX but do not in declare yaml", t, k)
+	}
+}
+
+// findRedundant find redundant item which in src and do not in dest
+func findRedundant(src map[string]string, dest *sync.Map) map[string]string {
+	result := make(map[string]string)
+	for k, v := range src {
+		_, ok := dest.Load(k)
+		if !ok {
+			result[k] = v
+		}
+	}
+	return result
+}
+
+func (c *Controller) listRouteCache(ctx context.Context, routeMapA6 map[string]string) error {
+	routesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().List(ctx)
+	if err != nil {
+		return err
+	} else {
+		for _, ra := range routesInA6 {
+			routeMapA6[ra.ID] = ra.ID
+		}
+	}
+	return nil
+}
+
+func (c *Controller) listStreamRouteCache(ctx context.Context, streamRouteMapA6 map[string]string) error {
+	streamRoutesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().List(ctx)
+	if err != nil {
+		return err
+	} else {
+		for _, ra := range streamRoutesInA6 {
+			streamRouteMapA6[ra.ID] = ra.ID
+		}
+	}
+	return nil
+}
+
+func (c *Controller) listUpstreamCache(ctx context.Context, upstreamMapA6 map[string]string) error {
+	upstreamsInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().List(ctx)
+	if err != nil {
+		return err
+	} else {
+		for _, ra := range upstreamsInA6 {
+			upstreamMapA6[ra.ID] = ra.ID
+		}
+	}
+	return nil
+}
+
+func (c *Controller) listSSLCache(ctx context.Context, sslMapA6 map[string]string) error {
+	sslInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().List(ctx)
+	if err != nil {
+		return err
+	} else {
+		for _, s := range sslInA6 {
+			sslMapA6[s.ID] = s.ID
+		}
+	}
+	return nil
+}
+
+func (c *Controller) listConsumerCache(ctx context.Context, consumerMapA6 map[string]string) error {
+	consumerInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().List(ctx)
+	if err != nil {
+		return err
+	} else {
+		for _, con := range consumerInA6 {
+			consumerMapA6[con.Username] = con.Username
+		}
+	}
+	return nil
+}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 0b83a8e..b3312d0 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -402,6 +402,12 @@ func (c *Controller) run(ctx context.Context) {
 
 	c.initWhenStartLeading()
 
+	// compare resources of k8s with objects of APISIX
+	if err = c.CompareResources(ctx); err != nil {
+		ctx.Done()
+		return
+	}
+
 	c.goAttach(func() {
 		c.checkClusterHealth(ctx, cancelFunc)
 	})
@@ -418,6 +424,7 @@ func (c *Controller) run(ctx context.Context) {
 		c.ingressInformer.Run(ctx.Done())
 	})
 	c.goAttach(func() {
+
 		c.apisixRouteInformer.Run(ctx.Done())
 	})
 	c.goAttach(func() {
diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go
index a5b1cd2..efaf881 100644
--- a/pkg/ingress/pod.go
+++ b/pkg/ingress/pod.go
@@ -89,7 +89,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
 		return
 	}
 	log.Debugw("pod update event arrived",
-		zap.Any("final state", pod),
+		zap.Any("pod namespace", pod.Namespace),
+		zap.Any("pod name", pod.Name),
 	)
 	if pod.DeletionTimestamp != nil {
 		if err := c.controller.podCache.Delete(pod); err != nil {
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index 7e9fd40..4c48e5e 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -8,7 +8,6 @@ require (
 	github.com/gorilla/websocket v1.4.2
 	github.com/gruntwork-io/terratest v0.32.8
 	github.com/onsi/ginkgo v1.16.4
-	github.com/onsi/gomega v1.10.1
 	github.com/stretchr/testify v1.7.0
 	k8s.io/api v0.21.1
 	k8s.io/apimachinery v0.21.1
diff --git a/test/e2e/ingress/compare.go b/test/e2e/ingress/compare.go
new file mode 100644
index 0000000..ce57855
--- /dev/null
+++ b/test/e2e/ingress/compare.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/onsi/ginkgo"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("Testing compare resources", func() {
+	opts := &scaffold.Options{
+		Name:                  "default",
+		Kubeconfig:            scaffold.GetKubeconfig(),
+		APISIXConfigPath:      "testdata/apisix-gw-config.yaml",
+		IngressAPISIXReplicas: 1,
+		HTTPBinServicePort:    80,
+		APISIXRouteVersion:    "apisix.apache.org/v2beta1",
+	}
+	s := scaffold.NewScaffold(opts)
+	ginkgo.It("Compare and find out the redundant objects in APISIX, and remove them", func() {
+		backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+		apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta1
+kind: ApisixRoute
+metadata:
+  name: httpbin-route
+spec:
+  http:
+  - name: rule1
+    match:
+      hosts:
+      - httpbin.com
+      paths:
+      - /ip
+    backend:
+      serviceName: %s
+      servicePort: %d
+`, backendSvc, backendSvcPort[0])
+		assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))
+
+		err := s.EnsureNumApisixRoutesCreated(1)
+		assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+		err = s.EnsureNumApisixUpstreamsCreated(1)
+		assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
+		// scale Ingres Controller --replicas=0
+		assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(0), "scaling ingress controller instances = 0")
+		// remove ApisixRoute resource
+		assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
+		// scale Ingres Controller --replicas=1
+		assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(1), "scaling ingress controller instances = 1")
+		time.Sleep(15 * time.Second)
+		// should find the warn log
+		output := s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
+		fmt.Println(output)
+		assert.Contains(ginkgo.GinkgoT(), output, "in APISIX but do not in declare yaml")
+	})
+})
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 33e732a..714cf3d 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"encoding/base64"
 	"fmt"
+	"time"
 
 	"github.com/gruntwork-io/terratest/modules/k8s"
 	"github.com/onsi/ginkgo"
@@ -452,3 +453,20 @@ func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
 		LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
 	})
 }
+
+// ScaleIngressController scales the number of Ingress Controller pods to desired.
+func (s *Scaffold) ScaleIngressController(desired int) error {
+	var ingressDeployment string
+	if s.opts.EnableWebhooks {
+		ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
+	} else {
+		ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
+	}
+	if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressDeployment); err != nil {
+		return err
+	}
+	if err := k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, s.labelSelector("app=ingress-apisix-controller-deployment-e2e-test"), desired, 5, 5*time.Second); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index bde3e8f..865c072 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -339,12 +339,12 @@ func (s *Scaffold) afterEach() {
 			_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
 		}
 		// Get the logs of apisix
-		output = s.getDeploymentLogs("apisix-deployment-e2e-test")
+		output = s.GetDeploymentLogs("apisix-deployment-e2e-test")
 		if output != "" {
 			_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
 		}
 		// Get the logs of ingress
-		output = s.getDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
+		output = s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
 		if output != "" {
 			_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
 		}
@@ -362,7 +362,7 @@ func (s *Scaffold) afterEach() {
 	time.Sleep(3 * time.Second)
 }
 
-func (s *Scaffold) getDeploymentLogs(name string) string {
+func (s *Scaffold) GetDeploymentLogs(name string) string {
 	cli, err := k8s.GetKubernetesClientE(s.t)
 	if err != nil {
 		assert.Nilf(ginkgo.GinkgoT(), err, "get client error: %s", err.Error())