You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/10/28 04:39:05 UTC

[apisix-ingress-controller] branch master updated: feat: add label-selector for watching namespace (#715)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f7c88  feat: add label-selector for watching namespace (#715)
65f7c88 is described below

commit 65f7c88193eb6e83b2bb4ca87a981321a99503e5
Author: kv <gx...@163.com>
AuthorDate: Thu Oct 28 12:37:59 2021 +0800

    feat: add label-selector for watching namespace (#715)
---
 cmd/ingress/ingress.go                   |   6 +-
 conf/config-default.yaml                 |   4 +
 docs/en/latest/practices/the-hard-way.md |   4 +-
 pkg/config/config.go                     |  52 ++++++++++
 pkg/ingress/compare.go                   |  16 +--
 pkg/ingress/controller.go                |  36 ++++++-
 pkg/ingress/namespace.go                 | 166 +++++++++++++++++++++++++++++++
 pkg/ingress/pod_test.go                  |  25 ++---
 test/e2e/scaffold/ingress.go             |  14 ++-
 test/e2e/scaffold/scaffold.go            |   5 +-
 10 files changed, 294 insertions(+), 34 deletions(-)

diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 7843337..e36fbcc 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -143,7 +143,8 @@ the apisix cluster and others are created`,
 	cmd.PersistentFlags().BoolVar(&cfg.EnableProfiling, "enable-profiling", true, "enable profiling via web interface host:port/debug/pprof")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)")
 	cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s")
-	cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources")
+	cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources.")
+	cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.NamespaceSelector, "namespace-selector", []string{""}, "labels that controller used to select namespaces which will watch for resources")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressClass, "ingress-class", config.IngressClass, "the class of an Ingress object is set using the field IngressClassName in Kubernetes clusters version v1.18.0 or higher or the annotation \"kubernetes.io/ingress.class\" (deprecated)")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
@@ -155,5 +156,8 @@ the apisix cluster and others are created`,
 	cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, "default-apisix-cluster-admin-key", "", "admin key used for the authorization of admin api / manager api for the default APISIX cluster")
 	cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, "default-apisix-cluster-name", "default", "name of the default apisix cluster")
 
+	if err := cmd.PersistentFlags().MarkDeprecated("app-namespace", "use namespace-selector instead"); err != nil {
+		dief("failed to mark `app-namespace` as deprecated: %s", err)
+	}
 	return cmd
 }
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index fa0c1fd..c724212 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -44,6 +44,10 @@ kubernetes:
                                        # and the minimal resync interval is 30s.
   app_namespaces: ["*"]                # namespace list that controller will watch for resources,
                                        # by default all namespaces (represented by "*") are watched.
+                                       # The `app_namespace` is deprecated, using `namespace_selector` instead since version 1.4.0
+  namespace_selector: [""]             # namespace_selector represent basis for selecting managed namespaces.
+                                       # the field is support since version 1.4.0
+                                       # For example, "apisix.ingress=watching", so ingress will watching the namespaces which labels "apisix.ingress=watching"
   election_id: "ingress-apisix-leader" # the election id for the controller leader campaign,
                                        # only the leader will watch and delivery resource changes,
                                        # other instances (as candidates) stand by.
diff --git a/docs/en/latest/practices/the-hard-way.md b/docs/en/latest/practices/the-hard-way.md
index 5b2a6ca..f55aab5 100644
--- a/docs/en/latest/practices/the-hard-way.md
+++ b/docs/en/latest/practices/the-hard-way.md
@@ -621,8 +621,8 @@ data:
     kubernetes:
       kubeconfig: ""
       resync_interval: "30s"
-      app_namespaces:
-      - "*"
+      namespace_selector:
+      - "apisix.ingress=watching"
       ingress_class: "apisix"
       ingress_version: "networking/v1"
       apisix_route_version: "apisix.apache.org/v2beta1"
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 9294e88..9cd19c1 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -17,12 +17,14 @@ package config
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"strings"
 	"time"
 
 	"gopkg.in/yaml.v2"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/validation"
 
 	"github.com/apache/apisix-ingress-controller/pkg/types"
 )
