You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/07/10 13:20:44 UTC
[apisix-ingress-controller] branch master updated: feat: add logic
for ApisixRoute v2beta1 (#576)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 1c17b41 feat: add logic for ApisixRoute v2beta1 (#576)
1c17b41 is described below
commit 1c17b41249361444b5b10f4a8897f62484b545b0
Author: kv <gx...@163.com>
AuthorDate: Sat Jul 10 21:20:36 2021 +0800
feat: add logic for ApisixRoute v2beta1 (#576)
---
pkg/config/config.go | 2 +
pkg/ingress/apisix_route.go | 61 ++++--
pkg/ingress/controller.go | 10 +-
pkg/ingress/pod.go | 2 +-
pkg/kube/apisix/apis/config/v2beta1/types.go | 51 +----
.../apis/config/v2beta1/zz_generated.deepcopy.go | 50 +----
pkg/kube/apisix_route.go | 23 +-
pkg/kube/translation/apisix_route.go | 231 ++++++++++++++++++++-
pkg/kube/translation/plugin.go | 6 +-
pkg/kube/translation/plugin_test.go | 10 +-
pkg/kube/translation/translator.go | 7 +
pkg/kube/translation/util.go | 63 +++++-
samples/deploy/crd/v1beta1/ApisixRoute.yaml | 4 +-
test/e2e/ingress/stream.go | 108 ++++++++++
test/e2e/scaffold/apisix.go | 4 +
test/e2e/scaffold/ingress.go | 2 +-
test/e2e/scaffold/k8s.go | 11 +
test/e2e/scaffold/scaffold.go | 18 ++
test/e2e/testdata/apisix-gw-config.yaml | 2 +
19 files changed, 535 insertions(+), 130 deletions(-)
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 8944607..c7c7d24 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -50,6 +50,8 @@ const (
ApisixRouteV1 = "apisix.apache.org/v1"
// ApisixRouteV2alpha1 represents apisixroute.apisix.apache.org/v2alpha1
ApisixRouteV2alpha1 = "apisix.apache.org/v2alpha1"
+ // ApisixRouteV2beta1 represents apisixroute.apisix.apache.org/v2beta1
+ ApisixRouteV2beta1 = "apisix.apache.org/v2beta1"
_minimalResyncInterval = 30 * time.Second
)
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index 5a1a6c3..de99a5d 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -93,12 +93,14 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
ar kube.ApisixRoute
tctx *translation.TranslateContext
)
- if obj.GroupVersion == kube.ApisixRouteV1 {
+ switch obj.GroupVersion {
+ case kube.ApisixRouteV1:
ar, err = c.controller.apisixRouteLister.V1(namespace, name)
- } else {
+ case kube.ApisixRouteV2alpha1:
ar, err = c.controller.apisixRouteLister.V2alpha1(namespace, name)
+ case kube.ApisixRouteV2beta1:
+ ar, err = c.controller.apisixRouteLister.V2beta1(namespace, name)
}
-
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get ApisixRoute",
@@ -129,7 +131,9 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
}
ar = ev.Tombstone.(kube.ApisixRoute)
}
- if obj.GroupVersion == kube.ApisixRouteV1 {
+ //
+ switch obj.GroupVersion {
+ case kube.ApisixRouteV1:
tctx, err = c.controller.translator.TranslateRouteV1(ar.V1())
if err != nil {
log.Errorw("failed to translate ApisixRoute v1",
@@ -138,7 +142,7 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
)
return err
}
- } else {
+ case kube.ApisixRouteV2alpha1:
if ev.Type != types.EventDelete {
tctx, err = c.controller.translator.TranslateRouteV2alpha1(ar.V2alpha1())
} else {
@@ -154,6 +158,19 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
)
return err
}
+ case kube.ApisixRouteV2beta1:
+ if ev.Type != types.EventDelete {
+ tctx, err = c.controller.translator.TranslateRouteV2beta1(ar.V2beta1())
+ } else {
+ tctx, err = c.controller.translator.TranslateRouteV2beta1NotStrictly(ar.V2beta1())
+ }
+ if err != nil {
+ log.Errorw("failed to translate ApisixRoute v2beta1",
+ zap.Error(err),
+ zap.Any("object", ar),
+ )
+ return err
+ }
}
log.Debugw("translated ApisixRoute",
@@ -180,18 +197,21 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
added = m
} else {
var oldCtx *translation.TranslateContext
- if obj.GroupVersion == kube.ApisixRouteV1 {
+ switch obj.GroupVersion {
+ case kube.ApisixRouteV1:
oldCtx, err = c.controller.translator.TranslateRouteV1(obj.OldObject.V1())
- } else {
+ case kube.ApisixRouteV2alpha1:
oldCtx, err = c.controller.translator.TranslateRouteV2alpha1(obj.OldObject.V2alpha1())
+ case kube.ApisixRouteV2beta1:
+ oldCtx, err = c.controller.translator.TranslateRouteV2beta1(obj.OldObject.V2beta1())
}
if err != nil {
- log.Errorw("failed to translate old ApisixRoute v2alpha1",
+ log.Errorw("failed to translate old ApisixRoute",
+ zap.String("version", obj.GroupVersion),
zap.String("event", "update"),
zap.Error(err),
zap.Any("ApisixRoute", ar),
)
-
return err
}
@@ -215,19 +235,26 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
return
}
var ar kube.ApisixRoute
- if event.GroupVersion == kube.ApisixRouteV1 {
+ switch event.GroupVersion {
+ case kube.ApisixRouteV1:
ar, errLocal = c.controller.apisixRouteLister.V1(namespace, name)
- } else {
+ case kube.ApisixRouteV2alpha1:
ar, errLocal = c.controller.apisixRouteLister.V2alpha1(namespace, name)
+ case kube.ApisixRouteV2beta1:
+ ar, errLocal = c.controller.apisixRouteLister.V2beta1(namespace, name)
}
if errOrigin == nil {
if ev.Type != types.EventDelete {
if errLocal == nil {
- if ar.GroupVersion() == kube.ApisixRouteV1 {
+ switch ar.GroupVersion() {
+ case kube.ApisixRouteV1:
c.controller.recorderEvent(ar.V1(), v1.EventTypeNormal, _resourceSynced, nil)
- } else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 {
+ case kube.ApisixRouteV2alpha1:
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue)
+ case kube.ApisixRouteV2beta1:
+ c.controller.recorderEvent(ar.V2beta1(), v1.EventTypeNormal, _resourceSynced, nil)
+ c.controller.recordStatus(ar.V2beta1(), _resourceSynced, nil, metav1.ConditionTrue)
}
} else {
log.Errorw("failed list ApisixRoute",
@@ -245,11 +272,15 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
zap.Error(errOrigin),
)
if errLocal == nil {
- if ar.GroupVersion() == kube.ApisixRouteV1 {
+ switch ar.GroupVersion() {
+ case kube.ApisixRouteV1:
c.controller.recorderEvent(ar.V1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
- } else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 {
+ case kube.ApisixRouteV2alpha1:
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
c.controller.recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
+ case kube.ApisixRouteV2beta1:
+ c.controller.recorderEvent(ar.V2beta1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
+ c.controller.recordStatus(ar.V2beta1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
}
} else {
log.Errorw("failed list ApisixRoute",
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 0e7c620..7694f0b 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -194,6 +194,7 @@ func (c *Controller) initWhenStartLeading() {
c.apisixRouteLister = kube.NewApisixRouteLister(
apisixFactory.Apisix().V1().ApisixRoutes().Lister(),
apisixFactory.Apisix().V2alpha1().ApisixRoutes().Lister(),
+ apisixFactory.Apisix().V2beta1().ApisixRoutes().Lister(),
)
c.apisixUpstreamLister = apisixFactory.Apisix().V1().ApisixUpstreams().Lister()
c.apisixTlsLister = apisixFactory.Apisix().V1().ApisixTlses().Lister()
@@ -217,10 +218,13 @@ func (c *Controller) initWhenStartLeading() {
} else {
ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
}
- if c.cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
- apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
- } else {
+ switch c.cfg.Kubernetes.ApisixRouteVersion {
+ case config.ApisixRouteV1:
apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer()
+ case config.ApisixRouteV2alpha1:
+ apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
+ case config.ApisixRouteV2beta1:
+ apisixRouteInformer = apisixFactory.Apisix().V2beta1().ApisixRoutes().Informer()
}
c.podInformer = kubeFactory.Core().V1().Pods().Informer()
diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go
index 624ad01..a5b1cd2 100644
--- a/pkg/ingress/pod.go
+++ b/pkg/ingress/pod.go
@@ -65,7 +65,7 @@ func (c *podController) onAdd(obj interface{}) {
return
}
log.Debugw("pod add event arrived",
- zap.Any("object", obj),
+ zap.String("obj.key", key),
)
pod := obj.(*corev1.Pod)
if err := c.controller.podCache.Add(pod); err != nil {
diff --git a/pkg/kube/apisix/apis/config/v2beta1/types.go b/pkg/kube/apisix/apis/config/v2beta1/types.go
index d2d4335..21eb5f6 100644
--- a/pkg/kube/apisix/apis/config/v2beta1/types.go
+++ b/pkg/kube/apisix/apis/config/v2beta1/types.go
@@ -19,6 +19,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
+
+ "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
)
// +genclient
@@ -53,14 +55,14 @@ type ApisixRouteHTTP struct {
Priority int `json:"priority,omitempty" yaml:"priority,omitempty"`
Match ApisixRouteHTTPMatch `json:"match,omitempty" yaml:"match,omitempty"`
// Deprecated: Backend will be removed in the future, use Backends instead.
- Backend ApisixRouteHTTPBackend `json:"backend" yaml:"backend"`
+ Backend v2alpha1.ApisixRouteHTTPBackend `json:"backend" yaml:"backend"`
// Backends represents potential backends to proxy after the route
// rule matched. When number of backends are more than one, traffic-split
// plugin in APISIX will be used to split traffic based on the backend weight.
- Backends []ApisixRouteHTTPBackend `json:"backends" yaml:"backends"`
- Websocket bool `json:"websocket" yaml:"websocket"`
- Plugins []ApisixRouteHTTPPlugin `json:"plugins,omitempty" yaml:"plugins,omitempty"`
- Authentication ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"`
+ Backends []v2alpha1.ApisixRouteHTTPBackend `json:"backends" yaml:"backends"`
+ Websocket bool `json:"websocket" yaml:"websocket"`
+ Plugins []ApisixRouteHTTPPlugin `json:"plugins,omitempty" yaml:"plugins,omitempty"`
+ Authentication ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"`
}
// ApisixRouteHTTPMatch represents the match condition for hitting this route.
@@ -90,25 +92,7 @@ type ApisixRouteHTTPMatch struct {
// value:
// - "127.0.0.1"
// - "10.0.5.11"
- NginxVars []ApisixRouteHTTPMatchExpr `json:"exprs,omitempty" yaml:"exprs,omitempty"`
-}
-
-// ApisixRouteHTTPMatchExpr represents a binary route match expression .
-type ApisixRouteHTTPMatchExpr struct {
- // Subject is the expression subject, it can
- // be any string composed by literals and nginx
- // vars.
- Subject ApisixRouteHTTPMatchExprSubject `json:"subject" yaml:"subject"`
- // Op is the operator.
- Op string `json:"op" yaml:"op"`
- // Set is an array type object of the expression.
- // It should be used when the Op is "in" or "not_in";
- Set []string `json:"set" yaml:"set"`
- // Value is the normal type object for the expression,
- // it should be used when the Op is not "in" and "not_in".
- // Set and Value are exclusive so only of them can be set
- // in the same time.
- Value string `json:"value" yaml:"value"`
+ NginxVars []v2alpha1.ApisixRouteHTTPMatchExpr `json:"exprs,omitempty" yaml:"exprs,omitempty"`
}
// ApisixRouteHTTPMatchExprSubject describes the route match expression subject.
@@ -122,25 +106,6 @@ type ApisixRouteHTTPMatchExprSubject struct {
Name string `json:"name" yaml:"name"`
}
-// ApisixRouteHTTPBackend represents a HTTP backend (a Kuberentes Service).
-type ApisixRouteHTTPBackend struct {
- // The name (short) of the service, note cross namespace is forbidden,
- // so be sure the ApisixRoute and Service are in the same namespace.
- ServiceName string `json:"serviceName" yaml:"serviceName"`
- // The service port, could be the name or the port number.
- ServicePort intstr.IntOrString `json:"servicePort" yaml:"servicePort"`
- // The resolve granularity, can be "endpoints" or "service",
- // when set to "endpoints", the pod ips will be used; other
- // wise, the service ClusterIP or ExternalIP will be used,
- // default is endpoints.
- ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
- // Weight of this backend.
- Weight int `json:"weight" yaml:"weight"`
- // Subset specifies a subset for the target Service. The subset should be pre-defined
- // in ApisixUpstream about this service.
- Subset string `json:"subset" yaml:"subset"`
-}
-
// ApisixRouteHTTPPlugin represents an APISIX plugin.
type ApisixRouteHTTPPlugin struct {
// The plugin name.
diff --git a/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go
index ac06d4d..f064c5e 100644
--- a/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go
@@ -20,6 +20,7 @@
package v2beta1
import (
+ v2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@@ -89,11 +90,13 @@ func (in *ApisixRouteAuthenticationKeyAuth) DeepCopy() *ApisixRouteAuthenticatio
func (in *ApisixRouteHTTP) DeepCopyInto(out *ApisixRouteHTTP) {
*out = *in
in.Match.DeepCopyInto(&out.Match)
- out.Backend = in.Backend
+ in.Backend.DeepCopyInto(&out.Backend)
if in.Backends != nil {
in, out := &in.Backends, &out.Backends
- *out = make([]ApisixRouteHTTPBackend, len(*in))
- copy(*out, *in)
+ *out = make([]v2alpha1.ApisixRouteHTTPBackend, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
}
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
@@ -117,23 +120,6 @@ func (in *ApisixRouteHTTP) DeepCopy() *ApisixRouteHTTP {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *ApisixRouteHTTPBackend) DeepCopyInto(out *ApisixRouteHTTPBackend) {
- *out = *in
- out.ServicePort = in.ServicePort
- return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteHTTPBackend.
-func (in *ApisixRouteHTTPBackend) DeepCopy() *ApisixRouteHTTPBackend {
- if in == nil {
- return nil
- }
- out := new(ApisixRouteHTTPBackend)
- in.DeepCopyInto(out)
- return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixRouteHTTPMatch) DeepCopyInto(out *ApisixRouteHTTPMatch) {
*out = *in
if in.Paths != nil {
@@ -158,7 +144,7 @@ func (in *ApisixRouteHTTPMatch) DeepCopyInto(out *ApisixRouteHTTPMatch) {
}
if in.NginxVars != nil {
in, out := &in.NginxVars, &out.NginxVars
- *out = make([]ApisixRouteHTTPMatchExpr, len(*in))
+ *out = make([]v2alpha1.ApisixRouteHTTPMatchExpr, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
@@ -177,28 +163,6 @@ func (in *ApisixRouteHTTPMatch) DeepCopy() *ApisixRouteHTTPMatch {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *ApisixRouteHTTPMatchExpr) DeepCopyInto(out *ApisixRouteHTTPMatchExpr) {
- *out = *in
- out.Subject = in.Subject
- if in.Set != nil {
- in, out := &in.Set, &out.Set
- *out = make([]string, len(*in))
- copy(*out, *in)
- }
- return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteHTTPMatchExpr.
-func (in *ApisixRouteHTTPMatchExpr) DeepCopy() *ApisixRouteHTTPMatchExpr {
- if in == nil {
- return nil
- }
- out := new(ApisixRouteHTTPMatchExpr)
- in.DeepCopyInto(out)
- return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixRouteHTTPMatchExprSubject) DeepCopyInto(out *ApisixRouteHTTPMatchExprSubject) {
*out = *in
return
diff --git a/pkg/kube/apisix_route.go b/pkg/kube/apisix_route.go
index b51a0d6..e5274c1 100644
--- a/pkg/kube/apisix_route.go
+++ b/pkg/kube/apisix_route.go
@@ -22,6 +22,7 @@ import (
configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
listersv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1"
+ listersv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2beta1"
)
const (
@@ -40,6 +41,8 @@ type ApisixRouteLister interface {
V1(string, string) (ApisixRoute, error)
// V2alpha1 gets the ApisixRoute in apisix.apache.org/v2alpha1.
V2alpha1(string, string) (ApisixRoute, error)
+ // V2beta1 gets the ApisixRoute in apisix.apache.org/v2beta1.
+ V2beta1(string, string) (ApisixRoute, error)
}
// ApisixRouteInformer is an encapsulation for the informer of ApisixRoute,
@@ -118,6 +121,7 @@ func (ar *apisixRoute) ResourceVersion() string {
type apisixRouteLister struct {
v1Lister listersv1.ApisixRouteLister
v2alpha1Lister listersv2alpha1.ApisixRouteLister
+ v2beta1Lister listersv2beta1.ApisixRouteLister
}
func (l *apisixRouteLister) V1(namespace, name string) (ApisixRoute, error) {
@@ -142,6 +146,17 @@ func (l *apisixRouteLister) V2alpha1(namespace, name string) (ApisixRoute, error
}, nil
}
+func (l *apisixRouteLister) V2beta1(namespace, name string) (ApisixRoute, error) {
+ ar, err := l.v2beta1Lister.ApisixRoutes(namespace).Get(name)
+ if err != nil {
+ return nil, err
+ }
+ return &apisixRoute{
+ groupVersion: ApisixRouteV2beta1,
+ v2beta1: ar,
+ }, nil
+}
+
// MustNewApisixRoute creates a kube.ApisixRoute object according to the
// type of obj.
func MustNewApisixRoute(obj interface{}) ApisixRoute {
@@ -156,6 +171,11 @@ func MustNewApisixRoute(obj interface{}) ApisixRoute {
groupVersion: ApisixRouteV2alpha1,
v2alpha1: ar,
}
+ case *configv2beta1.ApisixRoute:
+ return &apisixRoute{
+ groupVersion: ApisixRouteV2beta1,
+ v2beta1: ar,
+ }
default:
panic("invalid ApisixRoute type")
}
@@ -181,9 +201,10 @@ func NewApisixRoute(obj interface{}) (ApisixRoute, error) {
}
}
-func NewApisixRouteLister(v1 listersv1.ApisixRouteLister, v2alpha1 listersv2alpha1.ApisixRouteLister) ApisixRouteLister {
+func NewApisixRouteLister(v1 listersv1.ApisixRouteLister, v2alpha1 listersv2alpha1.ApisixRouteLister, v2beta1 listersv2beta1.ApisixRouteLister) ApisixRouteLister {
return &apisixRouteLister{
v1Lister: v1,
v2alpha1Lister: v2alpha1,
+ v2beta1Lister: v2beta1,
}
}
diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go
index a8ea661..3cd4c76 100644
--- a/pkg/kube/translation/apisix_route.go
+++ b/pkg/kube/translation/apisix_route.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/id"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
+ configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1"
"github.com/apache/apisix-ingress-controller/pkg/log"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -133,6 +134,150 @@ func (t *translator) translateHTTPRouteNotStrictly(ctx *TranslateContext, ar *co
return nil
}
+func (t *translator) TranslateRouteV2beta1(ar *configv2beta1.ApisixRoute) (*TranslateContext, error) {
+ ctx := &TranslateContext{
+ upstreamMap: make(map[string]struct{}),
+ }
+
+ if err := t.translateHTTPRouteV2beta1(ctx, ar); err != nil {
+ return nil, err
+ }
+ if err := t.translateStreamRoute(ctx, ar); err != nil {
+ return nil, err
+ }
+ return ctx, nil
+}
+
+func (t *translator) TranslateRouteV2beta1NotStrictly(ar *configv2beta1.ApisixRoute) (*TranslateContext, error) {
+ ctx := &TranslateContext{
+ upstreamMap: make(map[string]struct{}),
+ }
+
+ if err := t.translateHTTPRouteV2beta1NotStrictly(ctx, ar); err != nil {
+ return nil, err
+ }
+ if err := t.translateStreamRouteNotStrictly(ctx, ar); err != nil {
+ return nil, err
+ }
+ return ctx, nil
+}
+
+func (t *translator) translateHTTPRouteV2beta1(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error {
+ ruleNameMap := make(map[string]struct{})
+ for _, part := range ar.Spec.HTTP {
+ if _, ok := ruleNameMap[part.Name]; ok {
+ return errors.New("duplicated route rule name")
+ }
+ ruleNameMap[part.Name] = struct{}{}
+ backends := part.Backends
+ backend := part.Backend
+ if len(backends) > 0 {
+ // Use the first backend as the default backend in Route,
+ // others will be configured in traffic-split plugin.
+ backend = backends[0]
+ backends = backends[1:]
+ } // else use the deprecated Backend.
+
+ svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(&backend, ar.Namespace)
+ if err != nil {
+ log.Errorw("failed to get service port in backend",
+ zap.Any("backend", backend),
+ zap.Any("apisix_route", ar),
+ zap.Error(err),
+ )
+ return err
+ }
+
+ pluginMap := make(apisixv1.Plugins)
+ // add route plugins
+ for _, plugin := range part.Plugins {
+ if !plugin.Enable {
+ continue
+ }
+ if plugin.Config != nil {
+ pluginMap[plugin.Name] = plugin.Config
+ } else {
+ pluginMap[plugin.Name] = make(map[string]interface{})
+ }
+ }
+
+ // add KeyAuth and basicAuth plugin
+ if part.Authentication.Enable {
+ switch part.Authentication.Type {
+ case "keyAuth":
+ pluginMap["key-auth"] = part.Authentication.KeyAuth
+ case "basicAuth":
+ pluginMap["basic-auth"] = make(map[string]interface{})
+ default:
+ pluginMap["basic-auth"] = make(map[string]interface{})
+ }
+ }
+
+ var exprs [][]apisixv1.StringOrSlice
+ if part.Match.NginxVars != nil {
+ exprs, err = t.translateRouteMatchExprs(part.Match.NginxVars)
+ if err != nil {
+ log.Errorw("ApisixRoute with bad nginxVars",
+ zap.Error(err),
+ zap.Any("ApisixRoute", ar),
+ )
+ return err
+ }
+ }
+ if err := validateRemoteAddrs(part.Match.RemoteAddrs); err != nil {
+ log.Errorw("ApisixRoute with invalid remote addrs",
+ zap.Error(err),
+ zap.Strings("remote_addrs", part.Match.RemoteAddrs),
+ zap.Any("ApisixRoute", ar),
+ )
+ return err
+ }
+
+ upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort)
+ route := apisixv1.NewDefaultRoute()
+ route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
+ route.ID = id.GenID(route.Name)
+ route.Priority = part.Priority
+ route.RemoteAddrs = part.Match.RemoteAddrs
+ route.Vars = exprs
+ route.Hosts = part.Match.Hosts
+ route.Uris = part.Match.Paths
+ route.Methods = part.Match.Methods
+ route.UpstreamId = id.GenID(upstreamName)
+ route.EnableWebsocket = part.Websocket
+ route.Plugins = pluginMap
+
+ if len(backends) > 0 {
+ weight := _defaultWeight
+ if backend.Weight != nil {
+ weight = *backend.Weight
+ }
+ backendPoints := make([]*configv2alpha1.ApisixRouteHTTPBackend, 0)
+ for _, b := range backends {
+ backendPoints = append(backendPoints, &b)
+ }
+ plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backendPoints)
+ if err != nil {
+ log.Errorw("failed to translate traffic-split plugin",
+ zap.Error(err),
+ zap.Any("ApisixRoute", ar),
+ )
+ return err
+ }
+ route.Plugins["traffic-split"] = plugin
+ }
+ ctx.addRoute(route)
+ if !ctx.checkUpstreamExist(upstreamName) {
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
+ if err != nil {
+ return err
+ }
+ ctx.addUpstream(ups)
+ }
+ }
+ return nil
+}
+
func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha1.ApisixRoute) error {
ruleNameMap := make(map[string]struct{})
for _, part := range ar.Spec.HTTP {
@@ -149,7 +294,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha
backends = backends[1:]
} // else use the deprecated Backend.
- svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar)
+ svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar.Namespace)
if err != nil {
log.Errorw("failed to get service port in backend",
zap.Any("backend", backend),
@@ -223,7 +368,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha
if backend.Weight != nil {
weight = *backend.Weight
}
- plugin, err := t.translateTrafficSplitPlugin(ctx, ar, weight, backends)
+ plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backends)
if err != nil {
log.Errorw("failed to translate traffic-split plugin",
zap.Error(err),
@@ -366,6 +511,41 @@ func (t *translator) translateTCPRouteNotStrictly(ctx *TranslateContext, ar *con
return nil
}
+func (t *translator) translateStreamRoute(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error {
+ ruleNameMap := make(map[string]struct{})
+ for _, part := range ar.Spec.Stream {
+ if _, ok := ruleNameMap[part.Name]; ok {
+ return errors.New("duplicated route rule name")
+ }
+ ruleNameMap[part.Name] = struct{}{}
+ backend := part.Backend
+ svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPort(backend, ar.Namespace)
+ if err != nil {
+ log.Errorw("failed to get service port in backend",
+ zap.Any("backend", backend),
+ zap.Any("apisix_route", ar),
+ zap.Error(err),
+ )
+ return err
+ }
+ sr := apisixv1.NewDefaultStreamRoute()
+ name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
+ sr.ID = id.GenID(name)
+ sr.ServerPort = part.Match.IngressPort
+ ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
+ if err != nil {
+ return err
+ }
+ sr.UpstreamId = ups.ID
+ ctx.addStreamRoute(sr)
+ if !ctx.checkUpstreamExist(ups.Name) {
+ ctx.addUpstream(ups)
+ }
+
+ }
+ return nil
+}
+
func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1.ApisixRoute) error {
ruleNameMap := make(map[string]struct{})
for _, part := range ar.Spec.TCP {
@@ -400,3 +580,50 @@ func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1
}
return nil
}
+
+// translateHTTPRouteV2beta1NotStrictly translates http route with a loose way, only generate ID and Name for delete Event.
+func (t *translator) translateHTTPRouteV2beta1NotStrictly(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error {
+ for _, part := range ar.Spec.HTTP {
+ backends := part.Backends
+ backend := part.Backend
+ if len(backends) > 0 {
+ // Use the first backend as the default backend in Route,
+ // others will be configured in traffic-split plugin.
+ backend = backends[0]
+ } // else use the deprecated Backend.
+ upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal)
+ route := apisixv1.NewDefaultRoute()
+ route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
+ route.ID = id.GenID(route.Name)
+ ctx.addRoute(route)
+ if !ctx.checkUpstreamExist(upstreamName) {
+ ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal)
+ if err != nil {
+ return err
+ }
+ ctx.addUpstream(ups)
+ }
+ }
+ return nil
+}
+
+// translateStreamRouteNotStrictly translates tcp route with a loose way, only generate ID and Name for delete Event.
+func (t *translator) translateStreamRouteNotStrictly(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error {
+ for _, part := range ar.Spec.Stream {
+ backend := &part.Backend
+ sr := apisixv1.NewDefaultStreamRoute()
+ name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
+ sr.ID = id.GenID(name)
+ sr.ServerPort = part.Match.IngressPort
+ ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal)
+ if err != nil {
+ return err
+ }
+ sr.UpstreamId = ups.ID
+ ctx.addStreamRoute(sr)
+ if !ctx.checkUpstreamExist(ups.Name) {
+ ctx.addUpstream(ups)
+ }
+ }
+ return nil
+}
diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go
index 3cc6490..046f409 100644
--- a/pkg/kube/translation/plugin.go
+++ b/pkg/kube/translation/plugin.go
@@ -27,18 +27,18 @@ var (
_errPasswordNotFoundOrInvalid = errors.New("key \"password\" not found or invalid in secret")
)
-func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ar *configv2alpha1.ApisixRoute, defaultBackendWeight int,
+func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ns string, defaultBackendWeight int,
backends []*configv2alpha1.ApisixRouteHTTPBackend) (*apisixv1.TrafficSplitConfig, error) {
var (
wups []apisixv1.TrafficSplitConfigRuleWeightedUpstream
)
for _, backend := range backends {
- svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar)
+ svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ns)
if err != nil {
return nil, err
}
- ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
+ ups, err := t.translateUpstream(ns, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return nil, err
}
diff --git a/pkg/kube/translation/plugin_test.go b/pkg/kube/translation/plugin_test.go
index 534bc3c..0072da6 100644
--- a/pkg/kube/translation/plugin_test.go
+++ b/pkg/kube/translation/plugin_test.go
@@ -178,7 +178,7 @@ func TestTranslateTrafficSplitPlugin(t *testing.T) {
ctx := &TranslateContext{
upstreamMap: make(map[string]struct{}),
}
- cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends)
+ cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends)
assert.Nil(t, err)
assert.Len(t, ctx.Upstreams, 2)
@@ -347,7 +347,7 @@ func TestTranslateTrafficSplitPluginWithSameUpstreams(t *testing.T) {
ApisixUpstreamLister: auLister,
}}
ctx := &TranslateContext{upstreamMap: make(map[string]struct{})}
- cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends)
+ cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends)
assert.Nil(t, err)
assert.Len(t, ctx.Upstreams, 1)
@@ -511,7 +511,7 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) {
ApisixUpstreamLister: auLister,
}}
ctx := &TranslateContext{upstreamMap: make(map[string]struct{})}
- cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends)
+ cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends)
assert.Nil(t, cfg)
assert.Len(t, ctx.Upstreams, 0)
assert.Equal(t, err.Error(), "service \"svc-2\" not found")
@@ -519,14 +519,14 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) {
backends[0].ServiceName = "svc-1"
backends[1].ServicePort.StrVal = "port-not-found"
ctx = &TranslateContext{upstreamMap: make(map[string]struct{})}
- cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends)
+ cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends)
assert.Nil(t, cfg)
assert.Equal(t, err.Error(), "service.spec.ports: port not defined")
backends[1].ServicePort.StrVal = "port2"
backends[1].ResolveGranularity = "service"
ctx = &TranslateContext{upstreamMap: make(map[string]struct{})}
- cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends)
+ cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends)
assert.Nil(t, cfg)
assert.Equal(t, err.Error(), "conflict headless service and backend resolve granularity")
}
diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go
index e642d43..c1f6ac9 100644
--- a/pkg/kube/translation/translator.go
+++ b/pkg/kube/translation/translator.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/kube"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
+ configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
@@ -73,6 +74,12 @@ type Translator interface {
// TranslateRouteV2alpha1NotStrictly translates the configv2alpha1.ApisixRoute object into several Route
// and Upstream resources not strictly, only used for delete event.
TranslateRouteV2alpha1NotStrictly(*configv2alpha1.ApisixRoute) (*TranslateContext, error)
+ // TranslateRouteV2beta1 translates the configv2beta1.ApisixRoute object into several Route
+ // and Upstream resources.
+ TranslateRouteV2beta1(*configv2beta1.ApisixRoute) (*TranslateContext, error)
+ // TranslateRouteV2beta1NotStrictly translates the configv2beta1.ApisixRoute object into several Route
+ // and Upstream resources not strictly, only used for delete event.
+ TranslateRouteV2beta1NotStrictly(*configv2beta1.ApisixRoute) (*TranslateContext, error)
// TranslateSSL translates the configv2alpha1.ApisixTls object into the APISIX SSL resource.
TranslateSSL(*configv1.ApisixTls) (*apisixv1.Ssl, error)
// TranslateClusterConfig translates the configv2alpha1.ApisixClusterConfig object into the APISIX
diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go
index c57d809..88ba6f9 100644
--- a/pkg/kube/translation/util.go
+++ b/pkg/kube/translation/util.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/id"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
+ configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
@@ -32,12 +33,19 @@ var (
_errInvalidAddress = errors.New("address is neither IP or CIDR")
)
-func (t *translator) getServiceClusterIPAndPort(backend *configv2alpha1.ApisixRouteHTTPBackend, ar *configv2alpha1.ApisixRoute) (string, int32, error) {
- svc, err := t.ServiceLister.Services(ar.Namespace).Get(backend.ServiceName)
+func (t *translator) getServiceClusterIPAndPort(backend *configv2alpha1.ApisixRouteHTTPBackend, ns string) (string, int32, error) {
+ svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName)
if err != nil {
return "", 0, err
}
svcPort := int32(-1)
+ if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" {
+ log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
+ zap.Any("namespace", ns),
+ zap.Any("service", svc),
+ )
+ return "", 0, errors.New("conflict headless service and backend resolve granularity")
+ }
loop:
for _, port := range svc.Spec.Ports {
switch backend.ServicePort.Type {
@@ -55,19 +63,12 @@ loop:
}
if svcPort == -1 {
log.Errorw("ApisixRoute refers to non-existent Service port",
- zap.Any("ApisixRoute", ar),
+ zap.String("namespace", ns),
zap.String("port", backend.ServicePort.String()),
)
return "", 0, err
}
- if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" {
- log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
- zap.Any("ApisixRoute", ar),
- zap.Any("service", svc),
- )
- return "", 0, errors.New("conflict headless service and backend resolve granularity")
- }
return svc.Spec.ClusterIP, svcPort, nil
}
@@ -77,6 +78,13 @@ func (t *translator) getTCPServiceClusterIPAndPort(backend *configv2alpha1.Apisi
return "", 0, err
}
svcPort := int32(-1)
+ if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" {
+ log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
+ zap.Any("ApisixRoute", ar),
+ zap.Any("service", svc),
+ )
+ return "", 0, errors.New("conflict headless service and backend resolve granularity")
+ }
loop:
for _, port := range svc.Spec.Ports {
switch backend.ServicePort.Type {
@@ -100,13 +108,46 @@ loop:
return "", 0, err
}
+ return svc.Spec.ClusterIP, svcPort, nil
+}
+
+// getStreamServiceClusterIPAndPort is for v2beta1 streamRoute
+func (t *translator) getStreamServiceClusterIPAndPort(backend configv2beta1.ApisixRouteStreamBackend, ns string) (string, int32, error) {
+ svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName)
+ if err != nil {
+ return "", 0, err
+ }
+ svcPort := int32(-1)
if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" {
log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
- zap.Any("ApisixRoute", ar),
+ zap.String("ApisixRoute namespace", ns),
zap.Any("service", svc),
)
return "", 0, errors.New("conflict headless service and backend resolve granularity")
}
+loop:
+ for _, port := range svc.Spec.Ports {
+ switch backend.ServicePort.Type {
+ case intstr.Int:
+ if backend.ServicePort.IntVal == port.Port {
+ svcPort = port.Port
+ break loop
+ }
+ case intstr.String:
+ if backend.ServicePort.StrVal == port.Name {
+ svcPort = port.Port
+ break loop
+ }
+ }
+ }
+ if svcPort == -1 {
+ log.Errorw("ApisixRoute refers to non-existent Service port",
+ zap.String("ApisixRoute namespace", ns),
+ zap.String("port", backend.ServicePort.String()),
+ )
+ return "", 0, err
+ }
+
return svc.Spec.ClusterIP, svcPort, nil
}
diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
index 3b8a377..bb63b39 100644
--- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
@@ -47,10 +47,10 @@ spec:
deprecated: true
- name: v2alpha1
served: true
- storage: true
+ storage: false
- name: v2beta1
served: true
- storage: false
+ storage: true
scope: Namespaced
names:
plural: apisixroutes
diff --git a/test/e2e/ingress/stream.go b/test/e2e/ingress/stream.go
new file mode 100644
index 0000000..d57cbc4
--- /dev/null
+++ b/test/e2e/ingress/stream.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ingress
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/onsi/ginkgo"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("ApisixRoute stream Testing with v2beta1", func() {
+ opts := &scaffold.Options{
+ Name: "default",
+ Kubeconfig: scaffold.GetKubeconfig(),
+ APISIXConfigPath: "testdata/apisix-gw-config.yaml",
+ IngressAPISIXReplicas: 1,
+ HTTPBinServicePort: 80,
+ APISIXRouteVersion: "apisix.apache.org/v2beta1",
+ }
+ s := scaffold.NewScaffold(opts)
+ ginkgo.It("stream tcp proxy", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta1
+kind: ApisixRoute
+metadata:
+ name: httpbin-tcp-route
+spec:
+ stream:
+ - name: rule1
+ protocol: TCP
+ match:
+ ingressPort: 9100
+ backend:
+ serviceName: %s
+ servicePort: %d
+`, backendSvc, backendSvcPort[0])
+
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))
+ time.Sleep(9 * time.Second)
+
+ err := s.EnsureNumApisixStreamRoutesCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+ sr, err := s.ListApisixStreamRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), sr, 1)
+ assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9100))
+
+ resp := s.NewAPISIXClientWithTCPProxy().GET("/ip").Expect()
+ resp.Body().Contains("origin")
+
+ resp = s.NewAPISIXClientWithTCPProxy().GET("/get").WithHeader("x-my-header", "x-my-value").Expect()
+ resp.Body().Contains("x-my-value")
+ })
+ ginkgo.It("stream udp proxy", func() {
+ apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta1
+kind: ApisixRoute
+metadata:
+ name: httpbin-udp-route
+spec:
+ stream:
+ - name: rule1
+ protocol: UDP
+ match:
+ ingressPort: 9200
+ backend:
+ serviceName: kube-dns
+ servicePort: 53
+`)
+ // update namespace only for this case
+ s.UpdateNamespace("kube-system")
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))
+ time.Sleep(9 * time.Second)
+
+ err := s.EnsureNumApisixStreamRoutesCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+ sr, err := s.ListApisixStreamRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), sr, 1)
+ assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9200))
+ // test dns query
+ r := s.DNSResolver()
+ host := "httpbin.org"
+ _, err = r.LookupIPAddr(context.Background(), host)
+ assert.Nil(ginkgo.GinkgoT(), err, "dns query error")
+ })
+})
diff --git a/test/e2e/scaffold/apisix.go b/test/e2e/scaffold/apisix.go
index 6c5f519..53b01bb 100644
--- a/test/e2e/scaffold/apisix.go
+++ b/test/e2e/scaffold/apisix.go
@@ -119,6 +119,10 @@ spec:
port: 9100
protocol: TCP
targetPort: 9100
+ - name: udp
+ port: 9200
+ protocol: UDP
+ targetPort: 9200
- name: http-control
port: 9090
protocol: TCP
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 2784884..578907a 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -253,7 +253,7 @@ spec:
- --default-apisix-cluster-admin-key
- edd1c9f034335f136f87ad84b625c8f1
- --app-namespace
- - %s
+ - %s,kube-system
- --apisix-route-version
- %s
serviceAccount: ingress-apisix-e2e-test-service-account
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index 4517bda..941ec41 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -359,11 +359,13 @@ func (s *Scaffold) newAPISIXTunnels() error {
httpNodePort int
httpsNodePort int
tcpNodePort int
+ udpNodePort int
controlNodePort int
adminPort int
httpPort int
httpsPort int
tcpPort int
+ udpPort int
controlPort int
)
for _, port := range s.apisixService.Spec.Ports {
@@ -379,6 +381,9 @@ func (s *Scaffold) newAPISIXTunnels() error {
} else if port.Name == "tcp" {
tcpNodePort = int(port.NodePort)
tcpPort = int(port.Port)
+ } else if port.Name == "udp" {
+ udpNodePort = int(port.NodePort)
+ udpPort = int(port.Port)
} else if port.Name == "http-control" {
controlNodePort = int(port.NodePort)
controlPort = int(port.Port)
@@ -393,6 +398,8 @@ func (s *Scaffold) newAPISIXTunnels() error {
httpsNodePort, httpsPort)
s.apisixTCPTunnel = k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-service-e2e-test",
tcpNodePort, tcpPort)
+ s.apisixUDPTunnel = k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-service-e2e-test",
+ udpNodePort, udpPort)
s.apisixControlTunnel = k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-service-e2e-test",
controlNodePort, controlPort)
@@ -412,6 +419,10 @@ func (s *Scaffold) newAPISIXTunnels() error {
return err
}
s.addFinalizers(s.apisixTCPTunnel.Close)
+ if err := s.apisixUDPTunnel.ForwardPortE(s.t); err != nil {
+ return err
+ }
+ s.addFinalizers(s.apisixUDPTunnel.Close)
if err := s.apisixControlTunnel.ForwardPortE(s.t); err != nil {
return err
}
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index a1c8dd2..7e85ec8 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -20,6 +20,7 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
+ "net"
"net/http"
"net/url"
"os"
@@ -67,6 +68,7 @@ type Scaffold struct {
apisixHttpTunnel *k8s.Tunnel
apisixHttpsTunnel *k8s.Tunnel
apisixTCPTunnel *k8s.Tunnel
+ apisixUDPTunnel *k8s.Tunnel
apisixControlTunnel *k8s.Tunnel
// Used for template rendering.
@@ -205,6 +207,22 @@ func (s *Scaffold) NewAPISIXClientWithTCPProxy() *httpexpect.Expect {
})
}
+func (s *Scaffold) DNSResolver() *net.Resolver {
+ return &net.Resolver{
+ PreferGo: false,
+ Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
+ d := net.Dialer{
+ Timeout: time.Millisecond * time.Duration(10000),
+ }
+ return d.DialContext(ctx, "udp", s.apisixUDPTunnel.Endpoint())
+ },
+ }
+}
+
+func (s *Scaffold) UpdateNamespace(ns string) {
+ s.kubectlOptions.Namespace = ns
+}
+
// NewAPISIXHttpsClient creates the default HTTPS client.
func (s *Scaffold) NewAPISIXHttpsClient(host string) *httpexpect.Expect {
u := url.URL{
diff --git a/test/e2e/testdata/apisix-gw-config.yaml b/test/e2e/testdata/apisix-gw-config.yaml
index 4b75474..3598f8e 100644
--- a/test/e2e/testdata/apisix-gw-config.yaml
+++ b/test/e2e/testdata/apisix-gw-config.yaml
@@ -29,6 +29,8 @@ apisix:
stream_proxy: # TCP/UDP proxy
tcp: # TCP proxy port list
- 9100
+ udp:
+ - 9200
etcd:
host: # it's possible to define multiple etcd hosts addresses of the same etcd cluster.
- "http://{{ .EtcdServiceFQDN }}:2379" # multiple etcd address