You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/05/31 02:59:35 UTC

[apisix-ingress-controller] branch master updated: feat: subset translation (#497)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new a89be23  feat: subset translation (#497)
a89be23 is described below

commit a89be230989ea62d03062181626cc197df655a78
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Mon May 31 10:59:26 2021 +0800

    feat: subset translation (#497)
---
 pkg/kube/apisix/apis/config/v2alpha1/types.go |  6 +++++
 pkg/kube/translation/apisix_route.go          |  9 +++----
 pkg/kube/translation/ingress.go               |  4 +--
 pkg/kube/translation/plugin.go                |  2 +-
 pkg/kube/translation/translator.go            | 31 +++++++++++++++++++++---
 pkg/kube/translation/util.go                  | 35 +++++++++++++++++++++++++--
 samples/deploy/crd/v1beta1/ApisixRoute.yaml   |  6 +++++
 7 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 1fdf482..c978e92 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -174,6 +174,9 @@ type ApisixRouteHTTPBackend struct {
 	ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
 	// Weight of this backend.
 	Weight *int `json:"weight" yaml:"weight"`
+	// Subset specifies a subset for the target Service. The subset should be pre-defined
+	// in ApisixUpstream about this service.
+	Subset string `json:"subset" yaml:"subset"`
 }
 
 // ApisixRouteHTTPPlugin represents an APISIX plugin.
@@ -232,6 +235,9 @@ type ApisixRouteTCPBackend struct {
 	// wise, the service ClusterIP or ExternalIP will be used,
 	// default is endpoints.
 	ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
+	// Subset specifies a subset for the target Service. The subset should be pre-defined
+	// in ApisixUpstream about this service.
+	Subset string `json:"subset" yaml:"subset"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go
index dceda4f..2360d72 100644
--- a/pkg/kube/translation/apisix_route.go
+++ b/pkg/kube/translation/apisix_route.go
@@ -64,7 +64,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) (*TranslateConte
 			route.UpstreamId = id.GenID(upstreamName)
 
 			if !ctx.checkUpstreamExist(upstreamName) {
-				ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort))
+				ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort))
 				if err != nil {
 					return nil, err
 				}
@@ -181,7 +181,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha
 		}
 		ctx.addRoute(route)
 		if !ctx.checkUpstreamExist(upstreamName) {
-			ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+			ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
 			if err != nil {
 				return err
 			}
@@ -312,10 +312,7 @@ func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1
 		name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
 		sr.ID = id.GenID(name)
 		sr.ServerPort = part.Match.IngressPort
-		// TODO use upstream id to refer the upstream object.
-		// Currently, APISIX doesn't use upstream_id field in
-		// APISIX, so we have to embed the entire upstream.
-		ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+		ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go
index ab0ea6f..5169b68 100644
--- a/pkg/kube/translation/ingress.go
+++ b/pkg/kube/translation/ingress.go
@@ -169,7 +169,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n
 	} else {
 		svcPort = backend.Port.Number
 	}
-	ups, err := t.TranslateUpstream(namespace, backend.Name, svcPort)
+	ups, err := t.TranslateUpstream(namespace, backend.Name, "", svcPort)
 	if err != nil {
 		return nil, err
 	}
@@ -260,7 +260,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa
 	} else {
 		portNumber = svcPort.IntVal
 	}
-	ups, err := t.TranslateUpstream(namespace, svcName, portNumber)
+	ups, err := t.TranslateUpstream(namespace, svcName, "", portNumber)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go
index d58379d..3cc6490 100644
--- a/pkg/kube/translation/plugin.go
+++ b/pkg/kube/translation/plugin.go
@@ -38,7 +38,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ar *conf
 		if err != nil {
 			return nil, err
 		}
-		ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+		ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go
index e15a6f7..aed3251 100644
--- a/pkg/kube/translation/translator.go
+++ b/pkg/kube/translation/translator.go
@@ -25,6 +25,7 @@ import (
 	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
 	configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
 	listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
+	"github.com/apache/apisix-ingress-controller/pkg/types"
 	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
@@ -54,7 +55,11 @@ type Translator interface {
 	// The returned Upstream doesn't have metadata info.
 	// It doesn't assign any metadata fields, so it's caller's responsibility to decide
 	// the metadata.
-	TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
+	// Note the subset is used to filter the ultimate node list, only pods whose labels
+	// matching the subset labels (defined in ApisixUpstream) will be selected.
+	// When the subset is not found, the node list will be empty. When the subset is empty,
+	// all pods IP will be filled.
+	TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error)
 	// TranslateIngress composes a couple of APISIX Routes and upstreams according
 	// to the given Ingress resource.
 	TranslateIngress(kube.Ingress) (*TranslateContext, error)
@@ -77,6 +82,7 @@ type Translator interface {
 // TranslatorOptions contains options to help Translator
 // work well.
 type TranslatorOptions struct {
+	PodCache             types.PodCache
 	PodLister            listerscorev1.PodLister
 	EndpointsLister      listerscorev1.EndpointsLister
 	ServiceLister        listerscorev1.ServiceLister
@@ -112,7 +118,7 @@ func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig)
 	return ups, nil
 }
 
-func (t *translator) TranslateUpstream(namespace, name string, port int32) (*apisixv1.Upstream, error) {
+func (t *translator) TranslateUpstream(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) {
 	endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
 	if err != nil {
 		return nil, &translateError{
@@ -128,7 +134,13 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api
 	au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
 	if err != nil {
 		if k8serrors.IsNotFound(err) {
-			ups.Nodes = nodes
+			// If subset in ApisixRoute is not empty but the ApisixUpstream resouce not found,
+			// just set an empty node list.
+			if subset != "" {
+				ups.Nodes = apisixv1.UpstreamNodes{}
+			} else {
+				ups.Nodes = nodes
+			}
 			return ups, nil
 		}
 		return nil, &translateError{
@@ -136,6 +148,19 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api
 			reason: err.Error(),
 		}
 	}
+
+	// Filter nodes by subset.
+	if subset != "" {
+		var labels types.Labels
+		for _, ss := range au.Spec.Subsets {
+			if ss.Name == subset {
+				labels = ss.Labels
+				break
+			}
+		}
+		nodes = t.filterNodesByLabels(nodes, labels, au.Namespace)
+	}
+
 	upsCfg := &au.Spec.ApisixUpstreamConfig
 	for _, pls := range au.Spec.PortLevelSettings {
 		if pls.Port == port {
diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go
index 16ced5a..3584205 100644
--- a/pkg/kube/translation/util.go
+++ b/pkg/kube/translation/util.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/apisix-ingress-controller/pkg/id"
 	configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
 	"github.com/apache/apisix-ingress-controller/pkg/log"
+	"github.com/apache/apisix-ingress-controller/pkg/types"
 	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
@@ -109,8 +110,8 @@ loop:
 	return svc.Spec.ClusterIP, svcPort, nil
 }
 
-func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
-	ups, err := t.TranslateUpstream(namespace, svcName, svcPort)
+func (t *translator) translateUpstream(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
+	ups, err := t.TranslateUpstream(namespace, svcName, subset, svcPort)
 	if err != nil {
 		return nil, err
 	}
@@ -128,6 +129,36 @@ func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity
 	return ups, nil
 }
 
+func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes {
+	if labels == nil {
+		return nodes
+	}
+
+	var filteredNodes apisixv1.UpstreamNodes
+	for _, node := range nodes {
+		podName, err := t.PodCache.GetNameByIP(node.Host)
+		if err != nil {
+			log.Errorw("failed to find pod name by ip, ignore it",
+				zap.Error(err),
+				zap.String("pod_ip", node.Host),
+			)
+			continue
+		}
+		pod, err := t.PodLister.Pods(namespace).Get(podName)
+		if err != nil {
+			log.Errorw("failed to find pod, ignore it",
+				zap.Error(err),
+				zap.String("pod_name", podName),
+			)
+			continue
+		}
+		if labels.IsSubsetOf(pod.Labels) {
+			filteredNodes = append(filteredNodes, node)
+		}
+	}
+	return filteredNodes
+}
+
 func validateRemoteAddrs(remoteAddrs []string) error {
 	for _, addr := range remoteAddrs {
 		if ip := net.ParseIP(addr); ip == nil {
diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
index 89f4e1a..974795e 100644
--- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
@@ -166,6 +166,8 @@ spec:
                       weight:
                         type: integer
                         minimum: 0
+                      subset:
+                        type: string
                     required:
                       - serviceName
                       - servicePort
@@ -188,6 +190,8 @@ spec:
                         weight:
                           type: integer
                           minimum: 0
+                        subset:
+                          type: string
                     required:
                       - serviceName
                       - servicePort
@@ -238,6 +242,8 @@ spec:
                       resolveGranualrity:
                         type: string
                         enum: ["endpoint", "service"]
+                      subset:
+                        type: string
                     required:
                       - serviceName
                       - servicePort