@@ -77,6 +79,7 @@ type KubernetesConfig struct {
 	Kubeconfig          string             `json:"kubeconfig" yaml:"kubeconfig"`
 	ResyncInterval      types.TimeDuration `json:"resync_interval" yaml:"resync_interval"`
 	AppNamespaces       []string           `json:"app_namespaces" yaml:"app_namespaces"`
+	NamespaceSelector   []string           `json:"namespace_selector" yaml:"namespace_selector"`
 	ElectionID          string             `json:"election_id" yaml:"election_id"`
 	IngressClass        string             `json:"ingress_class" yaml:"ingress_class"`
 	IngressVersion      string             `json:"ingress_version" yaml:"ingress_version"`
@@ -174,6 +177,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("unsupported ingress version")
 	}
 	cfg.Kubernetes.AppNamespaces = purifyAppNamespaces(cfg.Kubernetes.AppNamespaces)
+	ok, err := cfg.verifyNamespaceSelector()
+	if !ok {
+		return err
+	}
 	return nil
 }
 
@@ -191,3 +198,48 @@ func purifyAppNamespaces(namespaces []string) []string {
 	}
 	return ultimate
 }
+
+func (cfg *Config) verifyNamespaceSelector() (bool, error) {
+	labels := cfg.Kubernetes.NamespaceSelector
+	// default is [""]
+	if len(labels) == 1 && labels[0] == "" {
+		cfg.Kubernetes.NamespaceSelector = []string{}
+	}
+
+	for _, s := range cfg.Kubernetes.NamespaceSelector {
+		parts := strings.Split(s, "=")
+		if len(parts) != 2 {
+			return false, fmt.Errorf("Illegal namespaceSelector: %s, should be key-value pairs divided by = ", s)
+		} else {
+			if err := cfg.validateLabelKey(parts[0]); err != nil {
+				return false, err
+			}
+			if err := cfg.validateLabelValue(parts[1]); err != nil {
+				return false, err
+			}
+		}
+	}
+	return true, nil
+}
+
+// validateLabelKey validate the key part of label
+// ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
+func (cfg *Config) validateLabelKey(key string) error {
+	errorMsg := validation.IsQualifiedName(key)
+	msg := strings.Join(errorMsg, "\n")
+	if msg == "" {
+		return nil
+	}
+	return fmt.Errorf("Illegal namespaceSelector: %s, "+msg, key)
+}
+
+// validateLabelValue validate the value part of label
+// ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
+func (cfg *Config) validateLabelValue(value string) error {
+	errorMsg := validation.IsValidLabelValue(value)
+	msg := strings.Join(errorMsg, "\n")
+	if msg == "" {
+		return nil
+	}
+	return fmt.Errorf("Illegal namespaceSelector: %s, "+msg, value)
+}
diff --git a/pkg/ingress/compare.go b/pkg/ingress/compare.go
index badad42..b5829a9 100644
--- a/pkg/ingress/compare.go
+++ b/pkg/ingress/compare.go
@@ -50,17 +50,16 @@ func (c *Controller) CompareResources(ctx context.Context) error {
 			log.Error(err.Error())
 			ctx.Done()
 		} else {
-			wns := make(map[string]struct{}, len(nsList.Items))
+			wns := new(sync.Map)
 			for _, v := range nsList.Items {
-				wns[v.Name] = struct{}{}
+				wns.Store(v.Name, struct{}{})
 			}
 			c.watchingNamespace = wns
 		}
 	}
