Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/07/10 14:31:35 UTC
[apisix-ingress-controller] branch master updated: chore: endpointslice controller (#574)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 67f3fd9 chore: endpointslice controller (#574)
67f3fd9 is described below
commit 67f3fd934b8a8b935440227a5c8ba7923ba91a2a
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Sat Jul 10 22:31:29 2021 +0800
chore: endpointslice controller (#574)
---
 README.md                                        |   2 +-
 cmd/ingress/ingress.go                           |   1 +
 conf/config-default.yaml                         |   1 +
 docs/en/latest/FAQ.md                            |   2 +-
 pkg/ingress/controller.go                        | 103 ++++++++++-
 pkg/ingress/endpoint.go                          |  84 +--------
 pkg/ingress/endpointslice.go                     | 210 +++++++++++++++++++++++
 pkg/kube/endpoint.go                             |   9 +-
 pkg/kube/translation/translator_test.go          | 133 +++++++++++++-
 samples/deploy/rbac/apisix_view_clusterrole.yaml |   8 +
 test/e2e/scaffold/ingress.go                     |   9 +
 11 files changed, 470 insertions(+), 92 deletions(-)
diff --git a/README.md b/README.md
index 22b06db..df46172 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ This project is currently general availability.
## Prerequisites
-Apisix ingress controller requires Kubernetes version 1.14+.
+Apisix ingress controller requires Kubernetes version 1.15+.
## Apache APISIX Ingress vs. Kubernetes Ingress Nginx
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index df931ef..de22508 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -147,6 +147,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"")
+ cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
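
A usage note on the new flag: since --watch-endpointslices is a BoolVar defaulting to false, passing the bare flag opts in. A minimal invocation sketch, assuming the ingress subcommand from cmd/ingress; the admin endpoint and key values are placeholders, not taken from this commit:

    ./apisix-ingress-controller ingress \
        --default-apisix-cluster-base-url http://127.0.0.1:9180/apisix/admin \
        --default-apisix-cluster-admin-key <admin-key> \
        --watch-endpointslices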
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 78c4f5d..b6ec625 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -50,6 +50,7 @@ kubernetes:
ingress_version: "networking/v1" # the supported ingress api group version, can be "networking/v1beta1"
# , "networking/v1" (for Kubernetes version v1.19.0 or higher), and
# "extensions/v1beta1", default is "networking/v1".
+ watch_endpointslices: false # whether to watch EndpointSlices rather than Endpoints.
apisix_route_version: "apisix.apache.org/v2alpha1" # the supported apisixroute api group version, can be
# "apisix.apache.org/v1" or "apisix.apache.org/v2alpha1",
diff --git a/docs/en/latest/FAQ.md b/docs/en/latest/FAQ.md
index 7c92f9a..8786def 100644
--- a/docs/en/latest/FAQ.md
+++ b/docs/en/latest/FAQ.md
@@ -49,7 +49,7 @@ Tips: The failure caused by empty upstream nodes is a limitation of Apache APISIX
6. What is the retry rule of `apisix-ingress-controller`?
-If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
+If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
The delayed retry method is adopted. After the first failure, it is retried once per second. After 5 retries are triggered, the slow retry strategy will be enabled, and the retry will be performed every 1 minute until it succeeds.
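
This retry rule maps directly onto the fast-slow rate limiter the resource controllers are built around; the new endpointslice controller below constructs exactly this queue. A minimal sketch using k8s.io/client-go/util/workqueue:

    // Retry once per second for the first 5 attempts, then once per
    // minute until the sync succeeds, matching the FAQ answer above.
    limiter := workqueue.NewItemFastSlowRateLimiter(time.Second, 60*time.Second, 5)
    queue := workqueue.NewNamedRateLimitingQueue(limiter, "endpointSlice")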
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 7694f0b..3c3d566 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -21,8 +21,11 @@ import (
"sync"
"time"
+ apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+ configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -107,10 +110,11 @@ type Controller struct {
apisixConsumerLister listersv2alpha1.ApisixConsumerLister
// resource controllers
- podController *podController
- endpointsController *endpointsController
- ingressController *ingressController
- secretController *secretController
+ podController *podController
+ endpointsController *endpointsController
+ endpointSliceController *endpointSliceController
+ ingressController *ingressController
+ secretController *secretController
apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
@@ -237,8 +241,12 @@ func (c *Controller) initWhenStartLeading() {
c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
c.apisixConsumerInformer = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer()
+ if c.cfg.Kubernetes.WatchEndpointSlices {
+ c.endpointSliceController = c.newEndpointSliceController()
+ } else {
+ c.endpointsController = c.newEndpointsController()
+ }
c.podController = c.newPodController()
- c.endpointsController = c.newEndpointsController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
c.apisixRouteController = c.newApisixRouteController()
@@ -429,7 +437,11 @@ func (c *Controller) run(ctx context.Context) {
c.podController.run(ctx)
})
c.goAttach(func() {
- c.endpointsController.run(ctx)
+ if c.cfg.Kubernetes.WatchEndpointSlices {
+ c.endpointSliceController.run(ctx)
+ } else {
+ c.endpointsController.run(ctx)
+ }
})
c.goAttach(func() {
c.apisixUpstreamController.run(ctx)
@@ -508,6 +520,85 @@ func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consum
}
return
}
+
+func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
+ namespace := ep.Namespace()
+ svcName := ep.ServiceName()
+ svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Infof("service %s/%s not found", ep.Namespace(), svcName)
+ return nil
+ }
+ log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
+ return err
+ }
+ var subsets []configv1.ApisixUpstreamSubset
+ subsets = append(subsets, configv1.ApisixUpstreamSubset{})
+ au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
+ return err
+ }
+ } else if len(au.Spec.Subsets) > 0 {
+ subsets = append(subsets, au.Spec.Subsets...)
+ }
+
+ clusters := c.apisix.ListClusters()
+ for _, port := range svc.Spec.Ports {
+ for _, subset := range subsets {
+ nodes, err := c.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
+ if err != nil {
+ log.Errorw("failed to translate upstream nodes",
+ zap.Error(err),
+ zap.Any("endpoints", ep),
+ zap.Int32("port", port.Port),
+ )
+ }
+ name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
+ for _, cluster := range clusters {
+ if err := c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
+ upstream, err := cluster.Upstream().Get(ctx, upsName)
+ if err != nil {
+ if err == apisixcache.ErrNotFound {
+ log.Warnw("upstream is not referenced",
+ zap.String("cluster", cluster.String()),
+ zap.String("upstream", upsName),
+ )
+ return nil
+ } else {
+ log.Errorw("failed to get upstream",
+ zap.String("upstream", upsName),
+ zap.String("cluster", cluster.String()),
+ zap.Error(err),
+ )
+ return err
+ }
+ }
+
+ upstream.Nodes = nodes
+
+ log.Debugw("upstream binds new nodes",
+ zap.Any("upstream", upstream),
+ zap.String("cluster", cluster.String()),
+ )
+
+ updated := &manifest{
+ upstreams: []*apisixv1.Upstream{upstream},
+ }
+ return c.syncManifests(ctx, nil, updated, nil)
+}
+
func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
defer cancelFunc()
for {
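
One detail in the relocated syncEndpoint worth calling out: the subset slice always begins with an empty ApisixUpstreamSubset, so each service port gets a default upstream covering all endpoints, plus one upstream per subset declared in the matching ApisixUpstream. A condensed sketch of that expansion, using the names from the diff:

    // The zero-value subset has no label selector, so it matches every
    // endpoint; user-defined subsets from the ApisixUpstream follow it.
    subsets := []configv1.ApisixUpstreamSubset{{}}
    if au != nil && len(au.Spec.Subsets) > 0 {
        subsets = append(subsets, au.Spec.Subsets...)
    }
    // syncEndpoint then syncs one upstream per (port, subset) pair to
    // every known APISIX cluster.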
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index fced1dd..c2eb236 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -16,21 +16,16 @@ package ingress
import (
"context"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
- "github.com/apache/apisix-ingress-controller/pkg/apisix"
- apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
- configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
- apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
type endpointsController struct {
@@ -89,82 +84,7 @@ func (c *endpointsController) run(ctx context.Context) {
func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
ep := ev.Object.(kube.Endpoint)
- namespace := ep.Namespace()
- svcName := ep.ServiceName()
- svc, err := c.controller.svcLister.Services(ep.Namespace()).Get(svcName)
- if err != nil {
- if k8serrors.IsNotFound(err) {
- log.Infof("service %s/%s not found", ep.Namespace(), svcName)
- return nil
- }
- log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
- return err
- }
- var subsets []configv1.ApisixUpstreamSubset
- subsets = append(subsets, configv1.ApisixUpstreamSubset{})
- au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
- if err != nil {
- if !k8serrors.IsNotFound(err) {
- log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
- return err
- }
- } else if len(au.Spec.Subsets) > 0 {
- subsets = append(subsets, au.Spec.Subsets...)
- }
-
- clusters := c.controller.apisix.ListClusters()
- for _, port := range svc.Spec.Ports {
- for _, subset := range subsets {
- nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
- if err != nil {
- log.Errorw("failed to translate upstream nodes",
- zap.Error(err),
- zap.Any("endpoints", ep),
- zap.Int32("port", port.Port),
- )
- }
- name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
- for _, cluster := range clusters {
- if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
- return err
- }
- }
- }
- }
-
- return nil
-}
-
-func (c *endpointsController) syncToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
- upstream, err := cluster.Upstream().Get(ctx, upsName)
- if err != nil {
- if err == apisixcache.ErrNotFound {
- log.Warnw("upstream is not referenced",
- zap.String("cluster", cluster.String()),
- zap.String("upstream", upsName),
- )
- return nil
- } else {
- log.Errorw("failed to get upstream",
- zap.String("upstream", upsName),
- zap.String("cluster", cluster.String()),
- zap.Error(err),
- )
- return err
- }
- }
-
- upstream.Nodes = nodes
-
- log.Debugw("upstream binds new nodes",
- zap.Any("upstream", upstream),
- zap.String("cluster", cluster.String()),
- )
-
- updated := &manifest{
- upstreams: []*apisixv1.Upstream{upstream},
- }
- return c.controller.syncManifests(ctx, nil, updated, nil)
+ return c.controller.syncEndpoint(ctx, ep)
}
func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
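
The endpoints controller now delegates to the shared Controller.syncEndpoint, which the endpointslice controller below calls as well. Both work because the two resources hide behind the kube.Endpoint abstraction; a minimal sketch of the shape implied by the calls in this diff (an assumption, not the complete interface):

    // Assumed shape; the real interface lives in pkg/kube/endpoint.go.
    type Endpoint interface {
        Namespace() string   // namespace of the Endpoints/EndpointSlice
        ServiceName() string // name of the backing Service
        // ...plus the accessors TranslateUpstreamNodes uses to read
        // addresses and ports.
    }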
diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go
index ea5119c..c8eaa05 100644
--- a/pkg/ingress/endpointslice.go
+++ b/pkg/ingress/endpointslice.go
@@ -13,3 +13,213 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress
+
+import (
+ "context"
+ "time"
+
+ "go.uber.org/zap"
+ discoveryv1 "k8s.io/api/discovery/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
+)
+
+const (
+ _endpointSlicesManagedBy = "endpointslice-controller.k8s.io"
+)
+
+type endpointSliceEvent struct {
+ Key string
+ ServiceName string
+}
+
+type endpointSliceController struct {
+ controller *Controller
+ workqueue workqueue.RateLimitingInterface
+ workers int
+}
+
+func (c *Controller) newEndpointSliceController() *endpointSliceController {
+ ctl := &endpointSliceController{
+ controller: c,
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Second, 60*time.Second, 5), "endpointSlice"),
+ workers: 1,
+ }
+
+ ctl.controller.epInformer.AddEventHandler(
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: ctl.onAdd,
+ UpdateFunc: ctl.onUpdate,
+ DeleteFunc: ctl.onDelete,
+ },
+ )
+
+ return ctl
+}
+
+func (c *endpointSliceController) run(ctx context.Context) {
+ log.Info("endpointSlice controller started")
+ defer log.Info("endpointSlice controller exited")
+ defer c.workqueue.ShutDown()
+
+ if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok {
+ log.Error("informers sync failed")
+ return
+ }
+
+ handler := func() {
+ for {
+ obj, shutdown := c.workqueue.Get()
+ if shutdown {
+ return
+ }
+
+ err := c.sync(ctx, obj.(*types.Event))
+ c.workqueue.Done(obj)
+ c.handleSyncErr(obj, err)
+ }
+ }
+
+ for i := 0; i < c.workers; i++ {
+ go handler()
+ }
+
+ <-ctx.Done()
+}
+
+func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) error {
+ epEvent := ev.Object.(endpointSliceEvent)
+ namespace, _, err := cache.SplitMetaNamespaceKey(epEvent.Key)
+ if err != nil {
+ log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", epEvent.Key)
+ return nil
+ }
+ ep, err := c.controller.epLister.GetEndpointSlices(namespace, epEvent.ServiceName)
+ if err != nil {
+ log.Errorf("failed to get all endpointSlices for service %s: %s",
+ epEvent.ServiceName, err)
+ return err
+ }
+ return c.controller.syncEndpoint(ctx, ep)
+}
+
+func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
+ if err == nil {
+ c.workqueue.Forget(obj)
+ return
+ }
+ log.Warnw("sync endpointSlice failed, will retry",
+ zap.Any("object", obj),
+ )
+ c.workqueue.AddRateLimited(obj)
+}
+
+func (c *endpointSliceController) onAdd(obj interface{}) {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorf("found endpointSlice object with bad namespace")
+ }
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
+ ep := obj.(*discoveryv1.EndpointSlice)
+ svcName := ep.Labels[discoveryv1.LabelServiceName]
+ if svcName == "" {
+ return
+ }
+ if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+ // We only care about endpointSlice objects managed by the EndpointSlices
+ // controller.
+ return
+ }
+
+ log.Debugw("endpointSlice add event arrived",
+ zap.String("object-key", key),
+ )
+
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventAdd,
+ Object: endpointSliceEvent{
+ Key: key,
+ ServiceName: svcName,
+ },
+ })
+}
+
+func (c *endpointSliceController) onUpdate(prev, curr interface{}) {
+ prevEp := prev.(*discoveryv1.EndpointSlice)
+ currEp := curr.(*discoveryv1.EndpointSlice)
+
+ if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
+ return
+ }
+ key, err := cache.MetaNamespaceKeyFunc(currEp)
+ if err != nil {
+ log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", err)
+ return
+ }
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
+ if currEp.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+ // We only care about endpointSlice objects managed by the EndpointSlices
+ // controller.
+ return
+ }
+ svcName := currEp.Labels[discoveryv1.LabelServiceName]
+ if svcName == "" {
+ return
+ }
+
+ log.Debugw("endpointSlice update event arrived",
+ zap.Any("new object", currEp),
+ zap.Any("old object", prevEp),
+ )
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventUpdate,
+ // TODO pass key.
+ Object: endpointSliceEvent{
+ Key: key,
+ ServiceName: svcName,
+ },
+ })
+}
+
+func (c *endpointSliceController) onDelete(obj interface{}) {
+ ep, ok := obj.(*discoveryv1.EndpointSlice)
+ if !ok {
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ log.Errorf("found endpoints: %+v in bad tombstone state", obj)
+ return
+ }
+ ep = tombstone.Obj.(*discoveryv1.EndpointSlice)
+ }
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", err)
+ return
+ }
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
+ if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+ // We only care about endpointSlice objects managed by the EndpointSlices
+ // controller.
+ return
+ }
+ svcName := ep.Labels[discoveryv1.LabelServiceName]
+ log.Debugw("endpoints delete event arrived",
+ zap.Any("object-key", key),
+ )
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventDelete,
+ Object: endpointSliceEvent{
+ Key: key,
+ ServiceName: svcName,
+ },
+ })
+}
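
All three event handlers in the new controller apply the same filter: a slice is enqueued only if it carries the service-name label and is managed by the in-tree EndpointSlice controller. A sketch of the labels a qualifying slice carries, using the real constants from k8s.io/api/discovery/v1 (the Service name "svc" is illustrative):

    // discoveryv1.LabelServiceName = "kubernetes.io/service-name"
    // discoveryv1.LabelManagedBy   = "endpointslice.kubernetes.io/managed-by"
    labels := map[string]string{
        discoveryv1.LabelServiceName: "svc",
        discoveryv1.LabelManagedBy:   _endpointSlicesManagedBy, // "endpointslice-controller.k8s.io"
    }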
diff --git a/pkg/kube/endpoint.go b/pkg/kube/endpoint.go
index 27eba84..995a73c 100644
--- a/pkg/kube/endpoint.go
+++ b/pkg/kube/endpoint.go
@@ -59,7 +59,7 @@ func (lister *endpointLister) GetEndpoint(namespace, name string) (Endpoint, err
}
func (lister *endpointLister) GetEndpointSlices(namespace, svcName string) (Endpoint, error) {
- if lister.epsLister != nil {
+ if lister.epsLister == nil {
panic("not a endpointSlice lister")
}
selector := labels.SelectorFromSet(labels.Set{
@@ -174,3 +174,10 @@ func NewEndpoint(ep *corev1.Endpoints) Endpoint {
endpoint: ep,
}
}
+
+// NewEndpointWithSlice creates an Endpoint which entity is Kubernetes EndpointSlices.
+func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint {
+ return &endpoint{
+ endpointSlices: []*discoveryv1.EndpointSlice{ep},
+ }
+}
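
Note the one-character bug fix above: the old guard panicked whenever epsLister was non-nil, that is, for every correctly constructed EndpointSlice lister; the inverted check panics only when the lister is actually missing. GetEndpointSlices then gathers all slices for a service through a label selector; a sketch of that selector, assuming it is keyed by the service-name label (the set literal is cut off in this diff):

    // Assumed from the truncated hunk: select every EndpointSlice whose
    // kubernetes.io/service-name label matches the service.
    selector := labels.SelectorFromSet(labels.Set{
        discoveryv1.LabelServiceName: svcName,
    })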
diff --git a/pkg/kube/translation/translator_test.go b/pkg/kube/translation/translator_test.go
index aa9bd70..f744403 100644
--- a/pkg/kube/translation/translator_test.go
+++ b/pkg/kube/translation/translator_test.go
@@ -16,9 +16,11 @@ package translation
import (
"context"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
"testing"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
+ discoveryv1 "k8s.io/api/discovery/v1"
+
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
@@ -213,3 +215,132 @@ func TestTranslateUpstreamNodes(t *testing.T) {
},
})
}
+
+func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) {
+ svc := &corev1.Service{
+ TypeMeta: metav1.TypeMeta{},
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "svc",
+ Namespace: "test",
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name: "port1",
+ Port: 80,
+ TargetPort: intstr.IntOrString{
+ Type: intstr.Int,
+ IntVal: 9080,
+ },
+ },
+ {
+ Name: "port2",
+ Port: 443,
+ TargetPort: intstr.IntOrString{
+ Type: intstr.Int,
+ IntVal: 9443,
+ },
+ },
+ },
+ },
+ }
+ isTrue := true
+ port1 := int32(9080)
+ port2 := int32(9443)
+ port1Name := "port1"
+ port2Name := "port2"
+ ep := &discoveryv1.EndpointSlice{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "svc",
+ Namespace: "test",
+ Labels: map[string]string{
+ discoveryv1.LabelManagedBy: "endpointslice-controller.k8s.io",
+ discoveryv1.LabelServiceName: "svc",
+ },
+ },
+ AddressType: discoveryv1.AddressTypeIPv4,
+ Endpoints: []discoveryv1.Endpoint{
+ {
+ Addresses: []string{
+ "192.168.1.1",
+ "192.168.1.2",
+ },
+ Conditions: discoveryv1.EndpointConditions{
+ Ready: &isTrue,
+ },
+ },
+ },
+ Ports: []discoveryv1.EndpointPort{
+ {
+ Name: &port1Name,
+ Port: &port1,
+ },
+ {
+ Name: &port2Name,
+ Port: &port2,
+ },
+ },
+ }
+
+ client := fake.NewSimpleClientset()
+ informersFactory := informers.NewSharedInformerFactory(client, 0)
+ svcInformer := informersFactory.Core().V1().Services().Informer()
+ svcLister := informersFactory.Core().V1().Services().Lister()
+
+ processCh := make(chan struct{})
+ svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ processCh <- struct{}{}
+ },
+ })
+
+ stopCh := make(chan struct{})
+ defer close(stopCh)
+ go svcInformer.Run(stopCh)
+ cache.WaitForCacheSync(stopCh, svcInformer.HasSynced)
+
+ _, err := client.CoreV1().Services("test").Create(context.Background(), svc, metav1.CreateOptions{})
+ assert.Nil(t, err)
+
+ tr := &translator{&TranslatorOptions{
+ ServiceLister: svcLister,
+ }}
+ <-processCh
+
+ nodes, err := tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 10080, nil)
+ assert.Nil(t, nodes)
+ assert.Equal(t, err, &translateError{
+ field: "service.spec.ports",
+ reason: "port not defined",
+ })
+
+ nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 80, nil)
+ assert.Nil(t, err)
+ assert.Equal(t, nodes, apisixv1.UpstreamNodes{
+ {
+ Host: "192.168.1.1",
+ Port: 9080,
+ Weight: 100,
+ },
+ {
+ Host: "192.168.1.2",
+ Port: 9080,
+ Weight: 100,
+ },
+ })
+
+ nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 443, nil)
+ assert.Nil(t, err)
+ assert.Equal(t, nodes, apisixv1.UpstreamNodes{
+ {
+ Host: "192.168.1.1",
+ Port: 9443,
+ Weight: 100,
+ },
+ {
+ Host: "192.168.1.2",
+ Port: 9443,
+ Weight: 100,
+ },
+ })
+}
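
The new test pins down both directions of the port lookup: a port absent from svc.Spec.Ports (10080) fails with the "port not defined" translateError, while declared ports resolve through the slice's named ports to their target ports, and every ready address becomes a node with the default weight of 100. Roughly the lookup the assertions imply (a sketch, not the translator's actual code):

    // service port 80  -> ServicePort "port1" -> slice port 9080
    // service port 443 -> ServicePort "port2" -> slice port 9443
    for _, sp := range svc.Spec.Ports {
        if sp.Port == requested {
            // match sp.Name against the slice's named ports, then emit
            // one node per ready address with weight 100.
        }
    }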
diff --git a/samples/deploy/rbac/apisix_view_clusterrole.yaml b/samples/deploy/rbac/apisix_view_clusterrole.yaml
index ef9d342..7a9ff16 100644
--- a/samples/deploy/rbac/apisix_view_clusterrole.yaml
+++ b/samples/deploy/rbac/apisix_view_clusterrole.yaml
@@ -157,3 +157,11 @@ rules:
- leases
verbs:
- '*'
+- apiGroups:
+ - discovery.k8s.io
+ resources:
+ - endpointslices
+ verbs:
+ - get
+ - list
+ - watch
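
Without these discovery.k8s.io rules, a controller running with --watch-endpointslices would hit RBAC "forbidden" errors as soon as it tried to list or watch EndpointSlices. Applying the updated role is the usual step (a sketch; bindings and namespaces as in your own deployment):

    kubectl apply -f samples/deploy/rbac/apisix_view_clusterrole.yaml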
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 578907a..2b97082 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -168,6 +168,14 @@ rules:
- leases
verbs:
- '*'
+ - apiGroups:
+ - discovery.k8s.io
+ resources:
+ - endpointslices
+ verbs:
+ - get
+ - list
+ - watch
`
_clusterRoleBinding = `
apiVersion: rbac.authorization.k8s.io/v1
@@ -256,6 +264,7 @@ spec:
- %s,kube-system
- --apisix-route-version
- %s
+ - --watch-endpointslices
serviceAccount: ingress-apisix-e2e-test-service-account
`
)
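
Lastly, the e2e scaffold appends the bare --watch-endpointslices flag to the controller args, so the whole suite exercises the EndpointSlice code path: with pflag, a boolean flag passed without a value parses as true. A self-contained sketch of that behavior (the flag-set name is arbitrary):

    var watch bool
    fs := pflag.NewFlagSet("ingress", pflag.ExitOnError)
    fs.BoolVar(&watch, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints")
    _ = fs.Parse([]string{"--watch-endpointslices"})
    // watch is now true.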