You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by zh...@apache.org on 2022/10/20 09:15:46 UTC
[apisix-ingress-controller] branch master updated: feat: support external service (#1306)
This is an automated email from the ASF dual-hosted git repository.
zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 5c798213 feat: support external service (#1306)
5c798213 is described below
commit 5c798213da804493d3664ae4bc39dfceb9686f0d
Author: Sarasa Kisaragi <li...@gmail.com>
AuthorDate: Thu Oct 20 17:15:41 2022 +0800
feat: support external service (#1306)
---
pkg/kube/apisix/apis/config/v2/types.go | 75 ++-
.../apisix/apis/config/v2/zz_generated.deepcopy.go | 72 +++
pkg/kube/apisix_upstream.go | 12 +-
pkg/providers/apisix/apisix_route.go | 268 ++++++++--
pkg/providers/apisix/apisix_upstream.go | 326 +++++++++++-
pkg/providers/apisix/provider.go | 22 +-
pkg/providers/apisix/translation/apisix_plugin.go | 2 +-
pkg/providers/apisix/translation/apisix_route.go | 215 ++++++--
.../apisix/translation/apisix_route_test.go | 245 +++++++++
.../apisix/translation/apisix_upstream.go | 108 ++++
pkg/providers/apisix/translation/translator.go | 8 +-
pkg/providers/utils/string.go | 8 +
pkg/types/apisix/v1/types.go | 5 +
samples/deploy/crd/v1/ApisixRoute.yaml | 15 +-
samples/deploy/crd/v1/ApisixUpstream.yaml | 13 +
test/e2e/go.mod | 2 +-
test/e2e/scaffold/k8s.go | 13 +-
test/e2e/scaffold/scaffold.go | 14 +
test/e2e/suite-features/external-service.go | 582 +++++++++++++++++++++
19 files changed, 1880 insertions(+), 125 deletions(-)
diff --git a/pkg/kube/apisix/apis/config/v2/types.go b/pkg/kube/apisix/apis/config/v2/types.go
index 0225c0f2..f01f57e7 100644
--- a/pkg/kube/apisix/apis/config/v2/types.go
+++ b/pkg/kube/apisix/apis/config/v2/types.go
@@ -67,14 +67,17 @@ type ApisixRouteHTTP struct {
// Backends represents potential backends to proxy after the route
// rule matched. When number of backends are more than one, traffic-split
// plugin in APISIX will be used to split traffic based on the backend weight.
- Backends []ApisixRouteHTTPBackend `json:"backends,omitempty" yaml:"backends,omitempty"`
+ Backends []ApisixRouteHTTPBackend `json:"backends,omitempty" yaml:"backends,omitempty"`
+ // Upstreams refers to ApisixUpstream CRD resources.
+ Upstreams []ApisixRouteUpstreamReference `json:"upstreams,omitempty" yaml:"upstreams,omitempty"`
+
Websocket bool `json:"websocket" yaml:"websocket"`
PluginConfigName string `json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"`
Plugins []ApisixRoutePlugin `json:"plugins,omitempty" yaml:"plugins,omitempty"`
Authentication ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"`
}
-// ApisixRouteHTTPBackend represents a HTTP backend (a Kuberentes Service).
+// ApisixRouteHTTPBackend represents an HTTP backend (a Kubernetes Service).
type ApisixRouteHTTPBackend struct {
// The name (short) of the service, note cross namespace is forbidden,
// so be sure the ApisixRoute and Service are in the same namespace.
@@ -93,6 +96,13 @@ type ApisixRouteHTTPBackend struct {
Subset string `json:"subset,omitempty" yaml:"subset,omitempty"`
}
+// ApisixRouteUpstreamReference contains an ApisixUpstream CRD reference.
+type ApisixRouteUpstreamReference struct {
+ Name string `json:"name,omitempty" yaml:"name"`
+ // +optional
+ Weight *int `json:"weight,omitempty" yaml:"weight"`
+}
+
// ApisixRouteHTTPMatch represents the match condition for hitting this route.
type ApisixRouteHTTPMatch struct {
// URI path predicates, at least one path should be
@@ -166,6 +176,20 @@ type ApisixRoutePlugin struct {
// any plugins.
type ApisixRoutePluginConfig map[string]interface{}
+func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) {
+ b, _ := json.Marshal(&p)
+ _ = json.Unmarshal(b, out)
+}
+
+func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig {
+ if p == nil {
+ return nil
+ }
+ out := new(ApisixRoutePluginConfig)
+ p.DeepCopyInto(out)
+ return out
+}
+
// ApisixRouteAuthentication is the authentication-related
// configuration in ApisixRoute.
type ApisixRouteAuthentication struct {
@@ -189,20 +213,6 @@ type ApisixRouteAuthenticationJwtAuth struct {
Cookie string `json:"cookie,omitempty" yaml:"cookie,omitempty"`
}
-func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) {
- b, _ := json.Marshal(&p)
- _ = json.Unmarshal(b, out)
-}
-
-func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig {
- if p == nil {
- return nil
- }
- out := new(ApisixRoutePluginConfig)
- p.DeepCopyInto(out)
- return out
-}
-
// ApisixRouteStream is the configuration for level 4 route
type ApisixRouteStream struct {
// The rule name, cannot be empty.
@@ -238,7 +248,6 @@ type ApisixRouteStreamBackend struct {
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
-
// ApisixRouteList contains a list of ApisixRoute.
type ApisixRouteList struct {
metav1.TypeMeta `json:",inline" yaml:",inline"`
@@ -250,7 +259,6 @@ type ApisixRouteList struct {
// +genclient:nonNamespaced
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
-
// ApisixClusterConfig is the Schema for the ApisixClusterConfig resource.
// An ApisixClusterConfig is used to identify an APISIX cluster, it's a
// ClusterScoped resource so the name is unique.
@@ -310,7 +318,6 @@ type ApisixClusterAdminConfig struct {
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
-
// ApisixClusterConfigList contains a list of ApisixClusterConfig.
type ApisixClusterConfigList struct {
metav1.TypeMeta `json:",inline" yaml:",inline"`
@@ -322,7 +329,6 @@ type ApisixClusterConfigList struct {
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
-
// ApisixConsumer is the Schema for the ApisixConsumer resource.
// An ApisixConsumer is used to identify a consumer.
type ApisixConsumer struct {
@@ -418,7 +424,6 @@ type ApisixConsumerHMACAuthValue struct {
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
-
// ApisixConsumerList contains a list of ApisixConsumer.
type ApisixConsumerList struct {
metav1.TypeMeta `json:",inline" yaml:",inline"`
@@ -443,13 +448,18 @@ type ApisixUpstream struct {
// ApisixUpstreamSpec describes the specification of ApisixUpstream.
type ApisixUpstreamSpec struct {
+ // ExternalNodes contains external nodes the Upstream should use.
+ // If this field is set, the upstream will use these nodes directly without any further resolution.
+ // +optional
+ ExternalNodes []ApisixUpstreamExternalNode `json:"externalNodes,omitempty" yaml:"externalNodes,omitempty"`
+
ApisixUpstreamConfig `json:",inline" yaml:",inline"`
PortLevelSettings []PortLevelSettings `json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"`
}
// ApisixUpstreamConfig contains rich features on APISIX Upstream, for instance
-// load balancer, health check and etc.
+// load balancer, health check, etc.
type ApisixUpstreamConfig struct {
// LoadBalancer represents the load balancer configuration for Kubernetes Service.
// The default strategy is round robin.
@@ -483,6 +493,27 @@ type ApisixUpstreamConfig struct {
Subsets []ApisixUpstreamSubset `json:"subsets,omitempty" yaml:"subsets,omitempty"`
}
+// ApisixUpstreamExternalType is the external service type
+type ApisixUpstreamExternalType string
+
+const (
+ // ExternalTypeDomain type is a domain
+ // +k8s:deepcopy-gen=false
+ ExternalTypeDomain ApisixUpstreamExternalType = "Domain"
+
+ // ExternalTypeService type is a K8s ExternalName service
+ // +k8s:deepcopy-gen=false
+ ExternalTypeService ApisixUpstreamExternalType = "Service"
+)
+
+// ApisixUpstreamExternalNode is the external node conf
+type ApisixUpstreamExternalNode struct {
+ Name string `json:"name,omitempty" yaml:"name"`
+ Type ApisixUpstreamExternalType `json:"type,omitempty" yaml:"type"`
+ // +optional
+ Weight *int `json:"weight,omitempty" yaml:"weight"`
+}
+
// ApisixUpstreamSubset defines a single endpoints group of one Service.
type ApisixUpstreamSubset struct {
// Name is the name of subset.
diff --git a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
index f48ac7aa..339a15f3 100644
--- a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
@@ -781,6 +781,13 @@ func (in *ApisixRouteHTTP) DeepCopyInto(out *ApisixRouteHTTP) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
+ if in.Upstreams != nil {
+ in, out := &in.Upstreams, &out.Upstreams
+ *out = make([]ApisixRouteUpstreamReference, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
*out = make([]ApisixRoutePlugin, len(*in))
@@ -1048,6 +1055,27 @@ func (in *ApisixRouteStreamMatch) DeepCopy() *ApisixRouteStreamMatch {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteUpstreamReference) DeepCopyInto(out *ApisixRouteUpstreamReference) {
+ *out = *in
+ if in.Weight != nil {
+ in, out := &in.Weight, &out.Weight
+ *out = new(int)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteUpstreamReference.
+func (in *ApisixRouteUpstreamReference) DeepCopy() *ApisixRouteUpstreamReference {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixRouteUpstreamReference)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixSecret) DeepCopyInto(out *ApisixSecret) {
*out = *in
@@ -1259,6 +1287,43 @@ func (in *ApisixUpstreamConfig) DeepCopy() *ApisixUpstreamConfig {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixUpstreamExternalNode) DeepCopyInto(out *ApisixUpstreamExternalNode) {
+ *out = *in
+ if in.Weight != nil {
+ in, out := &in.Weight, &out.Weight
+ *out = new(int)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstreamExternalNode.
+func (in *ApisixUpstreamExternalNode) DeepCopy() *ApisixUpstreamExternalNode {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixUpstreamExternalNode)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixUpstreamExternalType) DeepCopyInto(out *ApisixUpstreamExternalType) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstreamExternalType.
+func (in *ApisixUpstreamExternalType) DeepCopy() *ApisixUpstreamExternalType {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixUpstreamExternalType)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixUpstreamList) DeepCopyInto(out *ApisixUpstreamList) {
*out = *in
@@ -1295,6 +1360,13 @@ func (in *ApisixUpstreamList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixUpstreamSpec) DeepCopyInto(out *ApisixUpstreamSpec) {
*out = *in
+ if in.ExternalNodes != nil {
+ in, out := &in.ExternalNodes, &out.ExternalNodes
+ *out = make([]ApisixUpstreamExternalNode, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
in.ApisixUpstreamConfig.DeepCopyInto(&out.ApisixUpstreamConfig)
if in.PortLevelSettings != nil {
in, out := &in.PortLevelSettings, &out.PortLevelSettings
diff --git a/pkg/kube/apisix_upstream.go b/pkg/kube/apisix_upstream.go
index b3da9767..2ac51791 100644
--- a/pkg/kube/apisix_upstream.go
+++ b/pkg/kube/apisix_upstream.go
@@ -17,6 +17,8 @@ package kube
import (
"errors"
+ "k8s.io/apimachinery/pkg/labels"
+
"github.com/apache/apisix-ingress-controller/pkg/config"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -28,9 +30,11 @@ import (
// it aims at to be compatible with different ApisixUpstream versions.
type ApisixUpstreamLister interface {
// V2beta3 gets the ApisixUpstream in apisix.apache.org/v2beta3.
- V2beta3(string, string) (ApisixUpstream, error)
+ V2beta3(namespace, name string) (ApisixUpstream, error)
// V2 gets the ApisixUpstream in apisix.apache.org/v2.
- V2(string, string) (ApisixUpstream, error)
+ V2(namespace, name string) (ApisixUpstream, error)
+ // ListV2 lists the v2 ApisixUpstreams in the given namespace.
+ ListV2(namespace string) ([]*configv2.ApisixUpstream, error)
}
// ApisixUpstreamInformer is an encapsulation for the informer of ApisixUpstream,
@@ -120,6 +124,10 @@ func (l *apisixUpstreamLister) V2(namespace, name string) (ApisixUpstream, error
}, nil
}
+func (l *apisixUpstreamLister) ListV2(namespace string) ([]*configv2.ApisixUpstream, error) {
+ return l.v2Lister.ApisixUpstreams(namespace).List(labels.Everything())
+}
+
// MustNewApisixUpstream creates a kube.ApisixUpstream object according to the
// type of obj.
func MustNewApisixUpstream(obj interface{}) ApisixUpstream {
diff --git a/pkg/providers/apisix/apisix_route.go b/pkg/providers/apisix/apisix_route.go
index 81eec188..b11539ad 100644
--- a/pkg/providers/apisix/apisix_route.go
+++ b/pkg/providers/apisix/apisix_route.go
@@ -44,28 +44,43 @@ import (
type apisixRouteController struct {
*apisixCommon
- workqueue workqueue.RateLimitingInterface
- workers int
+ workqueue workqueue.RateLimitingInterface
+ relatedWorkqueue workqueue.RateLimitingInterface
+ workers int
- svcInformer cache.SharedIndexInformer
- apisixRouteLister kube.ApisixRouteLister
- apisixRouteInformer cache.SharedIndexInformer
+ svcInformer cache.SharedIndexInformer
+ apisixRouteLister kube.ApisixRouteLister
+ apisixRouteInformer cache.SharedIndexInformer
+ apisixUpstreamInformer cache.SharedIndexInformer
svcLock sync.RWMutex
- svcMap map[string]map[string]struct{}
+ // service key -> apisix route key
+ svcMap map[string]map[string]struct{}
+
+ apisixUpstreamLock sync.RWMutex
+ // apisix upstream key -> apisix route key
+ apisixUpstreamMap map[string]map[string]struct{}
+}
+
+type routeEvent struct {
+ Key string
+ Type string
}
func newApisixRouteController(common *apisixCommon, apisixRouteInformer cache.SharedIndexInformer, apisixRouteLister kube.ApisixRouteLister) *apisixRouteController {
c := &apisixRouteController{
- apisixCommon: common,
- workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoute"),
- workers: 1,
+ apisixCommon: common,
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoute"),
+ relatedWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRouteRelated"),
+ workers: 1,
- svcInformer: common.SvcInformer,
- apisixRouteLister: apisixRouteLister,
- apisixRouteInformer: apisixRouteInformer,
+ svcInformer: common.SvcInformer,
+ apisixRouteLister: apisixRouteLister,
+ apisixRouteInformer: apisixRouteInformer,
+ apisixUpstreamInformer: common.ApisixUpstreamInformer,
- svcMap: make(map[string]map[string]struct{}),
+ svcMap: make(map[string]map[string]struct{}),
+ apisixUpstreamMap: make(map[string]map[string]struct{}),
}
c.apisixRouteInformer.AddEventHandler(
@@ -80,6 +95,12 @@ func newApisixRouteController(common *apisixCommon, apisixRouteInformer cache.Sh
AddFunc: c.onSvcAdd,
},
)
+ c.ApisixUpstreamInformer.AddEventHandler(
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: c.onApisixUpstreamAdd,
+ UpdateFunc: c.onApisixUpstreamUpdate,
+ },
+ )
return c
}
@@ -88,6 +109,7 @@ func (c *apisixRouteController) run(ctx context.Context) {
log.Info("ApisixRoute controller started")
defer log.Info("ApisixRoute controller exited")
defer c.workqueue.ShutDown()
+ defer c.relatedWorkqueue.ShutDown()
ok := cache.WaitForCacheSync(ctx.Done(), c.apisixRouteInformer.HasSynced, c.svcInformer.HasSynced)
if !ok {
@@ -97,6 +119,7 @@ func (c *apisixRouteController) run(ctx context.Context) {
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
+ go c.runRelatedWorker(ctx)
}
<-ctx.Done()
}
@@ -113,20 +136,40 @@ func (c *apisixRouteController) runWorker(ctx context.Context) {
err := c.sync(ctx, val)
c.workqueue.Done(obj)
c.handleSyncErr(obj, err)
- case string:
- err := c.handleSvcAdd(val)
+ }
+ }
+}
+
+func (c *apisixRouteController) runRelatedWorker(ctx context.Context) {
+ for {
+ obj, quit := c.relatedWorkqueue.Get()
+ if quit {
+ return
+ }
+
+ ev := obj.(*routeEvent)
+ switch ev.Type {
+ case "service":
+ err := c.handleSvcAdd(ev.Key)
c.workqueue.Done(obj)
- c.handleSvcErr(val, err)
+ c.handleSvcErr(ev, err)
+ case "ApisixUpstream":
+ err := c.handleApisixUpstreamChange(ev.Key)
+ c.workqueue.Done(obj)
+ c.handleApisixUpstreamErr(ev, err)
}
}
}
-func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name string, ar kube.ApisixRoute) {
+func (c *apisixRouteController) syncRelationship(ev *types.Event, routeKey string, ar kube.ApisixRoute) {
obj := ev.Object.(kube.ApisixRouteEvent)
var (
oldBackends []string
newBackends []string
+
+ oldUpstreams []string
+ newUpstreams []string
)
switch obj.GroupVersion {
case config.ApisixV2beta3:
@@ -182,6 +225,10 @@ func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name st
for _, backend := range rule.Backends {
oldBackends = append(oldBackends, old.Namespace+"/"+backend.ServiceName)
}
+
+ for _, upstream := range rule.Upstreams {
+ oldUpstreams = append(oldUpstreams, old.Namespace+"/"+upstream.Name)
+ }
}
}
if newObj != nil {
@@ -189,6 +236,9 @@ func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name st
for _, backend := range rule.Backends {
newBackends = append(newBackends, newObj.Namespace+"/"+backend.ServiceName)
}
+ for _, upstream := range rule.Upstreams {
+ newUpstreams = append(newUpstreams, newObj.Namespace+"/"+upstream.Name)
+ }
}
}
default:
@@ -203,19 +253,44 @@ func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name st
// The last event processed MAY not be the logical last event, so it may override the logical previous event
// We have a periodic full-sync, which reduce this problem, but it doesn't solve it completely.
- c.svcLock.Lock()
- defer c.svcLock.Unlock()
toDelete := utils.Difference(oldBackends, newBackends)
toAdd := utils.Difference(newBackends, oldBackends)
+ c.syncServiceRelationChanges(routeKey, toAdd, toDelete)
+
+ toDelete = utils.Difference(oldUpstreams, newUpstreams)
+ toAdd = utils.Difference(newUpstreams, oldUpstreams)
+ c.syncApisixUpstreamRelationChanges(routeKey, toAdd, toDelete)
+}
+
+func (c *apisixRouteController) syncServiceRelationChanges(routeKey string, toAdd, toDelete []string) {
+ c.svcLock.Lock()
+ defer c.svcLock.Unlock()
+
for _, svc := range toDelete {
- delete(c.svcMap[svc], name)
+ delete(c.svcMap[svc], routeKey)
}
for _, svc := range toAdd {
if _, ok := c.svcMap[svc]; !ok {
c.svcMap[svc] = make(map[string]struct{})
}
- c.svcMap[svc][name] = struct{}{}
+ c.svcMap[svc][routeKey] = struct{}{}
+ }
+}
+
+func (c *apisixRouteController) syncApisixUpstreamRelationChanges(routeKey string, toAdd, toDelete []string) {
+ c.apisixUpstreamLock.Lock()
+ defer c.apisixUpstreamLock.Unlock()
+
+ for _, au := range toDelete {
+ delete(c.apisixUpstreamMap[au], routeKey)
+ }
+
+ for _, au := range toAdd {
+ if _, ok := c.apisixUpstreamMap[au]; !ok {
+ c.apisixUpstreamMap[au] = make(map[string]struct{})
+ }
+ c.apisixUpstreamMap[au][routeKey] = struct{}{}
}
}
@@ -263,7 +338,8 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
}
}
- c.syncServiceRelationship(ev, name, ar)
+ // sync before translation
+ c.syncRelationship(ev, obj.Key, ar)
if ev.Type == types.EventDelete {
if ar != nil {
@@ -592,9 +668,12 @@ func (c *apisixRouteController) ResourceSync() {
objs := c.apisixRouteInformer.GetIndexer().List()
c.svcLock.Lock()
+ c.apisixUpstreamLock.Lock()
defer c.svcLock.Unlock()
+ defer c.apisixUpstreamLock.Unlock()
c.svcMap = make(map[string]map[string]struct{})
+ c.apisixUpstreamMap = make(map[string]map[string]struct{})
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
@@ -616,7 +695,7 @@ func (c *apisixRouteController) ResourceSync() {
},
})
- ns, name, err := cache.SplitMetaNamespaceKey(key)
+ ns, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Errorw("split ApisixRoute meta key failed",
zap.Error(err),
@@ -625,7 +704,10 @@ func (c *apisixRouteController) ResourceSync() {
continue
}
- var backends []string
+ var (
+ backends []string
+ upstreams []string
+ )
switch ar.GroupVersion() {
case config.ApisixV2beta3:
for _, rule := range ar.V2beta3().Spec.HTTP {
@@ -638,6 +720,9 @@ func (c *apisixRouteController) ResourceSync() {
for _, backend := range rule.Backends {
backends = append(backends, ns+"/"+backend.ServiceName)
}
+ for _, upstream := range rule.Upstreams {
+ upstreams = append(upstreams, ns+"/"+upstream.Name)
+ }
}
default:
log.Errorw("unknown ApisixRoute version",
@@ -649,7 +734,13 @@ func (c *apisixRouteController) ResourceSync() {
if _, ok := c.svcMap[svcKey]; !ok {
c.svcMap[svcKey] = make(map[string]struct{})
}
- c.svcMap[svcKey][name] = struct{}{}
+ c.svcMap[svcKey][key] = struct{}{}
+ }
+ for _, upstreamKey := range upstreams {
+ if _, ok := c.apisixUpstreamMap[upstreamKey]; !ok {
+ c.apisixUpstreamMap[upstreamKey] = make(map[string]struct{})
+ }
+ c.apisixUpstreamMap[upstreamKey][key] = struct{}{}
}
}
}
@@ -662,7 +753,7 @@ func (c *apisixRouteController) onSvcAdd(obj interface{}) {
if err != nil {
log.Errorw("found Service with bad meta key",
zap.Error(err),
- zap.String("key", key),
+ zap.Any("obj", obj),
)
return
}
@@ -670,29 +761,106 @@ func (c *apisixRouteController) onSvcAdd(obj interface{}) {
return
}
- c.workqueue.Add(key)
+ c.relatedWorkqueue.Add(&routeEvent{
+ Key: key,
+ Type: "service",
+ })
}
func (c *apisixRouteController) handleSvcAdd(key string) error {
- ns, _, err := cache.SplitMetaNamespaceKey(key)
+ log.Debugw("handle svc add", zap.String("key", key))
+ c.svcLock.RLock()
+ routes, ok := c.svcMap[key]
+ c.svcLock.RUnlock()
+
+ if ok {
+ for routeKey := range routes {
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.ApisixRouteEvent{
+ Key: routeKey,
+ GroupVersion: c.Kubernetes.APIVersion,
+ },
+ })
+ }
+ }
+ return nil
+}
+
+func (c *apisixRouteController) handleSvcErr(ev *routeEvent, errOrigin error) {
+ if errOrigin == nil {
+ c.workqueue.Forget(ev)
+
+ return
+ }
+
+ log.Warnw("sync Service failed, will retry",
+ zap.Any("key", ev.Key),
+ zap.Error(errOrigin),
+ )
+ c.relatedWorkqueue.AddRateLimited(ev)
+}
+
+func (c *apisixRouteController) onApisixUpstreamAdd(obj interface{}) {
+ log.Debugw("ApisixUpstream add event arrived",
+ zap.Any("object", obj),
+ )
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("found Service with bad meta key",
+ zap.Error(err),
+ zap.Any("obj", obj),
+ )
+ return
+ }
+ if !c.namespaceProvider.IsWatchingNamespace(key) {
+ return
+ }
+
+ c.relatedWorkqueue.Add(&routeEvent{
+ Key: key,
+ Type: "ApisixUpstream",
+ })
+}
+
+func (c *apisixRouteController) onApisixUpstreamUpdate(oldObj, newObj interface{}) {
+ log.Debugw("ApisixUpstream add event arrived",
+ zap.Any("object", newObj),
+ )
+
+ key, err := cache.MetaNamespaceKeyFunc(newObj)
+ if err != nil {
+ log.Errorf("found ApisixUpstream resource with bad meta namespace key: %s", err)
+ return
+ }
if err != nil {
- log.Errorw("failed to split Service meta key",
+ log.Errorw("found Service with bad meta key",
zap.Error(err),
- zap.String("key", key),
+ zap.Any("obj", newObj),
)
- return nil
+ return
+ }
+ if !c.namespaceProvider.IsWatchingNamespace(key) {
+ return
}
+ c.relatedWorkqueue.Add(&routeEvent{
+ Key: key,
+ Type: "ApisixUpstream",
+ })
+}
+
+func (c *apisixRouteController) handleApisixUpstreamChange(key string) error {
c.svcLock.RLock()
- routes, ok := c.svcMap[key]
+ routes, ok := c.apisixUpstreamMap[key]
c.svcLock.RUnlock()
if ok {
- for route := range routes {
+ for routeKey := range routes {
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixRouteEvent{
- Key: ns + "/" + route,
+ Key: routeKey,
GroupVersion: c.Kubernetes.APIVersion,
},
})
@@ -701,18 +869,18 @@ func (c *apisixRouteController) handleSvcAdd(key string) error {
return nil
}
-func (c *apisixRouteController) handleSvcErr(key string, errOrigin error) {
+func (c *apisixRouteController) handleApisixUpstreamErr(ev *routeEvent, errOrigin error) {
if errOrigin == nil {
- c.workqueue.Forget(key)
+ c.workqueue.Forget(ev)
return
}
- log.Warnw("sync Service failed, will retry",
- zap.Any("key", key),
+ log.Warnw("sync ApisixUpstream add event failed, will retry",
+ zap.Any("key", ev.Key),
zap.Error(errOrigin),
)
- c.workqueue.AddRateLimited(key)
+ c.workqueue.AddRateLimited(ev)
}
// recordStatus record resources status
@@ -792,3 +960,25 @@ func (c *apisixRouteController) recordStatus(at interface{}, reason string, err
log.Errorf("unsupported resource record: %s", v)
}
}
+
+func (c *apisixRouteController) NotifyServiceAdd(key string) {
+ if !c.namespaceProvider.IsWatchingNamespace(key) {
+ return
+ }
+
+ c.relatedWorkqueue.Add(&routeEvent{
+ Key: key,
+ Type: "service",
+ })
+}
+
+func (c *apisixRouteController) NotifyApisixUpstreamChange(key string) {
+ if !c.namespaceProvider.IsWatchingNamespace(key) {
+ return
+ }
+
+ c.relatedWorkqueue.Add(&routeEvent{
+ Key: key,
+ Type: "ApisixUpstream",
+ })
+}
diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go
index 4d9e29b2..e3992abc 100644
--- a/pkg/providers/apisix/apisix_upstream.go
+++ b/pkg/providers/apisix/apisix_upstream.go
@@ -17,6 +17,7 @@ package apisix
import (
"context"
"fmt"
+ "sync"
"time"
"go.uber.org/zap"
@@ -43,25 +44,38 @@ import (
type apisixUpstreamController struct {
*apisixCommon
- workqueue workqueue.RateLimitingInterface
- workers int
+ workqueue workqueue.RateLimitingInterface
+ svcWorkqueue workqueue.RateLimitingInterface
+ workers int
svcInformer cache.SharedIndexInformer
svcLister listerscorev1.ServiceLister
apisixUpstreamInformer cache.SharedIndexInformer
apisixUpstreamLister kube.ApisixUpstreamLister
+
+ externalSvcLock sync.RWMutex
+ // external name service name -> apisix upstream name
+ externalServiceMap map[string]map[string]struct{}
+
+ // ApisixRouteController doesn't know how a Service change affects an ApisixUpstream,
+ // so we need to notify it here.
+ notifyApisixUpstreamChange func(string)
}
-func newApisixUpstreamController(common *apisixCommon) *apisixUpstreamController {
+func newApisixUpstreamController(common *apisixCommon, notifyApisixUpstreamChange func(string)) *apisixUpstreamController {
c := &apisixUpstreamController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstream"),
+ svcWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreamService"),
workers: 1,
svcInformer: common.SvcInformer,
svcLister: common.SvcLister,
apisixUpstreamLister: common.ApisixUpstreamLister,
apisixUpstreamInformer: common.ApisixUpstreamInformer,
+
+ externalServiceMap: make(map[string]map[string]struct{}),
+ notifyApisixUpstreamChange: notifyApisixUpstreamChange,
}
c.apisixUpstreamInformer.AddEventHandler(
@@ -71,6 +85,13 @@ func newApisixUpstreamController(common *apisixCommon) *apisixUpstreamController
DeleteFunc: c.onDelete,
},
)
+ c.svcInformer.AddEventHandler(
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: c.onSvcAdd,
+ UpdateFunc: c.onSvcUpdate,
+ DeleteFunc: c.onSvcDelete,
+ },
+ )
return c
}
@@ -78,6 +99,7 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
log.Info("ApisixUpstream controller started")
defer log.Info("ApisixUpstream controller exited")
defer c.workqueue.ShutDown()
+ defer c.svcWorkqueue.ShutDown()
if ok := cache.WaitForCacheSync(ctx.Done(), c.apisixUpstreamInformer.HasSynced, c.svcInformer.HasSynced); !ok {
log.Error("cache sync failed")
@@ -85,6 +107,7 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
}
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
+ go c.runSvcWorker(ctx)
}
<-ctx.Done()
@@ -102,6 +125,19 @@ func (c *apisixUpstreamController) runWorker(ctx context.Context) {
}
}
+func (c *apisixUpstreamController) runSvcWorker(ctx context.Context) {
+ for {
+ obj, quit := c.svcWorkqueue.Get()
+ if quit {
+ return
+ }
+ key := obj.(string)
+ err := c.handleSvcChange(ctx, key)
+ c.svcWorkqueue.Done(obj)
+ c.handleSvcErr(key, err)
+ }
+}
+
// sync Used to synchronize ApisixUpstream resources, because upstream alone exists in APISIX and will not be affected,
// the synchronization logic only includes upstream's unique configuration management
// So when ApisixUpstream was deleted, only the scheme / load balancer / healthcheck / retry / timeout
@@ -154,6 +190,8 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
multiVersioned = ev.Tombstone.(kube.ApisixUpstream)
}
+ c.syncRelationship(ev, key, multiVersioned)
+
switch event.GroupVersion {
case config.ApisixV2beta3:
au := multiVersioned.V2beta3()
@@ -241,9 +279,36 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
}
case config.ApisixV2:
au := multiVersioned.V2()
+ if au.Spec == nil {
+ return nil
+ }
+
+ if len(au.Spec.ExternalNodes) != 0 {
+ var newUps *apisixv1.Upstream
+ if ev.Type != types.EventDelete {
+ cfg := &au.Spec.ApisixUpstreamConfig
+ newUps, err = c.translator.TranslateUpstreamConfigV2(cfg)
+ if err != nil {
+ log.Errorw("failed to translate upstream config",
+ zap.Any("object", au),
+ zap.Error(err),
+ )
+ c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
+ c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
+ return err
+ }
+ }
+
+ err := c.updateExternalNodes(ctx, au, nil, newUps)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
var portLevelSettings map[int32]configv2.ApisixUpstreamConfig
- if au.Spec != nil && len(au.Spec.PortLevelSettings) > 0 {
+ if len(au.Spec.PortLevelSettings) > 0 {
portLevelSettings = make(map[int32]configv2.ApisixUpstreamConfig, len(au.Spec.PortLevelSettings))
for _, port := range au.Spec.PortLevelSettings {
portLevelSettings[port.Port] = port.ApisixUpstreamConfig
@@ -260,7 +325,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
var subsets []configv2.ApisixUpstreamSubset
subsets = append(subsets, configv2.ApisixUpstreamSubset{})
- if au.Spec != nil && len(au.Spec.Subsets) > 0 {
+ if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}
clusterName := c.Config.APISIX.DefaultClusterName
@@ -279,7 +344,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
return err
}
var newUps *apisixv1.Upstream
- if au.Spec != nil && ev.Type != types.EventDelete {
+ if ev.Type != types.EventDelete {
cfg, ok := portLevelSettings[port.Port]
if !ok {
cfg = au.Spec.ApisixUpstreamConfig
@@ -339,6 +404,115 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
return err
}
+// updateExternalNodes re-translates the ExternalNodes of au and writes them to the
+// APISIX upstream named after the ApisixUpstream in the default cluster.
+//
+// When newUps is non-nil it replaces the stored upstream while the stored metadata
+// is carried over. If the upstream is not known to APISIX, nothing is done and nil
+// is returned. The old parameter is not used yet.
+func (c *apisixUpstreamController) updateExternalNodes(ctx context.Context, au *configv2.ApisixUpstream, old *configv2.ApisixUpstream, newUps *apisixv1.Upstream) error {
+	// TODO: if old is not nil, diff the external nodes change first
+
+	cluster := c.Config.APISIX.DefaultClusterName
+	upstreamName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
+
+	// abort reports the failure on the ApisixUpstream and hands the error back.
+	abort := func(cause error) error {
+		c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, cause)
+		c.recordStatus(au, utils.ResourceSyncAborted, cause, metav1.ConditionFalse, au.GetGeneration())
+		return cause
+	}
+
+	target, err := c.APISIX.Cluster(cluster).Upstream().Get(ctx, upstreamName)
+	if err == apisixcache.ErrNotFound {
+		// Do nothing if not found
+		return nil
+	}
+	if err != nil {
+		log.Errorf("failed to get upstream %s: %s", upstreamName, err)
+		return abort(err)
+	}
+
+	externalNodes, err := c.translator.TranslateApisixUpstreamExternalNodes(au)
+	if err != nil {
+		log.Errorf("failed to translate upstream external nodes %s: %s", upstreamName, err)
+		return abort(err)
+	}
+
+	// A freshly translated upstream takes over, but keeps the stored metadata.
+	if newUps != nil {
+		newUps.Metadata = target.Metadata
+		target = newUps
+	}
+	target.Nodes = externalNodes
+
+	if _, err := c.APISIX.Cluster(cluster).Upstream().Update(ctx, target); err != nil {
+		log.Errorw("failed to update external nodes upstream",
+			zap.Error(err),
+			zap.Any("upstream", target),
+			zap.Any("ApisixUpstream", au),
+			zap.String("cluster", cluster),
+		)
+		return abort(err)
+	}
+	return nil
+}
+
+// syncRelationship updates externalServiceMap for an ApisixUpstream event.
+//
+// The map is keyed by the "namespace/name" of every Service referenced by an
+// ExternalNodes entry of type ExternalTypeService; each value is the set of
+// ApisixUpstream keys (auKey) that reference that Service. Events whose group
+// version is not config.ApisixV2 are ignored.
+//
+// This runs before the caller validates the object, so a nil Spec (old or new)
+// is treated as "references no Service" instead of being dereferenced.
+func (c *apisixUpstreamController) syncRelationship(ev *types.Event, auKey string, au kube.ApisixUpstream) {
+	obj := ev.Object.(kube.ApisixUpstreamEvent)
+
+	if obj.GroupVersion != config.ApisixV2 {
+		return
+	}
+
+	var (
+		old    *configv2.ApisixUpstream
+		newObj *configv2.ApisixUpstream
+	)
+
+	switch ev.Type {
+	case types.EventUpdate:
+		old = obj.OldObject.V2()
+	case types.EventDelete:
+		tombstone, ok := ev.Tombstone.(kube.ApisixUpstream)
+		if !ok {
+			// Without a usable tombstone there is nothing to compare against.
+			log.Errorw("got ApisixUpstream delete event, but the tombstone is not an ApisixUpstream",
+				zap.Any("tombstone", ev.Tombstone),
+			)
+			return
+		}
+		old = tombstone.V2()
+	}
+
+	if ev.Type != types.EventDelete {
+		newObj = au.V2()
+	}
+
+	// referencedServices lists the Service keys an ApisixUpstream points at.
+	// Nodes of type ExternalTypeDomain need no bookkeeping and are skipped.
+	referencedServices := func(u *configv2.ApisixUpstream) []string {
+		if u == nil || u.Spec == nil {
+			return nil
+		}
+		var svcs []string
+		for _, node := range u.Spec.ExternalNodes {
+			if node.Type == configv2.ExternalTypeService {
+				svcs = append(svcs, u.Namespace+"/"+node.Name)
+			}
+		}
+		return svcs
+	}
+
+	oldExternalServices := referencedServices(old)
+	newExternalServices := referencedServices(newObj)
+
+	c.externalSvcLock.Lock()
+	defer c.externalSvcLock.Unlock()
+
+	toDelete := utils.Difference(oldExternalServices, newExternalServices)
+	toAdd := utils.Difference(newExternalServices, oldExternalServices)
+	for _, svc := range toDelete {
+		delete(c.externalServiceMap[svc], auKey)
+	}
+
+	for _, svc := range toAdd {
+		if _, ok := c.externalServiceMap[svc]; !ok {
+			c.externalServiceMap[svc] = make(map[string]struct{})
+		}
+		c.externalServiceMap[svc][auKey] = struct{}{}
+	}
+}
+
func (c *apisixUpstreamController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
@@ -494,6 +668,146 @@ func (c *apisixUpstreamController) ResourceSync() {
}
}
+// onSvcAdd is the informer callback for added Services. Services of type
+// ExternalName are queued on svcWorkqueue by their namespace/name key; every
+// other Service is ignored.
+func (c *apisixUpstreamController) onSvcAdd(obj interface{}) {
+	svc, ok := obj.(*corev1.Service)
+	if !ok {
+		log.Errorw("got service add event, but it is not a Service",
+			zap.Any("obj", obj),
+		)
+		// svc is nil at this point; continuing would dereference it below.
+		return
+	}
+
+	log.Debugw("Service add event arrived",
+		zap.Any("object", obj),
+	)
+
+	if svc.Spec.Type != corev1.ServiceTypeExternalName {
+		return
+	}
+
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		log.Errorw("found Service with bad meta key",
+			zap.Error(err),
+			zap.Any("obj", obj),
+		)
+		return
+	}
+	c.svcWorkqueue.Add(key)
+}
+
+// onSvcUpdate is the informer callback for updated Services. The Service is
+// queued on svcWorkqueue only when the new object is of type ExternalName and
+// its ExternalName differs from the old object's.
+func (c *apisixUpstreamController) onSvcUpdate(old, new interface{}) {
+	oldSvc, ok := old.(*corev1.Service)
+	if !ok {
+		log.Errorw("got service update event, but old one is not a Service",
+			zap.Any("old", old),
+		)
+		// oldSvc is nil here; it is dereferenced further down.
+		return
+	}
+	newSvc, ok := new.(*corev1.Service)
+	if !ok {
+		log.Errorw("got service update event, but new one is not a Service",
+			zap.Any("new", new),
+		)
+		// newSvc is nil here; it is dereferenced further down.
+		return
+	}
+
+	if newSvc.Spec.Type != corev1.ServiceTypeExternalName {
+		return
+	}
+
+	// Only a changed ExternalName triggers a re-sync.
+	if newSvc.Spec.ExternalName == oldSvc.Spec.ExternalName {
+		return
+	}
+
+	key, err := cache.MetaNamespaceKeyFunc(newSvc)
+	if err != nil {
+		log.Errorw("found Service with bad meta key",
+			zap.Error(err),
+			zap.Any("obj", newSvc),
+		)
+		return
+	}
+	c.svcWorkqueue.Add(key)
+}
+
+// onSvcDelete is the informer callback for deleted Services. It accepts either
+// the Service itself or a cache.DeletedFinalStateUnknown wrapping one, and
+// queues ExternalName Services on svcWorkqueue.
+func (c *apisixUpstreamController) onSvcDelete(obj interface{}) {
+	var svc *corev1.Service
+	switch deleted := obj.(type) {
+	case *corev1.Service:
+		svc = deleted
+	case cache.DeletedFinalStateUnknown:
+		wrapped, isSvc := deleted.Obj.(*corev1.Service)
+		if !isSvc {
+			log.Errorw("got service delete event, but it is not a Service",
+				zap.Any("obj", obj),
+			)
+			return
+		}
+		svc = wrapped
+	default:
+		return
+	}
+
+	if svc.Spec.Type != corev1.ServiceTypeExternalName {
+		return
+	}
+
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		log.Errorw("found Service with bad meta key",
+			zap.Error(err),
+			zap.Any("obj", obj),
+		)
+		return
+	}
+	c.svcWorkqueue.Add(key)
+}
+
+// handleSvcChange re-syncs every ApisixUpstream recorded in externalServiceMap
+// as referencing the Service identified by key. For each one it notifies the
+// ApisixUpstream change callback and refreshes the external nodes in APISIX.
+// The first error stops the loop and is returned.
+func (c *apisixUpstreamController) handleSvcChange(ctx context.Context, key string) error {
+	// Copy the affected keys so the lock is not held while syncing.
+	c.externalSvcLock.RLock()
+	referrers := c.externalServiceMap[key]
+	affected := make([]string, 0, len(referrers))
+	for auKey := range referrers {
+		affected = append(affected, auKey)
+	}
+	c.externalSvcLock.RUnlock()
+
+	for _, auKey := range affected {
+		log.Debugw("Service change event trigger ApisixUpstream sync",
+			zap.Any("service", key),
+			zap.Any("ApisixUpstream", auKey),
+		)
+		c.notifyApisixUpstreamChange(auKey)
+
+		namespace, name, err := cache.SplitMetaNamespaceKey(auKey)
+		if err != nil {
+			return err
+		}
+		multiVersioned, err := c.apisixUpstreamLister.V2(namespace, name)
+		if err != nil {
+			return err
+		}
+		if err := c.updateExternalNodes(ctx, multiVersioned.V2(), nil, nil); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// handleSvcErr finishes the processing of a Service key taken from svcWorkqueue.
+// On success the key's rate-limiting history is dropped; on failure the key is
+// put back on svcWorkqueue with rate limiting so it is retried.
+func (c *apisixUpstreamController) handleSvcErr(key string, errOrigin error) {
+	if errOrigin == nil {
+		// Service keys live on svcWorkqueue, so that is the queue whose
+		// rate limiter has to forget them (not the ApisixUpstream workqueue).
+		c.svcWorkqueue.Forget(key)
+		return
+	}
+
+	log.Warnw("sync Service failed, will retry",
+		zap.Any("key", key),
+		zap.Error(errOrigin),
+	)
+	c.svcWorkqueue.AddRateLimited(key)
+}
+
// recordStatus record resources status
func (c *apisixUpstreamController) recordStatus(at interface{}, reason string, err error, status metav1.ConditionStatus, generation int64) {
// build condition
diff --git a/pkg/providers/apisix/provider.go b/pkg/providers/apisix/provider.go
index 0d97ef32..627adcfe 100644
--- a/pkg/providers/apisix/provider.go
+++ b/pkg/providers/apisix/provider.go
@@ -51,6 +51,8 @@ type Provider interface {
Init(ctx context.Context) error
ResourceSync()
+ NotifyServiceAdd(key string)
+ NotifyApisixUpstreamChange(key string)
SyncSecretChange(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretMapKey string)
}
@@ -86,10 +88,12 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
apisixFactory := common.KubeClient.NewAPISIXSharedIndexInformerFactory()
p.apisixTranslator = apisixtranslation.NewApisixTranslator(&apisixtranslation.TranslatorOptions{
- Apisix: common.APISIX,
- ClusterName: common.Config.APISIX.DefaultClusterName,
- ServiceLister: common.SvcLister,
- SecretLister: common.SecretLister,
+ Apisix: common.APISIX,
+ ClusterName: common.Config.APISIX.DefaultClusterName,
+
+ ApisixUpstreamLister: common.ApisixUpstreamLister,
+ ServiceLister: common.SvcLister,
+ SecretLister: common.SecretLister,
}, translator)
c := &apisixCommon{
Common: common,
@@ -137,7 +141,7 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
apisixFactory.Apisix().V2().ApisixPluginConfigs().Lister(),
)
- p.apisixUpstreamController = newApisixUpstreamController(c)
+ p.apisixUpstreamController = newApisixUpstreamController(c, p.NotifyApisixUpstreamChange)
p.apisixRouteController = newApisixRouteController(c, p.apisixRouteInformer, apisixRouteLister)
p.apisixTlsController = newApisixTlsController(c, p.apisixTlsInformer, apisixTlsLister)
p.apisixClusterConfigController = newApisixClusterConfigController(c, p.apisixClusterConfigInformer, apisixClusterConfigLister)
@@ -201,6 +205,14 @@ func (p *apisixProvider) ResourceSync() {
e.Wait()
}
+// NotifyServiceAdd passes key on to the ApisixRoute controller's NotifyServiceAdd.
+func (p *apisixProvider) NotifyServiceAdd(key string) {
+ p.apisixRouteController.NotifyServiceAdd(key)
+}
+
+// NotifyApisixUpstreamChange passes key on to the ApisixRoute controller's
+// NotifyApisixUpstreamChange.
+func (p *apisixProvider) NotifyApisixUpstreamChange(key string) {
+ p.apisixRouteController.NotifyApisixUpstreamChange(key)
+}
+
func (p *apisixProvider) SyncSecretChange(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretMapKey string) {
p.apisixTlsController.SyncSecretChange(ctx, ev, secret, secretMapKey)
}
diff --git a/pkg/providers/apisix/translation/apisix_plugin.go b/pkg/providers/apisix/translation/apisix_plugin.go
index 1115cfe2..9a93281f 100644
--- a/pkg/providers/apisix/translation/apisix_plugin.go
+++ b/pkg/providers/apisix/translation/apisix_plugin.go
@@ -66,7 +66,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *translation.TranslateConte
})
}
- // Finally append the default upstream in the route.
+ // append the default upstream in the route.
wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
Weight: defaultBackendWeight,
})
diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go
index d5ab4bb6..aa22a829 100644
--- a/pkg/providers/apisix/translation/apisix_route.go
+++ b/pkg/providers/apisix/translation/apisix_route.go
@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
+ "strconv"
"strings"
"go.uber.org/zap"
@@ -361,21 +362,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar
return errors.New("duplicated route rule name")
}
ruleNameMap[part.Name] = struct{}{}
- backends := part.Backends
- // Use the first backend as the default backend in Route,
- // others will be configured in traffic-split plugin.
- backend := backends[0]
- backends = backends[1:]
-
- svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace)
- if err != nil {
- log.Errorw("failed to get service port in backend",
- zap.Any("backend", backend),
- zap.Any("apisix_route", ar),
- zap.Error(err),
- )
- return err
- }
var timeout *apisixv1.UpstreamTimeout
if part.Timeout != nil {
@@ -425,7 +411,10 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar
}
}
- var exprs [][]apisixv1.StringOrSlice
+ var (
+ exprs [][]apisixv1.StringOrSlice
+ err error
+ )
if part.Match.NginxVars != nil {
exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars)
if err != nil {
@@ -445,7 +434,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar
return err
}
- upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity)
route := apisixv1.NewDefaultRoute()
route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
route.ID = id.GenID(route.Name)
@@ -455,36 +443,158 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar
route.Hosts = part.Match.Hosts
route.Uris = part.Match.Paths
route.Methods = part.Match.Methods
- route.UpstreamId = id.GenID(upstreamName)
route.EnableWebsocket = part.Websocket
route.Plugins = pluginMap
route.Timeout = timeout
- if part.PluginConfigName != "" {
- route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName))
- }
+ ctx.AddRoute(route)
+ // --- translate "Backends" ---
+ backends := part.Backends
if len(backends) > 0 {
- weight := translation.DefaultWeight
- if backend.Weight != nil {
- weight = *backend.Weight
- }
- plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backends)
+ // Use the first backend as the default backend in Route,
+ // others will be configured in traffic-split plugin.
+ backend := backends[0]
+ backends = backends[1:]
+
+ svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace)
if err != nil {
- log.Errorw("failed to translate traffic-split plugin",
+ log.Errorw("failed to get service port in backend",
+ zap.Any("backend", backend),
+ zap.Any("apisix_route", ar),
zap.Error(err),
- zap.Any("ApisixRoute", ar),
)
return err
}
- route.Plugins["traffic-split"] = plugin
+
+ upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity)
+ route.UpstreamId = id.GenID(upstreamName)
+ if part.PluginConfigName != "" {
+ route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName))
+ }
+
+ if len(backends) > 0 {
+ weight := translation.DefaultWeight
+ if backend.Weight != nil {
+ weight = *backend.Weight
+ }
+ plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backends)
+ if err != nil {
+ log.Errorw("failed to translate traffic-split plugin",
+ zap.Error(err),
+ zap.Any("ApisixRoute", ar),
+ )
+ return err
+ }
+ route.Plugins["traffic-split"] = plugin
+ }
+ if !ctx.CheckUpstreamExist(upstreamName) {
+ ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
+ if err != nil {
+ return err
+ }
+ ctx.AddUpstream(ups)
+ }
}
- ctx.AddRoute(route)
- if !ctx.CheckUpstreamExist(upstreamName) {
- ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
+
+ if len(part.Backends) == 0 && len(part.Upstreams) > 0 {
+ // Only have Upstreams
+ upName := apisixv1.ComposeExternalUpstreamName(ar.Namespace, part.Upstreams[0].Name)
+ route.UpstreamId = id.GenID(upName)
+ }
+ // --- translate Upstreams ---
+ var ups []*apisixv1.Upstream
+ for i, au := range part.Upstreams {
+ up, err := t.translateExternalApisixUpstream(ar.Namespace, au.Name)
if err != nil {
- return err
+ log.Errorw(fmt.Sprintf("failed to translate ApisixUpstream at Upstream[%v]", i),
+ zap.Error(err),
+ zap.String("apisix_upstream", ar.Namespace+"/"+au.Name),
+ )
+ continue
}
- ctx.AddUpstream(ups)
+ if au.Weight != nil {
+ up.Labels["meta_weight"] = strconv.Itoa(*au.Weight)
+ } else {
+ up.Labels["meta_weight"] = strconv.Itoa(translation.DefaultWeight)
+ }
+ ups = append(ups, up)
+ }
+
+ if len(ups) == 0 {
+ continue
+ }
+
+ var wups []apisixv1.TrafficSplitConfigRuleWeightedUpstream
+ if len(part.Backends) == 0 {
+ if len(ups) > 1 {
+ for i, up := range ups {
+ weight, err := strconv.Atoi(up.Labels["meta_weight"])
+ if err != nil {
+ // shouldn't happen
+ log.Errorw(fmt.Sprintf("failed to parse translated upstream weight at %v", i),
+ zap.Error(err),
+ zap.String("meta_weight", up.Labels["meta_weight"]),
+ )
+ continue
+ }
+ if i == 0 {
+ // set as default
+ wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
+ Weight: weight,
+ })
+ } else {
+ wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
+ UpstreamID: ups[i].ID,
+ Weight: weight,
+ })
+ }
+ }
+ }
+ } else {
+ // Mixed backends and upstreams
+ if cfg, ok := route.Plugins["traffic-split"]; ok {
+ if tsCfg, ok := cfg.(*apisixv1.TrafficSplitConfig); ok {
+ wups = tsCfg.Rules[0].WeightedUpstreams
+ }
+ }
+ if len(wups) == 0 {
+ // append the default upstream in the route.
+ weight := translation.DefaultWeight
+ if part.Backends[0].Weight != nil {
+ weight = *part.Backends[0].Weight
+ }
+ wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
+ Weight: weight,
+ })
+ }
+ for i, up := range ups {
+ weight, err := strconv.Atoi(up.Labels["meta_weight"])
+ if err != nil {
+ // shouldn't happen
+ log.Errorw(fmt.Sprintf("failed to parse translated upstream weight at %v", i),
+ zap.Error(err),
+ zap.String("meta_weight", up.Labels["meta_weight"]),
+ )
+ continue
+ }
+ wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
+ UpstreamID: ups[i].ID,
+ Weight: weight,
+ })
+ }
+ }
+ if len(wups) > 0 {
+ route.Plugins["traffic-split"] = &apisixv1.TrafficSplitConfig{
+ Rules: []apisixv1.TrafficSplitConfigRule{
+ {
+ WeightedUpstreams: wups,
+ },
+ },
+ }
+ }
+
+ for _, up := range ups {
+ ctx.AddUpstream(up)
}
}
return nil
@@ -675,11 +785,6 @@ func (t *translator) translateHTTPRouteV2beta3NotStrictly(ctx *translation.Trans
// translateHTTPRouteV2NotStrictly translates http route with a loose way, only generate ID and Name for delete Event.
func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateContext, ar *configv2.ApisixRoute) error {
for _, part := range ar.Spec.HTTP {
- backends := part.Backends
- // Use the first backend as the default backend in Route,
- // others will be configured in traffic-split plugin.
- backend := backends[0]
-
pluginMap := make(apisixv1.Plugins)
// add route plugins
for _, plugin := range part.Plugins {
@@ -711,7 +816,6 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateC
}
}
- upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity)
route := apisixv1.NewDefaultRoute()
route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
route.ID = id.GenID(route.Name)
@@ -720,12 +824,33 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateC
}
ctx.AddRoute(route)
- if !ctx.CheckUpstreamExist(upstreamName) {
- ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity)
- if err != nil {
- return err
+
+ if len(part.Backends) > 0 {
+ backends := part.Backends
+ // Use the first backend as the default backend in Route,
+ // others will be configured in traffic-split plugin.
+ backend := backends[0]
+
+ upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity)
+ if !ctx.CheckUpstreamExist(upstreamName) {
+ ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity)
+ if err != nil {
+ return err
+ }
+ ctx.AddUpstream(ups)
+ }
+ }
+ if len(part.Upstreams) > 0 {
+ upstreams := part.Upstreams
+ for _, upstream := range upstreams {
+ upstreamName := apisixv1.ComposeExternalUpstreamName(ar.Namespace, upstream.Name)
+ if !ctx.CheckUpstreamExist(upstreamName) {
+ ups := &apisixv1.Upstream{}
+ ups.Name = upstreamName
+ ups.ID = id.GenID(ups.Name)
+ ctx.AddUpstream(ups)
+ }
}
- ctx.AddUpstream(ups)
}
}
return nil
diff --git a/pkg/providers/apisix/translation/apisix_route_test.go b/pkg/providers/apisix/translation/apisix_route_test.go
index c84bdd0e..f0a54443 100644
--- a/pkg/providers/apisix/translation/apisix_route_test.go
+++ b/pkg/providers/apisix/translation/apisix_route_test.go
@@ -496,3 +496,248 @@ func TestTranslateApisixRouteV2beta3NotStrictly(t *testing.T) {
assert.Equal(t, id.GenID("test_svc1_81"), tx.Upstreams[0].ID, "upstream1 id error")
assert.Equal(t, id.GenID("test_svc2_82"), tx.Upstreams[1].ID, "upstream2 id error")
}
+
+// ptrOf returns a pointer to a copy of v; handy for optional pointer fields
+// in test fixtures.
+func ptrOf[T any](v T) *T {
+	return &v
+}
+
+// mockTranslatorV2 builds a translator backed by fake clientsets that contain one
+// Service "test/svc" with its Endpoints and one v2 ApisixUpstream "test/au" with
+// a single external domain node.
+//
+// The returned channel receives one value per add event seen by the Service,
+// Endpoints and ApisixUpstream informers. The informers keep running until the
+// test finishes and all three caches are synced before the function returns.
+func mockTranslatorV2(t *testing.T) (*translator, <-chan struct{}) {
+	svc := &corev1.Service{
+		TypeMeta: metav1.TypeMeta{},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "svc",
+			Namespace: "test",
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name: "port1",
+					Port: 80,
+					TargetPort: intstr.IntOrString{
+						Type:   intstr.Int,
+						IntVal: 9080,
+					},
+				},
+				{
+					Name: "port2",
+					Port: 443,
+					TargetPort: intstr.IntOrString{
+						Type:   intstr.Int,
+						IntVal: 9443,
+					},
+				},
+			},
+		},
+	}
+	endpoints := &corev1.Endpoints{
+		TypeMeta: metav1.TypeMeta{},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "svc",
+			Namespace: "test",
+		},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Ports: []corev1.EndpointPort{
+					{
+						Name: "port1",
+						Port: 9080,
+					},
+					{
+						Name: "port2",
+						Port: 9443,
+					},
+				},
+				Addresses: []corev1.EndpointAddress{
+					{IP: "192.168.1.1"},
+					{IP: "192.168.1.2"},
+				},
+			},
+		},
+	}
+
+	au := &configv2.ApisixUpstream{
+		TypeMeta: metav1.TypeMeta{},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "au",
+			Namespace: "test",
+		},
+		Spec: &configv2.ApisixUpstreamSpec{
+			ExternalNodes: []configv2.ApisixUpstreamExternalNode{
+				{
+					Name:   "httpbin.org",
+					Type:   configv2.ExternalTypeDomain,
+					Weight: ptrOf(1),
+				},
+			},
+		},
+	}
+
+	client := fake.NewSimpleClientset()
+	informersFactory := informers.NewSharedInformerFactory(client, 0)
+	svcInformer := informersFactory.Core().V1().Services().Informer()
+	svcLister := informersFactory.Core().V1().Services().Lister()
+	epLister, epInformer := kube.NewEndpointListerAndInformer(informersFactory, false)
+	apisixClient := fakeapisix.NewSimpleClientset()
+	apisixInformersFactory := apisixinformers.NewSharedInformerFactory(apisixClient, 0)
+
+	auInformer := apisixInformersFactory.Apisix().V2().ApisixUpstreams().Informer()
+	auLister := kube.NewApisixUpstreamLister(
+		apisixInformersFactory.Apisix().V2beta3().ApisixUpstreams().Lister(),
+		apisixInformersFactory.Apisix().V2().ApisixUpstreams().Lister(),
+	)
+
+	_, err := client.CoreV1().Endpoints("test").Create(context.Background(), endpoints, metav1.CreateOptions{})
+	assert.Nil(t, err)
+	_, err = client.CoreV1().Services("test").Create(context.Background(), svc, metav1.CreateOptions{})
+	assert.Nil(t, err)
+	_, err = apisixClient.ApisixV2().ApisixUpstreams("test").Create(context.Background(), au, metav1.CreateOptions{})
+	assert.Nil(t, err)
+
+	tr := &translator{
+		&TranslatorOptions{
+			ServiceLister:        svcLister,
+			ApisixUpstreamLister: auLister,
+		},
+		translation.NewTranslator(&translation.TranslatorOptions{
+			ServiceLister:        svcLister,
+			EndpointLister:       epLister,
+			ApisixUpstreamLister: auLister,
+			APIVersion:           config.ApisixV2,
+		}),
+	}
+
+	// One slot per informer, so no handler ever blocks on an unread event.
+	processCh := make(chan struct{}, 3)
+	notify := cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			processCh <- struct{}{}
+		},
+	}
+	svcInformer.AddEventHandler(notify)
+	epInformer.AddEventHandler(notify)
+	auInformer.AddEventHandler(notify)
+
+	stopCh := make(chan struct{})
+	// The listers are only useful while the informers run, so stop them when
+	// the test ends rather than when this helper returns.
+	t.Cleanup(func() { close(stopCh) })
+	go svcInformer.Run(stopCh)
+	go epInformer.Run(stopCh)
+	go auInformer.Run(stopCh)
+	// Wait for every cache the translator reads from, not just the Service one.
+	cache.WaitForCacheSync(stopCh, svcInformer.HasSynced, epInformer.HasSynced, auInformer.HasSynced)
+
+	return tr, processCh
+}
+
+// TestTranslateApisixRouteV2WithUpstream translates an ApisixRoute rule that mixes
+// one Service backend (weight 2) with one ApisixUpstream reference (weight 1) and
+// checks the generated traffic-split plugin and the translated external upstream.
+func TestTranslateApisixRouteV2WithUpstream(t *testing.T) {
+ tr, processCh := mockTranslatorV2(t)
+ // Wait for two informer add events before translating.
+ <-processCh
+ <-processCh
+
+ ar := &configv2.ApisixRoute{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "ar",
+ Namespace: "test",
+ },
+ Spec: configv2.ApisixRouteSpec{
+ HTTP: []configv2.ApisixRouteHTTP{
+ {
+ Name: "rule1",
+ Match: configv2.ApisixRouteHTTPMatch{
+ Paths: []string{
+ "/*",
+ },
+ },
+ Backends: []configv2.ApisixRouteHTTPBackend{
+ {
+ ServiceName: "svc",
+ ServicePort: intstr.IntOrString{
+ IntVal: 80,
+ },
+ Weight: ptrOf(2),
+ },
+ },
+ Upstreams: []configv2.ApisixRouteUpstreamReference{
+ {
+ Name: "au",
+ Weight: ptrOf(1),
+ },
+ },
+ },
+ },
+ },
+ }
+
+ tctx, err := tr.TranslateRouteV2(ar)
+ assert.Nil(t, err)
+
+ assert.Equal(t, 1, len(tctx.Routes))
+ r := tctx.Routes[0]
+
+ assert.NotNil(t, r.Plugins["traffic-split"])
+
+ tsCfg, ok := r.Plugins["traffic-split"].(*apisixv1.TrafficSplitConfig)
+ assert.Equal(t, true, ok)
+ assert.Equal(t, 1, len(tsCfg.Rules))
+ assert.NotNil(t, tsCfg.Rules[0])
+ assert.NotNil(t, tsCfg.Rules[0].WeightedUpstreams, "weighted upstreams")
+
+ wups := tsCfg.Rules[0].WeightedUpstreams
+
+ upsName := apisixv1.ComposeExternalUpstreamName(ar.Namespace, "au")
+ upsID := id.GenID(upsName)
+ // The entry without an upstream ID stands for the route's default upstream.
+ assert.Equal(t, []apisixv1.TrafficSplitConfigRuleWeightedUpstream{
+ {
+ // Default
+ UpstreamID: "",
+ Weight: 2,
+ },
+ {
+ UpstreamID: upsID,
+ Weight: 1,
+ },
+ }, wups)
+
+ // Two upstreams are expected; pick out the external one by ID.
+ assert.Equal(t, 2, len(tctx.Upstreams))
+ var ups *apisixv1.Upstream
+ for _, u := range tctx.Upstreams {
+ if u.ID == upsID {
+ ups = u
+ break
+ }
+ }
+ assert.NotNil(t, ups)
+
+ // unset useless data
+ ups.Desc = ""
+ assert.Equal(t, &apisixv1.Upstream{
+ Metadata: apisixv1.Metadata{
+ ID: upsID,
+ Name: upsName,
+ Desc: "",
+ Labels: map[string]string{
+ "managed-by": "apisix-ingress-controller",
+ "meta_weight": "1",
+ },
+ },
+ Type: apisixv1.LbRoundRobin,
+ HashOn: "",
+ Key: "",
+ Checks: nil,
+ Nodes: []apisixv1.UpstreamNode{
+ {
+ Host: "httpbin.org",
+ Port: 80,
+ Weight: 1,
+ },
+ },
+ Scheme: apisixv1.SchemeHTTP,
+ Retries: nil,
+ Timeout: nil,
+ TLS: nil,
+ }, ups)
+}
diff --git a/pkg/providers/apisix/translation/apisix_upstream.go b/pkg/providers/apisix/translation/apisix_upstream.go
index d3d725af..e91dcfad 100644
--- a/pkg/providers/apisix/translation/apisix_upstream.go
+++ b/pkg/providers/apisix/translation/apisix_upstream.go
@@ -15,7 +15,16 @@
package translation
import (
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/pkg/errors"
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+
"github.com/apache/apisix-ingress-controller/pkg/id"
+ v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
@@ -47,3 +56,102 @@ func (t *translator) translateService(namespace, svcName, subset, svcResolveGran
ups.ID = id.GenID(ups.Name)
return ups, nil
}
+
+// TranslateApisixUpstreamExternalNodes converts the ExternalNodes of an
+// ApisixUpstream into APISIX upstream nodes.
+//
+// Domain nodes are written as "host" or "host:port", optionally prefixed with
+// "http://" or "https://". Without an explicit port, 443 is used for "https://"
+// names and 80 otherwise. Service nodes must name an ExternalName Service in the
+// namespace of au; its ExternalName becomes the host and the port is 80. Nodes
+// of any other type are skipped. A nil Spec yields no nodes.
+//
+// TODO: Support Port field
+func (t *translator) TranslateApisixUpstreamExternalNodes(au *v2.ApisixUpstream) ([]apisixv1.UpstreamNode, error) {
+	if au.Spec == nil {
+		return nil, nil
+	}
+
+	var nodes []apisixv1.UpstreamNode
+	for i, node := range au.Spec.ExternalNodes {
+		weight := translation.DefaultWeight
+		if node.Weight != nil {
+			weight = *node.Weight
+		}
+
+		switch node.Type {
+		case v2.ExternalTypeDomain:
+			host, port, err := splitExternalNodeName(node.Name)
+			if err != nil {
+				return nil, errors.Wrap(err, fmt.Sprintf("failed to parse ApisixUpstream %s/%s port: at ExternalNodes[%v]: %s", au.Namespace, au.Name, i, node.Name))
+			}
+
+			nodes = append(nodes, apisixv1.UpstreamNode{
+				Host:   host,
+				Port:   port,
+				Weight: weight,
+			})
+		case v2.ExternalTypeService:
+			svc, err := t.ServiceLister.Services(au.Namespace).Get(node.Name)
+			if err != nil {
+				// In theory, ApisixRoute now watches all service add event, a not found error is already handled
+				if k8serrors.IsNotFound(err) {
+					// TODO: Should retry
+					return nil, err
+				}
+				return nil, err
+			}
+
+			if svc.Spec.Type != corev1.ServiceTypeExternalName {
+				return nil, fmt.Errorf("ApisixUpstream %s/%s ExternalNodes[%v] must refers to a ExternalName service: %s", au.Namespace, au.Name, i, node.Name)
+			}
+
+			nodes = append(nodes, apisixv1.UpstreamNode{
+				Host:   svc.Spec.ExternalName,
+				Weight: weight,
+				// TODO: Support Port field. This is a temporary solution.
+				Port: 80,
+			})
+		}
+	}
+	return nodes, nil
+}
+
+// splitExternalNodeName splits a domain node name into host and port. The scheme
+// prefix, if any, is removed from the host and only selects the default port.
+// Anything after the first ":" of the remainder must be a decimal port.
+func splitExternalNodeName(name string) (string, int, error) {
+	defaultPort := 80
+	switch {
+	case strings.HasPrefix(name, "https://"):
+		name = strings.TrimPrefix(name, "https://")
+		defaultPort = 443
+	case strings.HasPrefix(name, "http://"):
+		name = strings.TrimPrefix(name, "http://")
+	}
+
+	host, rawPort, hasPort := strings.Cut(name, ":")
+	if !hasPort {
+		return host, defaultPort, nil
+	}
+	port, err := strconv.Atoi(rawPort)
+	if err != nil {
+		return "", 0, err
+	}
+	return host, port, nil
+}
+
+// translateExternalApisixUpstream builds the APISIX upstream for the v2
+// ApisixUpstream namespace/upstream from its upstream config and its
+// ExternalNodes. An ApisixUpstream without a Spec or without ExternalNodes is
+// rejected with an error.
+//
+// TODO: Retry when ApisixUpstream/ExternalName service not found
+func (t *translator) translateExternalApisixUpstream(namespace, upstream string) (*apisixv1.Upstream, error) {
+	multiVersioned, err := t.ApisixUpstreamLister.V2(namespace, upstream)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			// TODO: Should retry
+			return nil, err
+		}
+		return nil, err
+	}
+
+	au := multiVersioned.V2()
+	// Spec is a pointer and may be unset; treat that like an empty node list.
+	if au == nil || au.Spec == nil || len(au.Spec.ExternalNodes) == 0 {
+		// should do further resolve
+		return nil, fmt.Errorf("%s/%s has empty ExternalNodes", namespace, upstream)
+	}
+
+	ups, err := t.TranslateUpstreamConfigV2(&au.Spec.ApisixUpstreamConfig)
+	if err != nil {
+		return nil, err
+	}
+	ups.Name = apisixv1.ComposeExternalUpstreamName(namespace, upstream)
+	ups.ID = id.GenID(ups.Name)
+
+	externalNodes, err := t.TranslateApisixUpstreamExternalNodes(au)
+	if err != nil {
+		return nil, err
+	}
+
+	ups.Nodes = append(ups.Nodes, externalNodes...)
+
+	return ups, nil
+}
diff --git a/pkg/providers/apisix/translation/translator.go b/pkg/providers/apisix/translation/translator.go
index 3b89283f..a15ff337 100644
--- a/pkg/providers/apisix/translation/translator.go
+++ b/pkg/providers/apisix/translation/translator.go
@@ -32,8 +32,9 @@ type TranslatorOptions struct {
Apisix apisix.APISIX
ClusterName string
- ServiceLister listerscorev1.ServiceLister
- SecretLister listerscorev1.SecretLister
+ ApisixUpstreamLister kube.ApisixUpstreamLister
+ ServiceLister listerscorev1.ServiceLister
+ SecretLister listerscorev1.SecretLister
}
type translator struct {
@@ -95,6 +96,9 @@ type ApisixTranslator interface {
TranslatePluginConfigV2NotStrictly(*configv2.ApisixPluginConfig) (*translation.TranslateContext, error)
TranslateRouteMatchExprs(nginxVars []configv2.ApisixRouteHTTPMatchExpr) ([][]apisixv1.StringOrSlice, error)
+
+ // TranslateApisixUpstreamExternalNodes translates an ApisixUpstream with external nodes to APISIX nodes.
+ TranslateApisixUpstreamExternalNodes(au *configv2.ApisixUpstream) ([]apisixv1.UpstreamNode, error)
}
func NewApisixTranslator(opts *TranslatorOptions, t translation.Translator) ApisixTranslator {
diff --git a/pkg/providers/utils/string.go b/pkg/providers/utils/string.go
index df1968c7..414bad23 100644
--- a/pkg/providers/utils/string.go
+++ b/pkg/providers/utils/string.go
@@ -24,6 +24,7 @@ func TruncateString(s string, max int) string {
}
// Difference returns elements only in a
+// Duplicated elements are considered as same element
func Difference(a, b []string) []string {
bMap := make(map[string]struct{}, len(b))
for _, elem := range b {
@@ -37,3 +38,10 @@ func Difference(a, b []string) []string {
}
return onlyInA
}
+
+// Equal reports whether a and b have the same length and neither holds an
+// element that Difference finds missing from the other.
+func Equal(a, b []string) bool {
+	return len(a) == len(b) &&
+		len(Difference(a, b)) == 0 &&
+		len(Difference(b, a)) == 0
+}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 20e5d4a7..1cf5ee83 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -507,6 +507,11 @@ func ComposeUpstreamName(namespace, name, subset string, port int32, resolveGran
return buf.String()
}
+// ComposeExternalUpstreamName composes the upstream name for an ApisixUpstream
+// from its namespace and name, joined by an underscore: "<namespace>_<name>".
+func ComposeExternalUpstreamName(namespace, name string) string {
+ return namespace + "_" + name
+}
+
// ComposeRouteName uses namespace, name and rule name to compose
// the route name.
func ComposeRouteName(namespace, name string, rule string) string {
diff --git a/samples/deploy/crd/v1/ApisixRoute.yaml b/samples/deploy/crd/v1/ApisixRoute.yaml
index cf0f8e58..13654b36 100644
--- a/samples/deploy/crd/v1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1/ApisixRoute.yaml
@@ -609,7 +609,9 @@ spec:
minItems: 1
items:
type: object
- required: ["name", "match", "backends"]
+ anyOf:
+ - required: ["name", "match", "backends"]
+ - required: ["name", "match", "upstreams"]
properties:
name:
type: string
@@ -710,6 +712,17 @@ spec:
plugin_config_name:
type: string
minLength: 1
+ upstreams:
+ description: Upstreams refer to ApisixUpstream CRD
+ type: array
+ items:
+ description: ApisixRouteUpstreamReference contains an ApisixUpstream CRD reference
+ type: object
+ properties:
+ name:
+ type: string
+ weight:
+ type: integer
backends:
type: array
minItems: 1
diff --git a/samples/deploy/crd/v1/ApisixUpstream.yaml b/samples/deploy/crd/v1/ApisixUpstream.yaml
index 752efa53..27d6ef83 100644
--- a/samples/deploy/crd/v1/ApisixUpstream.yaml
+++ b/samples/deploy/crd/v1/ApisixUpstream.yaml
@@ -413,6 +413,19 @@ spec:
spec:
type: object
properties:
+ externalNodes:
+ description: ExternalNodes contains external nodes the Upstream should use. If this field is set, the upstream will use these nodes directly without any further resolution
+ type: array
+ items:
+ description: ApisixUpstreamExternalNode is the external node conf
+ type: object
+ properties:
+ name:
+ type: string
+ type:
+ type: string
+ weight:
+ type: integer
subsets:
type: array
items:
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index c4fd20a9..f77183de 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -11,6 +11,7 @@ require (
github.com/gruntwork-io/terratest v0.40.22
github.com/onsi/ginkgo/v2 v2.2.0
github.com/stretchr/testify v1.8.0
+ go.uber.org/zap v1.23.0
k8s.io/api v0.25.2
k8s.io/apimachinery v0.25.2
k8s.io/client-go v0.25.2
@@ -92,7 +93,6 @@ require (
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
- go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220725212005-46097bf591d3 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index 1962f024..fbaf74d5 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -27,6 +27,7 @@ import (
"time"
"github.com/apache/apisix-ingress-controller/pkg/apisix"
+ "github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/gruntwork-io/terratest/modules/k8s"
@@ -34,6 +35,7 @@ import (
"github.com/gruntwork-io/terratest/modules/testing"
ginkgo "github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -121,7 +123,16 @@ func (s *Scaffold) CreateApisixRoute(name string, rules []ApisixRouteRule) {
// CreateResourceFromString creates resource from a loaded yaml string.
func (s *Scaffold) CreateResourceFromString(yaml string) error {
- return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
+ err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
+ // NOTE(review): this pause runs whether or not the apply succeeded; presumably
+ // it gives the controller time to process the resource - confirm.
+ time.Sleep(5 * time.Second)
+
+ // If an error is returned, it may be a &shell.ErrWithCmdOutput, which is not
+ // helpful for debugging on its own, so log it here before returning it.
+ if err != nil {
+ log.Errorw("create resource failed",
+ zap.Error(err),
+ )
+ }
+ return err
}
func (s *Scaffold) DeleteResourceFromString(yaml string) error {
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index ff72d2eb..614595ad 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -34,6 +34,7 @@ import (
"time"
"github.com/apache/apisix-ingress-controller/pkg/config"
+ "github.com/apache/apisix-ingress-controller/pkg/log"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gavv/httpexpect/v2"
"github.com/gruntwork-io/terratest/modules/k8s"
@@ -41,6 +42,7 @@ import (
"github.com/gruntwork-io/terratest/modules/testing"
ginkgo "github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -640,6 +642,18 @@ func ApisixResourceVersion() *apisixResourceVersionInfo {
return apisixResourceVersion
}
+// DeleteResource runs "kubectl delete <resourceType> <name>" with the
+// scaffold's kubectl options. On failure the error is logged together with
+// the resource type and name, and then returned to the caller.
+func (s *Scaffold) DeleteResource(resourceType, name string) error {
+ if err := k8s.RunKubectlE(s.t, s.kubectlOptions, "delete", resourceType, name); err != nil {
+ log.Errorw("delete resource failed",
+ zap.Error(err),
+ zap.String("resource", resourceType),
+ zap.String("name", name),
+ )
+ return err
+ }
+ return nil
+}
+
func (s *Scaffold) NamespaceSelectorLabelStrings() []string {
var labels []string
for k, v := range s.opts.NamespaceSelectorLabel {
diff --git a/test/e2e/suite-features/external-service.go b/test/e2e/suite-features/external-service.go
new file mode 100644
index 00000000..03fbc24a
--- /dev/null
+++ b/test/e2e/suite-features/external-service.go
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package features
+
+import (
+ "fmt"
+ "net/http"
+ "reflect"
+ "time"
+
+ "github.com/apache/apisix-ingress-controller/pkg/id"
+ v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/providers/translation"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
+ apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+ "github.com/onsi/ginkgo/v2"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-features: external services", func() {
+ PhaseCreateExternalService := func(s *scaffold.Scaffold, name, externalName string) {
+ extService := fmt.Sprintf(`
+apiVersion: v1
+kind: Service
+metadata:
+ name: %s
+spec:
+ type: ExternalName
+ externalName: %s
+`, name, externalName)
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(extService))
+ }
+ PhaseCreateApisixRoute := func(s *scaffold.Scaffold, name, upstream string) {
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: %s
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /*
+ exprs:
+ - subject:
+ scope: Header
+ name: X-Foo
+ op: Equal
+ value: bar
+ upstreams:
+ - name: %s
+`, name, upstream)
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar))
+ }
+
+ PhaseCreateApisixRouteWithHostRewrite := func(s *scaffold.Scaffold, name, upstream, rewriteHost string) {
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: %s
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /*
+ exprs:
+ - subject:
+ scope: Header
+ name: X-Foo
+ op: Equal
+ value: bar
+ upstreams:
+ - name: %s
+ plugins:
+ - name: proxy-rewrite
+ enable: true
+ config:
+ host: %s
+`, name, upstream, rewriteHost)
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar))
+ }
+
+ PhaseCreateApisixUpstream := func(s *scaffold.Scaffold, name string, nodeType v2.ApisixUpstreamExternalType, nodeName string) {
+ au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixUpstream
+metadata:
+ name: %s
+spec:
+ externalNodes:
+ - type: %s
+ name: %s
+`, name, nodeType, nodeName)
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
+ }
+
+ PhaseValidateNoUpstreams := func(s *scaffold.Scaffold) {
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, 0, "upstream count")
+ }
+
+ PhaseValidateNoRoutes := func(s *scaffold.Scaffold) {
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 0, "route count")
+ }
+
+ PhaseValidateFirstUpstream := func(s *scaffold.Scaffold, length int, node string, port, weight int) string {
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, length, "upstream count")
+ upstream := ups[0]
+ assert.Len(ginkgo.GinkgoT(), upstream.Nodes, 1)
+ assert.Equal(ginkgo.GinkgoT(), node, upstream.Nodes[0].Host)
+ assert.Equal(ginkgo.GinkgoT(), port, upstream.Nodes[0].Port)
+ assert.Equal(ginkgo.GinkgoT(), weight, upstream.Nodes[0].Weight)
+
+ return upstream.ID
+ }
+
+ PhaseValidateRouteAccess := func(s *scaffold.Scaffold, upstreamId string) {
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 1, "route count")
+ assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId)
+
+ _ = s.NewAPISIXClient().GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ WithHeader("X-Foo", "bar").
+ Expect().
+ Status(http.StatusOK)
+ }
+
+ PhaseValidateRouteAccessCode := func(s *scaffold.Scaffold, upstreamId string, code int) {
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 1, "route count")
+ assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId)
+
+ _ = s.NewAPISIXClient().GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ WithHeader("X-Foo", "bar").
+ Expect().
+ Status(code)
+ }
+
+ // PhaseCreateHttpbin creates an httpbin Deployment and a ClusterIP Service,
+ // both named `name`, in the scaffold namespace and returns the Service's
+ // in-cluster FQDN (<name>.<namespace>.svc.cluster.local).
+ PhaseCreateHttpbin := func(s *scaffold.Scaffold, name string) string {
+ _httpbinDeploymentTemplate := fmt.Sprintf(`
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: %s
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: %s
+ strategy:
+ rollingUpdate:
+ maxSurge: 50%%
+ maxUnavailable: 1
+ type: RollingUpdate
+ template:
+ metadata:
+ labels:
+ app: %s
+ spec:
+ terminationGracePeriodSeconds: 0
+ containers:
+ - livenessProbe:
+ failureThreshold: 3
+ initialDelaySeconds: 2
+ periodSeconds: 5
+ successThreshold: 1
+ tcpSocket:
+ port: 80
+ timeoutSeconds: 2
+ readinessProbe:
+ failureThreshold: 3
+ initialDelaySeconds: 2
+ periodSeconds: 5
+ successThreshold: 1
+ tcpSocket:
+ port: 80
+ timeoutSeconds: 2
+ image: "localhost:5000/kennethreitz/httpbin:dev"
+ imagePullPolicy: IfNotPresent
+ name: httpbin
+ ports:
+ - containerPort: 80
+ name: "http"
+ protocol: "TCP"
+`, name, name, name)
+ _httpService := fmt.Sprintf(`
+apiVersion: v1
+kind: Service
+metadata:
+ name: %s
+spec:
+ selector:
+ app: %s
+ ports:
+ - name: http
+ port: 80
+ protocol: TCP
+ targetPort: 80
+ type: ClusterIP
+`, name, name)
+
+ err := s.CreateResourceFromString(s.FormatRegistry(_httpbinDeploymentTemplate))
+ assert.Nil(ginkgo.GinkgoT(), err, "create temp httpbin deployment")
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(_httpService), "create temp httpbin service")
+
+ // Build the FQDN from the Service name actually created above rather than
+ // a hard-coded one, so the result is correct for any `name`.
+ return fmt.Sprintf("%s.%s.svc.cluster.local", name, s.Namespace())
+ }
+
+ // Cases:
+ // --- Basic Function ---
+ // 1. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to third-party service
+ // 2. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to ExternalName service
+ // 3. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to multiple third-party or ExternalName services
+ // 4. ApisixRoute refers to ApisixUpstream and Backends, ApisixUpstream refers to ExternalName service
+ // --- Update Cases ---
+ // o 1. ApisixRoute refers to ApisixUpstream, but the ApisixUpstream is created later
+ // o 2. ApisixRoute refers to ApisixUpstream, but the ExternalName service is created later
+ // o 3. ApisixRoute refers to ApisixUpstream, but the ApisixUpstream is updated and change to another ExternalName service
+ // o 4. ApisixRoute refers to ApisixUpstream, the ApisixUpstream doesn't change, but the ExternalName service itself is updated
+ // --- Delete Cases ---
+ // 1. ApisixRoute is deleted, the generated resources should be removed
+
+ s := scaffold.NewDefaultV2Scaffold()
+
+ ginkgo.Describe("basic function: ", func() {
+ ginkgo.It("should be able to access third-party service", func() {
+ // -- Data preparation --
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbin.org")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ ginkgo.It("should be able to access third-party service with plugins", func() {
+ // -- Data preparation --
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbun.org")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+
+ // -- Expect failed --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbun.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccessCode(s, upstreamId, http.StatusBadGateway)
+
+ // -- update --
+ PhaseCreateApisixRouteWithHostRewrite(s, "httpbin-route", "httpbin-upstream", "httpbun.org")
+
+ // -- validation --
+ upstreamId = PhaseValidateFirstUpstream(s, 1, "httpbun.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ ginkgo.It("should be able to access external domain ExternalName service", func() {
+ // -- Data preparation --
+ PhaseCreateExternalService(s, "ext-httpbin", "httpbin.org")
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ ginkgo.It("should be able to access in-cluster ExternalName service", func() {
+ // -- Data preparation --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+
+ // We are only testing the functionality of the external service and do not care which namespace the service is in.
+ // The namespace of the external service should be watched.
+ PhaseCreateExternalService(s, "ext-httpbin", fqdn)
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn, 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ })
+ ginkgo.Describe("complex usage: ", func() {
+ PhaseCreateApisixUpstreamWithMultipleExternalNodes := func(s *scaffold.Scaffold, name string,
+ nodeTypeA v2.ApisixUpstreamExternalType, nodeNameA string, nodeTypeB v2.ApisixUpstreamExternalType, nodeNameB string) {
+ au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixUpstream
+metadata:
+ name: %s
+spec:
+ externalNodes:
+ - type: %s
+ name: %s
+ - type: %s
+ name: %s
+`, name, nodeTypeA, nodeNameA, nodeTypeB, nodeNameB)
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
+ }
+
+ PhaseCreateApisixRouteWithHostRewriteAndBackend := func(s *scaffold.Scaffold, name, upstream, hostRewrite, serviceName string, servicePort int) {
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: %s
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /*
+ exprs:
+ - subject:
+ scope: Header
+ name: X-Foo
+ op: Equal
+ value: bar
+ upstreams:
+ - name: %s
+ backends:
+ - serviceName: %s
+ servicePort: %d
+ resolveGranularity: service
+ plugins:
+ - name: proxy-rewrite
+ enable: true
+ config:
+ host: %s
+`, name, upstream, serviceName, servicePort, hostRewrite)
+
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar))
+ }
+
+ // validateHttpbinAndHttpbunAreAccessed sends up to 20 requests through
+ // APISIX and asserts that both backends answered at least once. A response
+ // carrying "X-Powered-By: httpbun" is attributed to httpbun.org; a response
+ // without that header is attributed to httpbin.org.
+ validateHttpbinAndHttpbunAreAccessed := func() {
+ hasPoweredBy := false // httpbun.org
+ hasNoPoweredBy := false // httpbin.org
+ for i := 0; i < 20; i++ {
+ headers := s.NewAPISIXClient().GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ WithHeader("X-Foo", "bar").
+ Expect().
+ Status(http.StatusOK).
+ Headers().Raw()
+ if val, ok := headers["X-Powered-By"]; ok {
+ switch value := val.(type) {
+ case []interface{}:
+ forloop:
+ for _, header := range value {
+ switch vv := header.(type) {
+ case string:
+ if vv == "httpbun" {
+ hasPoweredBy = true
+ break forloop
+ }
+ default:
+ // Report the type of the unexpected element itself, not of the enclosing slice.
+ log.Errorw("type", zap.Any("type", reflect.TypeOf(header)))
+ }
+ }
+ default:
+ log.Errorw("type", zap.Any("type", reflect.TypeOf(val)))
+ }
+ } else {
+ hasNoPoweredBy = true
+ }
+ // Stop early once both backends have been observed.
+ if hasPoweredBy && hasNoPoweredBy {
+ break
+ }
+ }
+
+ assert.True(ginkgo.GinkgoT(), hasPoweredBy && hasNoPoweredBy, "both httpbin and httpbun should be accessed at least once")
+ }
+
+ type validateFactor struct {
+ port int
+ weight int
+ }
+ // Note: the expected nodes are keyed by host, so each host must be unique.
+ PhaseValidateMultipleNodes := func(s *scaffold.Scaffold, length int, nodes map[string]*validateFactor) {
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, 1, "upstream count")
+
+ upstream := ups[0]
+ assert.Len(ginkgo.GinkgoT(), upstream.Nodes, length)
+ for _, node := range upstream.Nodes {
+ host := node.Host
+ if factor, ok := nodes[host]; ok {
+ assert.Equal(ginkgo.GinkgoT(), factor.port, node.Port)
+ assert.Equal(ginkgo.GinkgoT(), factor.weight, node.Weight)
+ } else {
+ err := fmt.Errorf("host %s appear but it shouldn't", host)
+ assert.Nil(ginkgo.GinkgoT(), err)
+ }
+ }
+
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 1, "route count")
+ assert.Equal(ginkgo.GinkgoT(), ups[0].ID, routes[0].UpstreamId)
+
+ validateHttpbinAndHttpbunAreAccessed()
+ }
+
+ // Note: the expected nodes are keyed by host, so each host must be unique.
+ PhaseValidateTrafficSplit := func(s *scaffold.Scaffold, length int, upstreamId string, nodes map[string]*validateFactor) {
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, length, "upstream count")
+
+ for _, upstream := range ups {
+ assert.Len(ginkgo.GinkgoT(), upstream.Nodes, 1)
+ host := upstream.Nodes[0].Host
+ if factor, ok := nodes[host]; ok {
+ assert.Equal(ginkgo.GinkgoT(), factor.port, upstream.Nodes[0].Port)
+ assert.Equal(ginkgo.GinkgoT(), factor.weight, upstream.Nodes[0].Weight)
+ } else {
+ err := fmt.Errorf("host %s appear but it shouldn't", host)
+ assert.Nil(ginkgo.GinkgoT(), err)
+ }
+ }
+
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 1, "route count")
+ assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId)
+
+ validateHttpbinAndHttpbunAreAccessed()
+ }
+
+ ginkgo.It("should be able to access multiple external services", func() {
+ // -- Data preparation --
+ PhaseCreateApisixUpstreamWithMultipleExternalNodes(s, "httpbin-upstream",
+ v2.ExternalTypeDomain, "httpbin.org", v2.ExternalTypeDomain, "httpbun.org")
+ PhaseCreateApisixRouteWithHostRewrite(s, "httpbin-route", "httpbin-upstream", "httpbun.org")
+
+ // -- validation --
+ PhaseValidateMultipleNodes(s, 2, map[string]*validateFactor{
+ "httpbin.org": {
+ port: 80,
+ weight: translation.DefaultWeight,
+ },
+ "httpbun.org": {
+ port: 80,
+ weight: translation.DefaultWeight,
+ },
+ })
+ })
+ ginkgo.It("should be able to use backends and upstreams together", func() {
+ // -- Data preparation --
+ PhaseCreateHttpbin(s, "httpbin-temp")
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbun.org")
+ PhaseCreateApisixRouteWithHostRewriteAndBackend(s, "httpbin-route", "httpbin-upstream", "httpbun.org", "httpbin-temp", 80)
+
+ svc, err := s.GetServiceByName("httpbin-temp")
+ assert.Nil(ginkgo.GinkgoT(), err, "get httpbin service")
+ ip := svc.Spec.ClusterIP
+
+ upName := apisixv1.ComposeUpstreamName(s.Namespace(), "httpbin-temp", "", 80, types.ResolveGranularity.Service)
+ upID := id.GenID(upName)
+
+ // -- validation --
+ PhaseValidateTrafficSplit(s, 2, upID, map[string]*validateFactor{
+ ip: {
+ port: 80,
+ weight: translation.DefaultWeight,
+ },
+ "httpbun.org": {
+ port: 80,
+ weight: translation.DefaultWeight,
+ },
+ })
+ })
+ })
+ ginkgo.Describe("update function: ", func() {
+ ginkgo.It("should be able to create the ApisixUpstream later", func() {
+ // -- Data preparation --
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+ PhaseValidateNoUpstreams(s)
+
+ // -- Data Update --
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbin.org")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ ginkgo.It("should be able to create the ExternalName service later", func() {
+ // -- Data preparation --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+ PhaseValidateNoUpstreams(s)
+
+ // -- Data update --
+ PhaseCreateExternalService(s, "ext-httpbin", fqdn)
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn, 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ ginkgo.It("should be able to update the ApisixUpstream later", func() {
+ // -- Data preparation --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+ PhaseCreateExternalService(s, "ext-httpbin", fqdn)
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "doesnt-exist")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+ PhaseValidateNoUpstreams(s)
+
+ // -- Data update --
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn, 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ ginkgo.It("should be able to update the ExternalName service later", func() {
+ // -- Data preparation --
+ PhaseCreateExternalService(s, "ext-httpbin", "unknown.org")
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+ PhaseValidateFirstUpstream(s, 1, "unknown.org", 80, translation.DefaultWeight)
+
+ // -- Data update --
+ PhaseCreateExternalService(s, "ext-httpbin", "httpbin.org")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ })
+ ginkgo.Describe("delete function: ", func() {
+ ginkgo.It("should be able to delete resources", func() {
+ // -- Data preparation --
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbin.org")
+ PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight)
+ PhaseValidateRouteAccess(s, upstreamId)
+
+ // -- delete --
+ assert.Nil(ginkgo.GinkgoT(), s.DeleteResource("ar", "httpbin-route"), "delete route")
+ assert.Nil(ginkgo.GinkgoT(), s.DeleteResource("au", "httpbin-upstream"), "delete upstream")
+ time.Sleep(time.Second * 15)
+
+ // -- validate --
+ PhaseValidateNoRoutes(s)
+ PhaseValidateNoUpstreams(s)
+ })
+ })
+})