-	if len(c.watchingNamespace) > 0 {
-		wg.Add(len(c.watchingNamespace))
-	}
-	for ns := range c.watchingNamespace {
+
+	c.watchingNamespace.Range(func(key, value interface{}) bool {
+		wg.Add(1)
 		go func(ns string) {
 			defer wg.Done()
 			// ApisixRoute
@@ -130,8 +129,9 @@ func (c *Controller) CompareResources(ctx context.Context) error {
 					}
 				}
 			}
-		}(ns)
-	}
+		}(key.(string))
+		return true
+	})
 	wg.Wait()
 
 	// 2.get all cache routes
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 6465b56..14b024b 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -70,7 +71,8 @@ type Controller struct {
 	namespace         string
 	cfg               *config.Config
 	wg                sync.WaitGroup
-	watchingNamespace map[string]struct{}
+	watchingNamespace *sync.Map
+	watchingLabels    types.Labels
 	apisix            apisix.APISIX
 	podCache          types.PodCache
 	translator        translation.Translator
@@ -90,6 +92,8 @@ type Controller struct {
 	leaderContextCancelFunc context.CancelFunc
 
 	// common informers and listers
+	namespaceInformer           cache.SharedIndexInformer
+	namespaceLister             listerscorev1.NamespaceLister
 	podInformer                 cache.SharedIndexInformer
 	podLister                   listerscorev1.PodLister
 	epInformer                  cache.SharedIndexInformer
@@ -112,6 +116,7 @@ type Controller struct {
 	apisixConsumerLister        listersv2alpha1.ApisixConsumerLister
 
 	// resource controllers
+	namespaceController     *namespaceController
 	podController           *podController
 	endpointsController     *endpointsController
 	endpointSliceController *endpointSliceController
@@ -148,15 +153,21 @@ func NewController(cfg *config.Config) (*Controller, error) {
 	}
 
 	var (
-		watchingNamespace map[string]struct{}
+		watchingNamespace = new(sync.Map)
+		watchingLabels    = make(map[string]string)
 	)
 	if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
-		watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
 		for _, ns := range cfg.Kubernetes.AppNamespaces {
-			watchingNamespace[ns] = struct{}{}
+			watchingNamespace.Store(ns, struct{}{})
 		}
 	}
 
+	// support namespace label-selector
+	for _, labels := range cfg.Kubernetes.NamespaceSelector {
+		labelSlice := strings.Split(labels, "=")
+		watchingLabels[labelSlice[0]] = labelSlice[1]
+	}
+
 	// recorder
 	utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
 	eventBroadcaster := record.NewBroadcaster()
@@ -171,6 +182,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
 		metricsCollector:  metrics.NewPrometheusCollector(),
 		kubeClient:        kubeClient,
 		watchingNamespace: watchingNamespace,
+		watchingLabels:    watchingLabels,
 		secretSSLMap:      new(sync.Map),
 		recorder:          eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
 
@@ -188,6 +200,7 @@ func (c *Controller) initWhenStartLeading() {
 	kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
 	apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
 
+	c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister()
 	c.podLister = kubeFactory.Core().V1().Pods().Lister()
 	c.epLister, c.epInformer = kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
 	c.svcLister = kubeFactory.Core().V1().Services().Lister()
@@ -236,6 +249,7 @@ func (c *Controller) initWhenStartLeading() {
 		apisixRouteInformer = apisixFactory.Apisix().V2beta2().ApisixRoutes().Informer()
 	}
 
+	c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer()
 	c.podInformer = kubeFactory.Core().V1().Pods().Informer()
 	c.svcInformer = kubeFactory.Core().V1().Services().Informer()
 	c.ingressInformer = ingressInformer
@@ -251,6 +265,7 @@ func (c *Controller) initWhenStartLeading() {
 	} else {
 		c.endpointsController = c.newEndpointsController()
 	}
+	c.namespaceController = c.newNamespaceController()
 	c.podController = c.newPodController()
 	c.apisixUpstreamController = c.newApisixUpstreamController()
 	c.ingressController = c.newIngressController()
@@ -405,6 +420,11 @@ func (c *Controller) run(ctx context.Context) {
 
 	c.initWhenStartLeading()
 
+	// list namesapce and init watchingNamespace
+	if err := c.initWatchingNamespaceByLabels(ctx); err != nil {
+		ctx.Done()
+		return
+	}
 	// compare resources of k8s with objects of APISIX
 	if err = c.CompareResources(ctx); err != nil {
 		ctx.Done()
@@ -415,6 +435,9 @@ func (c *Controller) run(ctx context.Context) {
 		c.checkClusterHealth(ctx, cancelFunc)
 	})
 	c.goAttach(func() {
+		c.namespaceInformer.Run(ctx.Done())
+	})
+	c.goAttach(func() {
 		c.podInformer.Run(ctx.Done())
 	})
 	c.goAttach(func() {
@@ -446,6 +469,9 @@ func (c *Controller) run(ctx context.Context) {
 		c.apisixConsumerInformer.Run(ctx.Done())
 	})
 	c.goAttach(func() {
+		c.namespaceController.run(ctx)
+	})
+	c.goAttach(func() {
 		c.podController.run(ctx)
 	})
 	c.goAttach(func() {
@@ -502,7 +528,7 @@ func (c *Controller) namespaceWatching(key string) (ok bool) {
 		log.Warnf("resource %s was ignored since: %s", key, err)
 		return
 	}
-	_, ok = c.watchingNamespace[ns]
+	_, ok = c.watchingNamespace.Load(ns)
 	return
 }
 
diff --git a/pkg/ingress/namespace.go b/pkg/ingress/namespace.go
new file mode 100644
index 0000000..b8f1479
--- /dev/null
+++ b/pkg/ingress/namespace.go
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+	"context"
+	"time"
+
+	"go.uber.org/zap"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/apache/apisix-ingress-controller/pkg/log"
+	"github.com/apache/apisix-ingress-controller/pkg/types"
+)
+
+type namespaceController struct {
+	controller *Controller
+	workqueue  workqueue.RateLimitingInterface
+	workers    int
+}
+
+func (c *Controller) newNamespaceController() *namespaceController {
+	ctl := &namespaceController{
+		controller: c,
+		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Namespace"),
+		workers:    1,
+	}
+	ctl.controller.namespaceInformer.AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    ctl.onAdd,
+			UpdateFunc: ctl.onUpdate,
+			DeleteFunc: ctl.onDelete,
+		},
+	)
+	return ctl
+}
+
+func (c *Controller) initWatchingNamespaceByLabels(ctx context.Context) error {
+	labelSelector := metav1.LabelSelector{MatchLabels: c.watchingLabels}
+	opts := metav1.ListOptions{
+		LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
+	}
+	namespaces, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
+	if err != nil {
+		return err
+	} else {
+		for _, ns := range namespaces.Items {
+			c.watchingNamespace.Store(ns.Name, struct{}{})
+		}
+	}
+	return nil
+}
+
+func (c *namespaceController) run(ctx context.Context) {
+	log.Info("namespace controller started")
+	defer log.Info("namespace controller exited")
+
+	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.namespaceInformer.HasSynced); !ok {
+		log.Error("informers sync failed")
+		return
+	}
+	for i := 0; i < c.workers; i++ {
+		go c.runWorker(ctx)
+	}
+	<-ctx.Done()
+}
+
+func (c *namespaceController) runWorker(ctx context.Context) {
+	for {
+		obj, quit := c.workqueue.Get()
+		if quit {
+			return
+		}
+		err := c.sync(ctx, obj.(*types.Event))
+		c.workqueue.Done(obj)
+		c.handleSyncErr(obj.(*types.Event), err)
+	}
+}
+
+func (c *namespaceController) sync(ctx context.Context, ev *types.Event) error {
+	if ev.Type != types.EventDelete {
+		// check the labels of specify namespace
+		namespace, err := c.controller.kubeClient.Client.CoreV1().Namespaces().Get(ctx, ev.Object.(string), metav1.GetOptions{})
+		if err != nil {
+			return err
+		} else {
+			// if labels of namespace contains the watchingLabels, the namespace should be set to controller.watchingNamespace
+			if c.controller.watchingLabels.IsSubsetOf(namespace.Labels) {
+				c.controller.watchingNamespace.Store(namespace.Name, struct{}{})
+			}
+		}
+	} else { // type == types.EventDelete
+		namespace := ev.Tombstone.(*corev1.Namespace)
+		if _, ok := c.controller.watchingNamespace.Load(namespace.Name); ok {
+			c.controller.watchingNamespace.Delete(namespace.Name)
+		}
+		// do nothing, if the namespace did not in controller.watchingNamespace
+	}
+	return nil
+}
+
+func (c *namespaceController) handleSyncErr(event *types.Event, err error) {
+	name := event.Object.(string)
+	if err != nil {
+		log.Warnw("sync namespace info failed, will retry",
+			zap.String("namespace", name),
+			zap.Error(err),
+		)
+		c.workqueue.AddRateLimited(event)
+	} else {
+		c.workqueue.Forget(event)
+	}
+}
+
+func (c *namespaceController) onAdd(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err == nil {
+		log.Debugw(key)
+	}
+	c.workqueue.AddRateLimited(&types.Event{
+		Type:   types.EventAdd,
+		Object: key,
+	})
+}
+
+func (c *namespaceController) onUpdate(pre, cur interface{}) {
+	oldNamespace := pre.(*corev1.Namespace)
+	newNamespace := cur.(*corev1.Namespace)
+	if oldNamespace.ResourceVersion >= newNamespace.ResourceVersion {
+		return
+	}
+	key, err := cache.MetaNamespaceKeyFunc(cur)
+	if err != nil {
+		log.Errorf("found Namespace resource with error: %s", err)
+		return
+	}
+	c.workqueue.AddRateLimited(&types.Event{
+		Type:   types.EventUpdate,
+		Object: key,
+	})
+}
+
+func (c *namespaceController) onDelete(obj interface{}) {
+	namespace := obj.(*corev1.Namespace)
+	c.workqueue.AddRateLimited(&types.Event{
+		Type:      types.EventDelete,
+		Object:    namespace.Name,
+		Tombstone: namespace,
+	})
+}
diff --git a/pkg/ingress/pod_test.go b/pkg/ingress/pod_test.go
index 289c4e7..7069783 100644
--- a/pkg/ingress/pod_test.go
+++ b/pkg/ingress/pod_test.go
@@ -15,6 +15,7 @@
 package ingress
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -26,12 +27,12 @@ import (
 )
 
 func TestPodOnAdd(t *testing.T) {
+	watchingNamespace := new(sync.Map)
+	watchingNamespace.Store("default", struct{}{})
 	ctl := &podController{
 		controller: &Controller{
-			watchingNamespace: map[string]struct{}{
-				"default": {},
-			},
-			podCache: types.NewPodCache(),
+			watchingNamespace: watchingNamespace,
+			podCache:          types.NewPodCache(),
 		},
 	}
 
@@ -67,12 +68,12 @@ func TestPodOnAdd(t *testing.T) {
 }
 
 func TestPodOnDelete(t *testing.T) {
+	watchingNamespace := new(sync.Map)
+	watchingNamespace.Store("default", struct{}{})
 	ctl := &podController{
 		controller: &Controller{
-			watchingNamespace: map[string]struct{}{
-				"default": {},
-			},
-			podCache: types.NewPodCache(),
+			watchingNamespace: watchingNamespace,
+			podCache:          types.NewPodCache(),
 		},
 	}
 
@@ -111,12 +112,12 @@ func TestPodOnDelete(t *testing.T) {
 }
 
 func TestPodOnUpdate(t *testing.T) {
+	watchingNamespace := new(sync.Map)
+	watchingNamespace.Store("default", struct{}{})
 	ctl := &podController{
 		controller: &Controller{
-			watchingNamespace: map[string]struct{}{
-				"default": {},
-			},
-			podCache: types.NewPodCache(),
+			watchingNamespace: watchingNamespace,
+			podCache:          types.NewPodCache(),
 		},
 	}
 
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index e65832f..a2c732d 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -278,7 +278,9 @@ spec:
             - --default-apisix-cluster-admin-key
             - edd1c9f034335f136f87ad84b625c8f1
             - --app-namespace
-            - %s,kube-system
+            - kube-system
+            - --namespace-selector
+            - %s
             - --apisix-route-version
             - %s
             - --watch-endpointslices
@@ -401,10 +403,11 @@ func (s *Scaffold) newIngressAPISIXController() error {
 	})
 
 	var ingressAPISIXDeployment string
+	label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
 	if s.opts.EnableWebhooks {
-		ingressAPISIXDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace, s.namespace, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
+		ingressAPISIXDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace, label, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
 	} else {
-		ingressAPISIXDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace, s.namespace, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
+		ingressAPISIXDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace, label, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
 	}
 
 	err = k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressAPISIXDeployment)
@@ -508,10 +511,11 @@ func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
 // ScaleIngressController scales the number of Ingress Controller pods to desired.
 func (s *Scaffold) ScaleIngressController(desired int) error {
 	var ingressDeployment string
+	label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
 	if s.opts.EnableWebhooks {
-		ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
+		ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, label, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
 	} else {
-		ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
+		ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, label, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
 	}
 	if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressDeployment); err != nil {
 		return err
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 865c072..a330c86 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -287,8 +287,11 @@ func (s *Scaffold) beforeEach() {
 		ConfigPath: s.opts.Kubeconfig,
 		Namespace:  s.namespace,
 	}
+
 	s.finializers = nil
-	k8s.CreateNamespace(s.t, s.kubectlOptions, s.namespace)
+	labels := make(map[string]string)
+	labels["apisix.ingress.watch"] = s.namespace
+	k8s.CreateNamespaceWithMetadata(s.t, s.kubectlOptions, metav1.ObjectMeta{Name: s.namespace, Labels: labels})
 
 	s.nodes, err = k8s.GetReadyNodesE(s.t, s.kubectlOptions)
 	assert.Nil(s.t, err, "querying ready nodes")