You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/07/10 14:31:35 UTC

[apisix-ingress-controller] branch master updated: chore: endpointslice controller (#574)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 67f3fd9  chore: endpointslice controller (#574)
67f3fd9 is described below

commit 67f3fd934b8a8b935440227a5c8ba7923ba91a2a
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Sat Jul 10 22:31:29 2021 +0800

    chore: endpointslice controller (#574)
---
 README.md                                        |   2 +-
 cmd/ingress/ingress.go                           |   1 +
 conf/config-default.yaml                         |   1 +
 docs/en/latest/FAQ.md                            |   2 +-
 pkg/ingress/controller.go                        | 103 ++++++++++-
 pkg/ingress/endpoint.go                          |  84 +--------
 pkg/ingress/endpointslice.go                     | 210 +++++++++++++++++++++++
 pkg/kube/endpoint.go                             |   9 +-
 pkg/kube/translation/translator_test.go          | 133 +++++++++++++-
 samples/deploy/rbac/apisix_view_clusterrole.yaml |   8 +
 test/e2e/scaffold/ingress.go                     |   9 +
 11 files changed, 470 insertions(+), 92 deletions(-)

diff --git a/README.md b/README.md
index 22b06db..df46172 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ This project is currently general availability.
 
 ## Prerequisites
 
-Apisix ingress controller requires Kubernetes version 1.14+.
+Apisix ingress controller requires Kubernetes version 1.15+.
 
 ## Apache APISIX Ingress vs. Kubernetes Ingress Nginx
 
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index df931ef..de22508 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -147,6 +147,7 @@ the apisix cluster and others are created`,
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"")
+	cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints")
 	cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)")
 	cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)")
 	cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 78c4f5d..b6ec625 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -50,6 +50,7 @@ kubernetes:
   ingress_version: "networking/v1"     # the supported ingress api group version, can be "networking/v1beta1"
                                        # , "networking/v1" (for Kubernetes version v1.19.0 or higher), and
                                        # "extensions/v1beta1", default is "networking/v1".
+  watch_endpointslices: false          # whether to watch EndpointSlices rather than Endpoints.
 
   apisix_route_version: "apisix.apache.org/v2alpha1" # the supported apisixroute api group version, can be
                                                      # "apisix.apache.org/v1" or "apisix.apache.org/v2alpha1",
diff --git a/docs/en/latest/FAQ.md b/docs/en/latest/FAQ.md
index 7c92f9a..8786def 100644
--- a/docs/en/latest/FAQ.md
+++ b/docs/en/latest/FAQ.md
@@ -49,7 +49,7 @@ Tips: The failure caused by empty upstream nodes is a limitation of Apache APISI
 
 6. What is the retry rule of `apisix-ingress-controller`?
 
-If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
+If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
 
 The delayed retry method is adopted. After the first failure, it is retried once per second. After 5 retries are triggered, the slow retry strategy will be enabled, and the retry will be performed every 1 minute until it succeeds.
 
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 7694f0b..3c3d566 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -21,8 +21,11 @@ import (
 	"sync"
 	"time"
 
+	apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -107,10 +110,11 @@ type Controller struct {
 	apisixConsumerLister        listersv2alpha1.ApisixConsumerLister
 
 	// resource controllers
-	podController       *podController
-	endpointsController *endpointsController
-	ingressController   *ingressController
-	secretController    *secretController
+	podController           *podController
+	endpointsController     *endpointsController
+	endpointSliceController *endpointSliceController
+	ingressController       *ingressController
+	secretController        *secretController
 
 	apisixUpstreamController      *apisixUpstreamController
 	apisixRouteController         *apisixRouteController
@@ -237,8 +241,12 @@ func (c *Controller) initWhenStartLeading() {
 	c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
 	c.apisixConsumerInformer = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer()
 
+	if c.cfg.Kubernetes.WatchEndpointSlices {
+		c.endpointSliceController = c.newEndpointSliceController()
+	} else {
+		c.endpointsController = c.newEndpointsController()
+	}
 	c.podController = c.newPodController()
-	c.endpointsController = c.newEndpointsController()
 	c.apisixUpstreamController = c.newApisixUpstreamController()
 	c.ingressController = c.newIngressController()
 	c.apisixRouteController = c.newApisixRouteController()
@@ -429,7 +437,11 @@ func (c *Controller) run(ctx context.Context) {
 		c.podController.run(ctx)
 	})
 	c.goAttach(func() {
-		c.endpointsController.run(ctx)
+		if c.cfg.Kubernetes.WatchEndpointSlices {
+			c.endpointSliceController.run(ctx)
+		} else {
+			c.endpointsController.run(ctx)
+		}
 	})
 	c.goAttach(func() {
 		c.apisixUpstreamController.run(ctx)
@@ -508,6 +520,85 @@ func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consum
 	}
 	return
 }
+
+func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
+	namespace := ep.Namespace()
+	svcName := ep.ServiceName()
+	svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			log.Infof("service %s/%s not found", ep.Namespace(), svcName)
+			return nil
+		}
+		log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
+		return err
+	}
+	var subsets []configv1.ApisixUpstreamSubset
+	subsets = append(subsets, configv1.ApisixUpstreamSubset{})
+	au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
+	if err != nil {
+		if !k8serrors.IsNotFound(err) {
+			log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
+			return err
+		}
+	} else if len(au.Spec.Subsets) > 0 {
+		subsets = append(subsets, au.Spec.Subsets...)
+	}
+
+	clusters := c.apisix.ListClusters()
+	for _, port := range svc.Spec.Ports {
+		for _, subset := range subsets {
+			nodes, err := c.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
+			if err != nil {
+				log.Errorw("failed to translate upstream nodes",
+					zap.Error(err),
+					zap.Any("endpoints", ep),
+					zap.Int32("port", port.Port),
+				)
+			}
+			name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
+			for _, cluster := range clusters {
+				if err := c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
+
+func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
+	upstream, err := cluster.Upstream().Get(ctx, upsName)
+	if err != nil {
+		if err == apisixcache.ErrNotFound {
+			log.Warnw("upstream is not referenced",
+				zap.String("cluster", cluster.String()),
+				zap.String("upstream", upsName),
+			)
+			return nil
+		} else {
+			log.Errorw("failed to get upstream",
+				zap.String("upstream", upsName),
+				zap.String("cluster", cluster.String()),
+				zap.Error(err),
+			)
+			return err
+		}
+	}
+
+	upstream.Nodes = nodes
+
+	log.Debugw("upstream binds new nodes",
+		zap.Any("upstream", upstream),
+		zap.String("cluster", cluster.String()),
+	)
+
+	updated := &manifest{
+		upstreams: []*apisixv1.Upstream{upstream},
+	}
+	return c.syncManifests(ctx, nil, updated, nil)
+}
+
 func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
 	defer cancelFunc()
 	for {
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index fced1dd..c2eb236 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -16,21 +16,16 @@ package ingress
 
 import (
 	"context"
-	"github.com/apache/apisix-ingress-controller/pkg/kube"
 	"time"
 
 	"go.uber.org/zap"
 	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 
-	"github.com/apache/apisix-ingress-controller/pkg/apisix"
-	apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
-	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+	"github.com/apache/apisix-ingress-controller/pkg/kube"
 	"github.com/apache/apisix-ingress-controller/pkg/log"
 	"github.com/apache/apisix-ingress-controller/pkg/types"
-	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
 type endpointsController struct {
@@ -89,82 +84,7 @@ func (c *endpointsController) run(ctx context.Context) {
 
 func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
 	ep := ev.Object.(kube.Endpoint)
-	namespace := ep.Namespace()
-	svcName := ep.ServiceName()
-	svc, err := c.controller.svcLister.Services(ep.Namespace()).Get(svcName)
-	if err != nil {
-		if k8serrors.IsNotFound(err) {
-			log.Infof("service %s/%s not found", ep.Namespace(), svcName)
-			return nil
-		}
-		log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
-		return err
-	}
-	var subsets []configv1.ApisixUpstreamSubset
-	subsets = append(subsets, configv1.ApisixUpstreamSubset{})
-	au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
-	if err != nil {
-		if !k8serrors.IsNotFound(err) {
-			log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
-			return err
-		}
-	} else if len(au.Spec.Subsets) > 0 {
-		subsets = append(subsets, au.Spec.Subsets...)
-	}
-
-	clusters := c.controller.apisix.ListClusters()
-	for _, port := range svc.Spec.Ports {
-		for _, subset := range subsets {
-			nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
-			if err != nil {
-				log.Errorw("failed to translate upstream nodes",
-					zap.Error(err),
-					zap.Any("endpoints", ep),
-					zap.Int32("port", port.Port),
-				)
-			}
-			name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
-			for _, cluster := range clusters {
-				if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
-					return err
-				}
-			}
-		}
-	}
-
-	return nil
-}
-
-func (c *endpointsController) syncToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
-	upstream, err := cluster.Upstream().Get(ctx, upsName)
-	if err != nil {
-		if err == apisixcache.ErrNotFound {
-			log.Warnw("upstream is not referenced",
-				zap.String("cluster", cluster.String()),
-				zap.String("upstream", upsName),
-			)
-			return nil
-		} else {
-			log.Errorw("failed to get upstream",
-				zap.String("upstream", upsName),
-				zap.String("cluster", cluster.String()),
-				zap.Error(err),
-			)
-			return err
-		}
-	}
-
-	upstream.Nodes = nodes
-
-	log.Debugw("upstream binds new nodes",
-		zap.Any("upstream", upstream),
-		zap.String("cluster", cluster.String()),
-	)
-
-	updated := &manifest{
-		upstreams: []*apisixv1.Upstream{upstream},
-	}
-	return c.controller.syncManifests(ctx, nil, updated, nil)
+	return c.controller.syncEndpoint(ctx, ep)
 }
 
 func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go
index ea5119c..c8eaa05 100644
--- a/pkg/ingress/endpointslice.go
+++ b/pkg/ingress/endpointslice.go
@@ -13,3 +13,213 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 package ingress
+
+import (
+	"context"
+	"time"
+
+	"go.uber.org/zap"
+	discoveryv1 "k8s.io/api/discovery/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/apache/apisix-ingress-controller/pkg/log"
+	"github.com/apache/apisix-ingress-controller/pkg/types"
+)
+
+const (
+	_endpointSlicesManagedBy = "endpointslice-controller.k8s.io"
+)
+
+type endpointSliceEvent struct {
+	Key         string
+	ServiceName string
+}
+
+type endpointSliceController struct {
+	controller *Controller
+	workqueue  workqueue.RateLimitingInterface
+	workers    int
+}
+
+func (c *Controller) newEndpointSliceController() *endpointSliceController {
+	ctl := &endpointSliceController{
+		controller: c,
+		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Second, 60*time.Second, 5), "endpointSlice"),
+		workers:    1,
+	}
+
+	ctl.controller.epInformer.AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    ctl.onAdd,
+			UpdateFunc: ctl.onUpdate,
+			DeleteFunc: ctl.onDelete,
+		},
+	)
+
+	return ctl
+}
+
+func (c *endpointSliceController) run(ctx context.Context) {
+	log.Info("endpointSlice controller started")
+	defer log.Info("endpointSlice controller exited")
+	defer c.workqueue.ShutDown()
+
+	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok {
+		log.Error("informers sync failed")
+		return
+	}
+
+	handler := func() {
+		for {
+			obj, shutdown := c.workqueue.Get()
+			if shutdown {
+				return
+			}
+
+			err := c.sync(ctx, obj.(*types.Event))
+			c.workqueue.Done(obj)
+			c.handleSyncErr(obj, err)
+		}
+	}
+
+	for i := 0; i < c.workers; i++ {
+		go handler()
+	}
+
+	<-ctx.Done()
+}
+
+func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) error {
+	epEvent := ev.Object.(endpointSliceEvent)
+	namespace, _, err := cache.SplitMetaNamespaceKey(epEvent.Key)
+	if err != nil {
+		log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", epEvent.Key)
+		return nil
+	}
+	ep, err := c.controller.epLister.GetEndpointSlices(namespace, epEvent.ServiceName)
+	if err != nil {
+		log.Errorf("failed to get all endpointSlices for service %s: %s",
+			epEvent.ServiceName, err)
+		return err
+	}
+	return c.controller.syncEndpoint(ctx, ep)
+}
+
+func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
+	if err == nil {
+		c.workqueue.Forget(obj)
+		return
+	}
+	log.Warnw("sync endpointSlice failed, will retry",
+		zap.Any("object", obj),
+	)
+	c.workqueue.AddRateLimited(obj)
+}
+
+func (c *endpointSliceController) onAdd(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		log.Errorf("found endpointSlice object with bad namespace")
+	}
+	if !c.controller.namespaceWatching(key) {
+		return
+	}
+	ep := obj.(*discoveryv1.EndpointSlice)
+	svcName := ep.Labels[discoveryv1.LabelServiceName]
+	if svcName == "" {
+		return
+	}
+	if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+		// We only care about endpointSlice objects managed by the EndpointSlices
+		// controller.
+		return
+	}
+
+	log.Debugw("endpointSlice add event arrived",
+		zap.String("object-key", key),
+	)
+
+	c.workqueue.AddRateLimited(&types.Event{
+		Type: types.EventAdd,
+		Object: endpointSliceEvent{
+			Key:         key,
+			ServiceName: svcName,
+		},
+	})
+}
+
+func (c *endpointSliceController) onUpdate(prev, curr interface{}) {
+	prevEp := prev.(*discoveryv1.EndpointSlice)
+	currEp := curr.(*discoveryv1.EndpointSlice)
+
+	if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
+		return
+	}
+	key, err := cache.MetaNamespaceKeyFunc(currEp)
+	if err != nil {
+		log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", err)
+		return
+	}
+	if !c.controller.namespaceWatching(key) {
+		return
+	}
+	if currEp.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+		// We only care about endpointSlice objects managed by the EndpointSlices
+		// controller.
+		return
+	}
+	svcName := currEp.Labels[discoveryv1.LabelServiceName]
+	if svcName == "" {
+		return
+	}
+
+	log.Debugw("endpointSlice update event arrived",
+		zap.Any("new object", currEp),
+		zap.Any("old object", prevEp),
+	)
+	c.workqueue.AddRateLimited(&types.Event{
+		Type: types.EventUpdate,
+		// TODO pass key.
+		Object: endpointSliceEvent{
+			Key:         key,
+			ServiceName: svcName,
+		},
+	})
+}
+
+func (c *endpointSliceController) onDelete(obj interface{}) {
+	ep, ok := obj.(*discoveryv1.EndpointSlice)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			log.Errorf("found endpoints: %+v in bad tombstone state", obj)
+			return
+		}
+		ep = tombstone.Obj.(*discoveryv1.EndpointSlice)
+	}
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", err)
+		return
+	}
+	if !c.controller.namespaceWatching(key) {
+		return
+	}
+	if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+		// We only care about endpointSlice objects managed by the EndpointSlices
+		// controller.
+		return
+	}
+	svcName := ep.Labels[discoveryv1.LabelServiceName]
+	log.Debugw("endpoints delete event arrived",
+		zap.Any("object-key", key),
+	)
+	c.workqueue.AddRateLimited(&types.Event{
+		Type: types.EventDelete,
+		Object: endpointSliceEvent{
+			Key:         key,
+			ServiceName: svcName,
+		},
+	})
+}
diff --git a/pkg/kube/endpoint.go b/pkg/kube/endpoint.go
index 27eba84..995a73c 100644
--- a/pkg/kube/endpoint.go
+++ b/pkg/kube/endpoint.go
@@ -59,7 +59,7 @@ func (lister *endpointLister) GetEndpoint(namespace, name string) (Endpoint, err
 }
 
 func (lister *endpointLister) GetEndpointSlices(namespace, svcName string) (Endpoint, error) {
-	if lister.epsLister != nil {
+	if lister.epsLister == nil {
 		panic("not a endpointSlice lister")
 	}
 	selector := labels.SelectorFromSet(labels.Set{
@@ -174,3 +174,10 @@ func NewEndpoint(ep *corev1.Endpoints) Endpoint {
 		endpoint: ep,
 	}
 }
+
+// NewEndpointWithSlice creates an Endpoint whose underlying entity is a Kubernetes EndpointSlice.
+func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint {
+	return &endpoint{
+		endpointSlices: []*discoveryv1.EndpointSlice{ep},
+	}
+}
diff --git a/pkg/kube/translation/translator_test.go b/pkg/kube/translation/translator_test.go
index aa9bd70..f744403 100644
--- a/pkg/kube/translation/translator_test.go
+++ b/pkg/kube/translation/translator_test.go
@@ -16,9 +16,11 @@ package translation
 
 import (
 	"context"
-	"github.com/apache/apisix-ingress-controller/pkg/kube"
 	"testing"
 
+	"github.com/apache/apisix-ingress-controller/pkg/kube"
+	discoveryv1 "k8s.io/api/discovery/v1"
+
 	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 	"github.com/stretchr/testify/assert"
 	corev1 "k8s.io/api/core/v1"
@@ -213,3 +215,132 @@ func TestTranslateUpstreamNodes(t *testing.T) {
 		},
 	})
 }
+
+func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) {
+	svc := &corev1.Service{
+		TypeMeta: metav1.TypeMeta{},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "svc",
+			Namespace: "test",
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name: "port1",
+					Port: 80,
+					TargetPort: intstr.IntOrString{
+						Type:   intstr.Int,
+						IntVal: 9080,
+					},
+				},
+				{
+					Name: "port2",
+					Port: 443,
+					TargetPort: intstr.IntOrString{
+						Type:   intstr.Int,
+						IntVal: 9443,
+					},
+				},
+			},
+		},
+	}
+	isTrue := true
+	port1 := int32(9080)
+	port2 := int32(9443)
+	port1Name := "port1"
+	port2Name := "port2"
+	ep := &discoveryv1.EndpointSlice{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "svc",
+			Namespace: "test",
+			Labels: map[string]string{
+				discoveryv1.LabelManagedBy:   "endpointslice-controller.k8s.io",
+				discoveryv1.LabelServiceName: "svc",
+			},
+		},
+		AddressType: discoveryv1.AddressTypeIPv4,
+		Endpoints: []discoveryv1.Endpoint{
+			{
+				Addresses: []string{
+					"192.168.1.1",
+					"192.168.1.2",
+				},
+				Conditions: discoveryv1.EndpointConditions{
+					Ready: &isTrue,
+				},
+			},
+		},
+		Ports: []discoveryv1.EndpointPort{
+			{
+				Name: &port1Name,
+				Port: &port1,
+			},
+			{
+				Name: &port2Name,
+				Port: &port2,
+			},
+		},
+	}
+
+	client := fake.NewSimpleClientset()
+	informersFactory := informers.NewSharedInformerFactory(client, 0)
+	svcInformer := informersFactory.Core().V1().Services().Informer()
+	svcLister := informersFactory.Core().V1().Services().Lister()
+
+	processCh := make(chan struct{})
+	svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			processCh <- struct{}{}
+		},
+	})
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go svcInformer.Run(stopCh)
+	cache.WaitForCacheSync(stopCh, svcInformer.HasSynced)
+
+	_, err := client.CoreV1().Services("test").Create(context.Background(), svc, metav1.CreateOptions{})
+	assert.Nil(t, err)
+
+	tr := &translator{&TranslatorOptions{
+		ServiceLister: svcLister,
+	}}
+	<-processCh
+
+	nodes, err := tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 10080, nil)
+	assert.Nil(t, nodes)
+	assert.Equal(t, err, &translateError{
+		field:  "service.spec.ports",
+		reason: "port not defined",
+	})
+
+	nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 80, nil)
+	assert.Nil(t, err)
+	assert.Equal(t, nodes, apisixv1.UpstreamNodes{
+		{
+			Host:   "192.168.1.1",
+			Port:   9080,
+			Weight: 100,
+		},
+		{
+			Host:   "192.168.1.2",
+			Port:   9080,
+			Weight: 100,
+		},
+	})
+
+	nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 443, nil)
+	assert.Nil(t, err)
+	assert.Equal(t, nodes, apisixv1.UpstreamNodes{
+		{
+			Host:   "192.168.1.1",
+			Port:   9443,
+			Weight: 100,
+		},
+		{
+			Host:   "192.168.1.2",
+			Port:   9443,
+			Weight: 100,
+		},
+	})
+}
diff --git a/samples/deploy/rbac/apisix_view_clusterrole.yaml b/samples/deploy/rbac/apisix_view_clusterrole.yaml
index ef9d342..7a9ff16 100644
--- a/samples/deploy/rbac/apisix_view_clusterrole.yaml
+++ b/samples/deploy/rbac/apisix_view_clusterrole.yaml
@@ -157,3 +157,11 @@ rules:
   - leases
   verbs:
   - '*'
+- apiGroups:
+    - discovery.k8s.io
+  resources:
+    - endpointslices
+  verbs:
+    - get
+    - list
+    - watch
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 578907a..2b97082 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -168,6 +168,14 @@ rules:
     - leases
     verbs:
     - '*'
+  - apiGroups:
+    - discovery.k8s.io
+    resources:
+    - endpointslices
+    verbs:
+    - get
+    - list
+    - watch
 `
 	_clusterRoleBinding = `
 apiVersion: rbac.authorization.k8s.io/v1
@@ -256,6 +264,7 @@ spec:
             - %s,kube-system
             - --apisix-route-version
             - %s
+            - --watch-endpointslices
       serviceAccount: ingress-apisix-e2e-test-service-account
 `
 )