You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/05/31 02:59:35 UTC
[apisix-ingress-controller] branch master updated: feat: subset translation (#497)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new a89be23 feat: subset translation (#497)
a89be23 is described below
commit a89be230989ea62d03062181626cc197df655a78
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Mon May 31 10:59:26 2021 +0800
feat: subset translation (#497)
---
pkg/kube/apisix/apis/config/v2alpha1/types.go | 6 +++++
pkg/kube/translation/apisix_route.go | 9 +++----
pkg/kube/translation/ingress.go | 4 +--
pkg/kube/translation/plugin.go | 2 +-
pkg/kube/translation/translator.go | 31 +++++++++++++++++++++---
pkg/kube/translation/util.go | 35 +++++++++++++++++++++++++--
samples/deploy/crd/v1beta1/ApisixRoute.yaml | 6 +++++
7 files changed, 79 insertions(+), 14 deletions(-)
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 1fdf482..c978e92 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -174,6 +174,9 @@ type ApisixRouteHTTPBackend struct {
ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
// Weight of this backend.
Weight *int `json:"weight" yaml:"weight"`
+ // Subset specifies a subset of the target Service. The subset must be pre-defined
+ // in the ApisixUpstream resource for this service.
+ Subset string `json:"subset" yaml:"subset"`
}
// ApisixRouteHTTPPlugin represents an APISIX plugin.
@@ -232,6 +235,9 @@ type ApisixRouteTCPBackend struct {
// wise, the service ClusterIP or ExternalIP will be used,
// default is endpoints.
ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
+ // Subset specifies a subset of the target Service. The subset must be pre-defined
+ // in the ApisixUpstream resource for this service.
+ Subset string `json:"subset" yaml:"subset"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go
index dceda4f..2360d72 100644
--- a/pkg/kube/translation/apisix_route.go
+++ b/pkg/kube/translation/apisix_route.go
@@ -64,7 +64,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) (*TranslateConte
route.UpstreamId = id.GenID(upstreamName)
if !ctx.checkUpstreamExist(upstreamName) {
- ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort))
+ ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort))
if err != nil {
return nil, err
}
@@ -181,7 +181,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha
}
ctx.addRoute(route)
if !ctx.checkUpstreamExist(upstreamName) {
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
@@ -312,10 +312,7 @@ func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
sr.ID = id.GenID(name)
sr.ServerPort = part.Match.IngressPort
- // TODO use upstream id to refer the upstream object.
- // Currently, APISIX doesn't use upstream_id field in
- // APISIX, so we have to embed the entire upstream.
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go
index ab0ea6f..5169b68 100644
--- a/pkg/kube/translation/ingress.go
+++ b/pkg/kube/translation/ingress.go
@@ -169,7 +169,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n
} else {
svcPort = backend.Port.Number
}
- ups, err := t.TranslateUpstream(namespace, backend.Name, svcPort)
+ ups, err := t.TranslateUpstream(namespace, backend.Name, "", svcPort)
if err != nil {
return nil, err
}
@@ -260,7 +260,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa
} else {
portNumber = svcPort.IntVal
}
- ups, err := t.TranslateUpstream(namespace, svcName, portNumber)
+ ups, err := t.TranslateUpstream(namespace, svcName, "", portNumber)
if err != nil {
return nil, err
}
diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go
index d58379d..3cc6490 100644
--- a/pkg/kube/translation/plugin.go
+++ b/pkg/kube/translation/plugin.go
@@ -38,7 +38,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ar *conf
if err != nil {
return nil, err
}
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return nil, err
}
diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go
index e15a6f7..aed3251 100644
--- a/pkg/kube/translation/translator.go
+++ b/pkg/kube/translation/translator.go
@@ -25,6 +25,7 @@ import (
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -54,7 +55,11 @@ type Translator interface {
// The returned Upstream doesn't have metadata info.
// It doesn't assign any metadata fields, so it's caller's responsibility to decide
// the metadata.
- TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
+ // Note that the subset is used to filter the final node list: only pods whose labels
+ // match the subset labels (defined in ApisixUpstream) will be selected.
+ // When the subset is not found, the node list will be empty. When the subset is empty,
+ // the IPs of all pods will be filled in.
+ TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error)
// TranslateIngress composes a couple of APISIX Routes and upstreams according
// to the given Ingress resource.
TranslateIngress(kube.Ingress) (*TranslateContext, error)
@@ -77,6 +82,7 @@ type Translator interface {
// TranslatorOptions contains options to help Translator
// work well.
type TranslatorOptions struct {
+ PodCache types.PodCache
PodLister listerscorev1.PodLister
EndpointsLister listerscorev1.EndpointsLister
ServiceLister listerscorev1.ServiceLister
@@ -112,7 +118,7 @@ func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig)
return ups, nil
}
-func (t *translator) TranslateUpstream(namespace, name string, port int32) (*apisixv1.Upstream, error) {
+func (t *translator) TranslateUpstream(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) {
endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
if err != nil {
return nil, &translateError{
@@ -128,7 +134,13 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api
au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
- ups.Nodes = nodes
+ // If the subset in ApisixRoute is not empty but the ApisixUpstream resource is not found,
+ // just set an empty node list.
+ if subset != "" {
+ ups.Nodes = apisixv1.UpstreamNodes{}
+ } else {
+ ups.Nodes = nodes
+ }
return ups, nil
}
return nil, &translateError{
@@ -136,6 +148,19 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api
reason: err.Error(),
}
}
+
+ // Filter nodes by subset.
+ if subset != "" {
+ var labels types.Labels
+ for _, ss := range au.Spec.Subsets {
+ if ss.Name == subset {
+ labels = ss.Labels
+ break
+ }
+ }
+ nodes = t.filterNodesByLabels(nodes, labels, au.Namespace)
+ }
+
upsCfg := &au.Spec.ApisixUpstreamConfig
for _, pls := range au.Spec.PortLevelSettings {
if pls.Port == port {
diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go
index 16ced5a..3584205 100644
--- a/pkg/kube/translation/util.go
+++ b/pkg/kube/translation/util.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/id"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -109,8 +110,8 @@ loop:
return svc.Spec.ClusterIP, svcPort, nil
}
-func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
- ups, err := t.TranslateUpstream(namespace, svcName, svcPort)
+func (t *translator) translateUpstream(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
+ ups, err := t.TranslateUpstream(namespace, svcName, subset, svcPort)
if err != nil {
return nil, err
}
@@ -128,6 +129,36 @@ func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity
return ups, nil
}
+func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes {
+ if labels == nil {
+ return nodes
+ }
+
+ var filteredNodes apisixv1.UpstreamNodes
+ for _, node := range nodes {
+ podName, err := t.PodCache.GetNameByIP(node.Host)
+ if err != nil {
+ log.Errorw("failed to find pod name by ip, ignore it",
+ zap.Error(err),
+ zap.String("pod_ip", node.Host),
+ )
+ continue
+ }
+ pod, err := t.PodLister.Pods(namespace).Get(podName)
+ if err != nil {
+ log.Errorw("failed to find pod, ignore it",
+ zap.Error(err),
+ zap.String("pod_name", podName),
+ )
+ continue
+ }
+ if labels.IsSubsetOf(pod.Labels) {
+ filteredNodes = append(filteredNodes, node)
+ }
+ }
+ return filteredNodes
+}
+
func validateRemoteAddrs(remoteAddrs []string) error {
for _, addr := range remoteAddrs {
if ip := net.ParseIP(addr); ip == nil {
diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
index 89f4e1a..974795e 100644
--- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
@@ -166,6 +166,8 @@ spec:
weight:
type: integer
minimum: 0
+ subset:
+ type: string
required:
- serviceName
- servicePort
@@ -188,6 +190,8 @@ spec:
weight:
type: integer
minimum: 0
+ subset:
+ type: string
required:
- serviceName
- servicePort
@@ -238,6 +242,8 @@ spec:
resolveGranualrity:
type: string
enum: ["endpoint", "service"]
+ subset:
+ type: string
required:
- serviceName
- servicePort