You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by zh...@apache.org on 2022/06/16 01:24:59 UTC
[apisix-ingress-controller] branch master updated: feat: support GatewayClass, refactor gateway modules (#1079)
This is an automated email from the ASF dual-hosted git repository.
zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new c48a62ab feat: support GatewayClass, refactor gateway modules (#1079)
c48a62ab is described below
commit c48a62abfbd0b546c1d89bb7931caf1ed11abbb3
Author: Sarasa Kisaragi <li...@gmail.com>
AuthorDate: Thu Jun 16 09:24:54 2022 +0800
feat: support GatewayClass, refactor gateway modules (#1079)
---
pkg/ingress/apisix_pluginconfig.go | 17 +-
pkg/ingress/apisix_route.go | 29 +--
pkg/ingress/compare.go | 25 +--
pkg/ingress/controller.go | 229 +++++++++------------
pkg/ingress/{ => gateway}/gateway.go | 81 +++++++-
pkg/ingress/gateway/gateway_class.go | 229 +++++++++++++++++++++
pkg/ingress/{ => gateway}/gateway_httproute.go | 37 ++--
pkg/ingress/gateway/provider.go | 179 ++++++++++++++++
.../gateway}/translation/gateway_httproute.go | 18 +-
.../gateway}/translation/gateway_httproute_test.go | 11 +-
pkg/ingress/gateway/translation/translator.go | 44 ++++
pkg/ingress/ingress.go | 29 +--
pkg/ingress/{ => namespace}/namespace.go | 36 ++--
pkg/ingress/namespace/provider.go | 171 +++++++++++++++
pkg/ingress/namespace/provider_mock.go | 55 +++++
pkg/ingress/pod_test.go | 26 +--
pkg/ingress/status.go | 148 +------------
pkg/ingress/utils/executor.go | 56 +++++
pkg/ingress/utils/ingress_status.go | 112 ++++++++++
pkg/ingress/{ => utils}/manifest.go | 136 ++++++------
pkg/ingress/{ => utils}/manifest_test.go | 70 +++----
pkg/ingress/utils/string.go | 25 +++
pkg/kube/translation/apisix_pluginconfig.go | 16 +-
pkg/kube/translation/apisix_route.go | 84 ++++----
pkg/kube/translation/context.go | 14 +-
pkg/kube/translation/context_test.go | 22 +-
pkg/kube/translation/ingress.go | 28 +--
pkg/kube/translation/plugin.go | 2 +-
pkg/kube/translation/translator.go | 3 -
pkg/kube/translation/util.go | 7 -
30 files changed, 1331 insertions(+), 608 deletions(-)
diff --git a/pkg/ingress/apisix_pluginconfig.go b/pkg/ingress/apisix_pluginconfig.go
index a1e4db2c..3faf885b 100644
--- a/pkg/ingress/apisix_pluginconfig.go
+++ b/pkg/ingress/apisix_pluginconfig.go
@@ -27,6 +27,7 @@ import (
"k8s.io/client-go/util/workqueue"
"github.com/apache/apisix-ingress-controller/pkg/config"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/kube/translation"
"github.com/apache/apisix-ingress-controller/pkg/log"
@@ -167,14 +168,14 @@ func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event
zap.Any("pluginConfigs", tctx.PluginConfigs),
)
- m := &manifest{
- pluginConfigs: tctx.PluginConfigs,
+ m := &utils.Manifest{
+ PluginConfigs: tctx.PluginConfigs,
}
var (
- added *manifest
- updated *manifest
- deleted *manifest
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
)
if ev.Type == types.EventDelete {
@@ -199,10 +200,10 @@ func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event
return err
}
- om := &manifest{
- pluginConfigs: oldCtx.PluginConfigs,
+ om := &utils.Manifest{
+ PluginConfigs: oldCtx.PluginConfigs,
}
- added, updated, deleted = m.diff(om)
+ added, updated, deleted = m.Diff(om)
}
return c.controller.syncManifests(ctx, added, updated, deleted)
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index aece9687..c2ecee33 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -26,6 +26,7 @@ import (
"k8s.io/client-go/util/workqueue"
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/kube"
v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -189,17 +190,17 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
zap.Any("pluginConfigs", tctx.PluginConfigs),
)
- m := &manifest{
- routes: tctx.Routes,
- upstreams: tctx.Upstreams,
- streamRoutes: tctx.StreamRoutes,
- pluginConfigs: tctx.PluginConfigs,
+ m := &utils.Manifest{
+ Routes: tctx.Routes,
+ Upstreams: tctx.Upstreams,
+ StreamRoutes: tctx.StreamRoutes,
+ PluginConfigs: tctx.PluginConfigs,
}
var (
- added *manifest
- updated *manifest
- deleted *manifest
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
)
if ev.Type == types.EventDelete {
@@ -226,13 +227,13 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
return err
}
- om := &manifest{
- routes: oldCtx.Routes,
- upstreams: oldCtx.Upstreams,
- streamRoutes: oldCtx.StreamRoutes,
- pluginConfigs: oldCtx.PluginConfigs,
+ om := &utils.Manifest{
+ Routes: oldCtx.Routes,
+ Upstreams: oldCtx.Upstreams,
+ StreamRoutes: oldCtx.StreamRoutes,
+ PluginConfigs: oldCtx.PluginConfigs,
}
- added, updated, deleted = m.diff(om)
+ added, updated, deleted = m.Diff(om)
}
return c.controller.syncManifests(ctx, added, updated, deleted)
diff --git a/pkg/ingress/compare.go b/pkg/ingress/compare.go
index 6ab081e4..3818d2e1 100644
--- a/pkg/ingress/compare.go
+++ b/pkg/ingress/compare.go
@@ -20,7 +20,6 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "github.com/apache/apisix-ingress-controller/pkg/api/validation"
"github.com/apache/apisix-ingress-controller/pkg/log"
)
@@ -46,24 +45,9 @@ func (c *Controller) CompareResources(ctx context.Context) error {
consumerMapA6 = make(map[string]string)
pluginConfigMapA6 = make(map[string]string)
)
- // watchingNamespaces and watchingLabels are empty means to monitor all namespaces.
- if !validation.HasValueInSyncMap(c.watchingNamespaces) && len(c.watchingLabels) == 0 {
- opts := v1.ListOptions{}
- // list all namespaces
- nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
- if err != nil {
- log.Error(err.Error())
- ctx.Done()
- } else {
- wns := new(sync.Map)
- for _, v := range nsList.Items {
- wns.Store(v.Name, struct{}{})
- }
- c.watchingNamespaces = wns
- }
- }
- c.watchingNamespaces.Range(func(key, value interface{}) bool {
+ namespaces := c.namespaceProvider.WatchingNamespaces()
+ for _, key := range namespaces {
log.Debugf("start to watch namespace: %s", key)
wg.Add(1)
go func(ns string) {
@@ -139,9 +123,8 @@ func (c *Controller) CompareResources(ctx context.Context) error {
}
}
}
- }(key.(string))
- return true
- })
+ }(key)
+ }
wg.Wait()
// 2.get all cache routes
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index f9c834ce..80c36e34 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"os"
- "strings"
"sync"
"time"
@@ -35,13 +34,14 @@ import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
- gatewaylistersv1alpha2 "sigs.k8s.io/gateway-api/pkg/client/listers/gateway/apis/v1alpha2"
"github.com/apache/apisix-ingress-controller/pkg/api"
- "github.com/apache/apisix-ingress-controller/pkg/api/validation"
"github.com/apache/apisix-ingress-controller/pkg/apisix"
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/config"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/gateway"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/namespace"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/kube"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
@@ -68,18 +68,15 @@ const (
// Controller is the ingress apisix controller object.
type Controller struct {
- name string
- namespace string
- cfg *config.Config
- wg sync.WaitGroup
- watchingNamespaces *sync.Map
- watchingLabels types.Labels
- apisix apisix.APISIX
- podCache types.PodCache
- translator translation.Translator
- apiServer *api.Server
- MetricsCollector metrics.Collector
- kubeClient *kube.KubeClient
+ name string
+ namespace string
+ cfg *config.Config
+ apisix apisix.APISIX
+ podCache types.PodCache
+ translator translation.Translator
+ apiServer *api.Server
+ MetricsCollector metrics.Collector
+ kubeClient *kube.KubeClient
// recorder event
recorder record.EventRecorder
// this map enrolls which ApisixTls objects refer to a Kubernetes
@@ -93,8 +90,6 @@ type Controller struct {
leaderContextCancelFunc context.CancelFunc
// common informers and listers
- namespaceInformer cache.SharedIndexInformer
- namespaceLister listerscorev1.NamespaceLister
podInformer cache.SharedIndexInformer
podLister listerscorev1.PodLister
epInformer cache.SharedIndexInformer
@@ -117,20 +112,16 @@ type Controller struct {
apisixConsumerLister kube.ApisixConsumerLister
apisixPluginConfigInformer cache.SharedIndexInformer
apisixPluginConfigLister kube.ApisixPluginConfigLister
- gatewayInformer cache.SharedIndexInformer
- gatewayLister gatewaylistersv1alpha2.GatewayLister
- gatewayHttpRouteInformer cache.SharedIndexInformer
- gatewayHttpRouteLister gatewaylistersv1alpha2.HTTPRouteLister
// resource controllers
- namespaceController *namespaceController
- podController *podController
- endpointsController *endpointsController
- endpointSliceController *endpointSliceController
- ingressController *ingressController
- secretController *secretController
- gatewayController *gatewayController
- gatewayHTTPRouteController *gatewayHTTPRouteController
+ podController *podController
+ endpointsController *endpointsController
+ endpointSliceController *endpointSliceController
+ ingressController *ingressController
+ secretController *secretController
+
+ namespaceProvider namespace.WatchingProvider
+ gatewayProvider *gateway.Provider
apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
@@ -162,39 +153,21 @@ func NewController(cfg *config.Config) (*Controller, error) {
return nil, err
}
- var (
- watchingNamespace = new(sync.Map)
- watchingLabels = make(map[string]string)
- )
- if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
- for _, ns := range cfg.Kubernetes.AppNamespaces {
- watchingNamespace.Store(ns, struct{}{})
- }
- }
-
- // support namespace label-selector
- for _, labels := range cfg.Kubernetes.NamespaceSelector {
- labelSlice := strings.Split(labels, "=")
- watchingLabels[labelSlice[0]] = labelSlice[1]
- }
-
// recorder
utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")})
c := &Controller{
- name: podName,
- namespace: podNamespace,
- cfg: cfg,
- apiServer: apiSrv,
- apisix: client,
- MetricsCollector: metrics.NewPrometheusCollector(),
- kubeClient: kubeClient,
- watchingNamespaces: watchingNamespace,
- watchingLabels: watchingLabels,
- secretSSLMap: new(sync.Map),
- recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
+ name: podName,
+ namespace: podNamespace,
+ cfg: cfg,
+ apiServer: apiSrv,
+ apisix: client,
+ MetricsCollector: metrics.NewPrometheusCollector(),
+ kubeClient: kubeClient,
+ secretSSLMap: new(sync.Map),
+ recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
podCache: types.NewPodCache(),
}
@@ -213,9 +186,7 @@ func (c *Controller) initWhenStartLeading() {
kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
- gatewayFactory := c.kubeClient.NewGatewaySharedIndexInformerFactory()
- c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister()
c.podLister = kubeFactory.Core().V1().Pods().Lister()
c.epLister, c.epInformer = kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
c.svcLister = kubeFactory.Core().V1().Services().Lister()
@@ -266,12 +237,6 @@ func (c *Controller) initWhenStartLeading() {
ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
}
- c.gatewayLister = gatewayFactory.Gateway().V1alpha2().Gateways().Lister()
- c.gatewayInformer = gatewayFactory.Gateway().V1alpha2().Gateways().Informer()
-
- c.gatewayHttpRouteLister = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Lister()
- c.gatewayHttpRouteInformer = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Informer()
-
switch c.cfg.Kubernetes.ApisixRouteVersion {
case config.ApisixRouteV2beta2:
apisixRouteInformer = apisixFactory.Apisix().V2beta2().ApisixRoutes().Informer()
@@ -319,7 +284,6 @@ func (c *Controller) initWhenStartLeading() {
panic(fmt.Errorf("unsupported ApisixPluginConfig version %v", c.cfg.Kubernetes.ApisixPluginConfigVersion))
}
- c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer()
c.podInformer = kubeFactory.Core().V1().Pods().Informer()
c.svcInformer = kubeFactory.Core().V1().Services().Informer()
c.ingressInformer = ingressInformer
@@ -336,7 +300,6 @@ func (c *Controller) initWhenStartLeading() {
} else {
c.endpointsController = c.newEndpointsController()
}
- c.namespaceController = c.newNamespaceController()
c.podController = c.newPodController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
@@ -346,8 +309,10 @@ func (c *Controller) initWhenStartLeading() {
c.secretController = c.newSecretController()
c.apisixConsumerController = c.newApisixConsumerController()
c.apisixPluginConfigController = c.newApisixPluginConfigController()
- c.gatewayController = c.newGatewayController()
- c.gatewayHTTPRouteController = c.newGatewayHTTPRouteController()
+}
+
+func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted *utils.Manifest) error {
+ return utils.SyncManifests(ctx, c.apisix, c.cfg.APISIX.DefaultClusterName, added, updated, deleted)
}
// recorderEvent recorder events for resources
@@ -366,14 +331,6 @@ func (c *Controller) recorderEventS(object runtime.Object, eventtype, reason str
c.recorder.Event(object, eventtype, reason, msg)
}
-func (c *Controller) goAttach(handler func()) {
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- handler()
- }()
-}
-
// Eventf implements the resourcelock.EventRecorder interface.
func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) {
log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType))
@@ -501,63 +458,75 @@ func (c *Controller) run(ctx context.Context) {
c.initWhenStartLeading()
- // list namespaces and init watchingNamespaces
- if err := c.initWatchingNamespacesByLabels(ctx); err != nil {
+ c.namespaceProvider, err = namespace.NewWatchingProvider(ctx, c.kubeClient, c.cfg)
+ if err != nil {
+ ctx.Done()
+ return
+ }
+
+ c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{
+ Cfg: c.cfg,
+ APISIX: c.apisix,
+ APISIXClusterName: c.cfg.APISIX.DefaultClusterName,
+ KubeTranslator: c.translator,
+ RestConfig: nil,
+ KubeClient: c.kubeClient.Client,
+ MetricsCollector: c.MetricsCollector,
+ NamespaceProvider: c.namespaceProvider,
+ })
+ if err != nil {
ctx.Done()
return
}
+
// compare resources of k8s with objects of APISIX
if err = c.CompareResources(ctx); err != nil {
ctx.Done()
return
}
- c.goAttach(func() {
+ e := utils.ParallelExecutor{}
+
+ e.Add(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
- c.goAttach(func() {
- c.namespaceInformer.Run(ctx.Done())
- })
- c.goAttach(func() {
+ e.Add(func() {
c.podInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.epInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.svcInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.ingressInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixRouteInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixUpstreamInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixClusterConfigInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.secretInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixTlsInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixConsumerInformer.Run(ctx.Done())
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixPluginConfigInformer.Run(ctx.Done())
})
- c.goAttach(func() {
- c.namespaceController.run(ctx)
- })
- c.goAttach(func() {
+ e.Add(func() {
c.podController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
if c.cfg.Kubernetes.WatchEndpointSlices {
c.endpointSliceController.run(ctx)
} else {
@@ -565,46 +534,38 @@ func (c *Controller) run(ctx context.Context) {
}
})
- if c.cfg.Kubernetes.EnableGatewayAPI {
- c.goAttach(func() {
- c.gatewayInformer.Run(ctx.Done())
- })
-
- c.goAttach(func() {
- c.gatewayHttpRouteInformer.Run(ctx.Done())
- })
-
- c.goAttach(func() {
- c.gatewayController.run(ctx)
- })
+ e.Add(func() {
+ c.namespaceProvider.Run(ctx)
+ })
- c.goAttach(func() {
- c.gatewayHTTPRouteController.run(ctx)
+ if c.cfg.Kubernetes.EnableGatewayAPI {
+ e.Add(func() {
+ c.gatewayProvider.Run(ctx)
})
}
- c.goAttach(func() {
+ e.Add(func() {
c.apisixUpstreamController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.ingressController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixRouteController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixClusterConfigController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixTlsController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.secretController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixConsumerController.run(ctx)
})
- c.goAttach(func() {
+ e.Add(func() {
c.apisixPluginConfigController.run(ctx)
})
@@ -616,25 +577,21 @@ func (c *Controller) run(ctx context.Context) {
)
<-ctx.Done()
- c.wg.Wait()
+ e.Wait()
+
+ for _, execErr := range e.Errors() {
+ log.Error(execErr.Error())
+ }
+ if len(e.Errors()) > 0 {
+ log.Error("Start failed, abort...")
+ cancelFunc()
+ }
}
// isWatchingNamespace accepts a resource key, getting the namespace part
// and checking whether the namespace is being watched.
func (c *Controller) isWatchingNamespace(key string) (ok bool) {
- if !validation.HasValueInSyncMap(c.watchingNamespaces) {
- ok = true
- return
- }
- ns, _, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- // Ignore resource with invalid key.
- ok = false
- log.Warnf("resource %s was ignored since: %s", key, err)
- return
- }
- _, ok = c.watchingNamespaces.Load(ns)
- return
+ return c.namespaceProvider.IsWatchingNamespace(key)
}
func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types.EventType) error {
@@ -739,8 +696,8 @@ func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, clust
zap.String("cluster", cluster.String()),
)
- updated := &manifest{
- upstreams: []*apisixv1.Upstream{upstream},
+ updated := &utils.Manifest{
+ Upstreams: []*apisixv1.Upstream{upstream},
}
return c.syncManifests(ctx, nil, updated, nil)
}
diff --git a/pkg/ingress/gateway.go b/pkg/ingress/gateway/gateway.go
similarity index 64%
rename from pkg/ingress/gateway.go
rename to pkg/ingress/gateway/gateway.go
index 3e4ef6a5..58bea6bb 100644
--- a/pkg/ingress/gateway.go
+++ b/pkg/ingress/gateway/gateway.go
@@ -12,30 +12,33 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package ingress
+package gateway
import (
"context"
"time"
"go.uber.org/zap"
+ apiv1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
type gatewayController struct {
- controller *Controller
+ controller *Provider
workqueue workqueue.RateLimitingInterface
workers int
}
-func (c *Controller) newGatewayController() *gatewayController {
+func newGatewayController(c *Provider) *gatewayController {
ctl := &gatewayController{
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Gateway"),
@@ -108,6 +111,10 @@ func (c *gatewayController) sync(ctx context.Context, ev *types.Event) error {
return nil
}
gateway = ev.Tombstone.(*gatewayv1alpha2.Gateway)
+ //} else {
+ //if c.controller.HasGatewayClass(string(gateway.Spec.GatewayClassName)) {
+ // // TODO: Translate listeners
+ //}
}
// TODO The current implementation does not fully support the definition of Gateway.
@@ -115,7 +122,7 @@ func (c *gatewayController) sync(ctx context.Context, ev *types.Event) error {
// At present, we choose to directly update `GatewayStatus.Addresses`
// to indicate that we have picked the Gateway resource.
- c.controller.recordStatus(gateway, string(gatewayv1alpha2.ListenerReasonReady), nil, metav1.ConditionTrue, gateway.Generation)
+ c.recordStatus(gateway, string(gatewayv1alpha2.ListenerReasonReady), metav1.ConditionTrue, gateway.Generation)
return nil
}
@@ -148,7 +155,7 @@ func (c *gatewayController) onAdd(obj interface{}) {
log.Errorf("found gateway resource with bad meta namespace key: %s", err)
return
}
- if !c.controller.isWatchingNamespace(key) {
+ if !c.controller.NamespaceProvider.IsWatchingNamespace(key) {
return
}
log.Debugw("gateway add event arrived",
@@ -162,3 +169,67 @@ func (c *gatewayController) onAdd(obj interface{}) {
}
func (c *gatewayController) onUpdate(oldObj, newObj interface{}) {}
func (c *gatewayController) OnDelete(obj interface{}) {}
+
+// recordStatus writes the controller's view of a Gateway to its status
+// subresource.
+//
+// It operates on a deep copy of v, so the caller's object is left untouched.
+// The copy receives a Ready condition built from reason, status and
+// generation, and its Status.Addresses is replaced with the addresses
+// resolved by utils.IngressLBStatusIPs. Failures are logged, never returned.
+func (c *gatewayController) recordStatus(v *gatewayv1alpha2.Gateway, reason string, status metav1.ConditionStatus, generation int64) {
+	v = v.DeepCopy()
+
+	gatewayCondition := metav1.Condition{
+		Type:               string(gatewayv1alpha2.ListenerConditionReady),
+		Reason:             reason,
+		Status:             status,
+		Message:            "Gateway's status has been successfully updated",
+		ObservedGeneration: generation,
+	}
+
+	// SetStatusCondition appends to a nil or empty slice and updates an
+	// existing entry of the same type in place. Calling it unconditionally
+	// ensures the condition is also stored on a Gateway that has no
+	// conditions yet; the previous nil check only allocated an empty slice
+	// and dropped the condition in that case.
+	meta.SetStatusCondition(&v.Status.Conditions, gatewayCondition)
+
+	lbips, err := utils.IngressLBStatusIPs(c.controller.Cfg.IngressPublishService, c.controller.Cfg.IngressStatusAddress, c.controller.KubeClient)
+	if err != nil {
+		// Best effort: log and continue so the condition is still published.
+		log.Errorw("failed to get APISIX gateway external IPs",
+			zap.Error(err),
+		)
+	}
+
+	v.Status.Addresses = convLBIPToGatewayAddr(lbips)
+	if _, errRecord := c.controller.gatewayClient.GatewayV1alpha2().Gateways(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
+		log.Errorw("failed to record status change for Gateway resource",
+			zap.Error(errRecord),
+			zap.String("name", v.Name),
+			zap.String("namespace", v.Namespace),
+		)
+	}
+}
+
+// convLBIPToGatewayAddr maps load-balancer ingress entries to Gateway
+// addresses. Each entry contributes a Hostname address when its Hostname is
+// non-empty, followed by an IP address when its IP is non-empty. The result
+// is nil when no entry yields an address.
+func convLBIPToGatewayAddr(lbips []apiv1.LoadBalancerIngress) []gatewayv1alpha2.GatewayAddress {
+	// In the definition, there is also an address type called NamedAddress,
+	// which we currently do not implement.
+	// Local copies are needed because GatewayAddress.Type is a pointer.
+	hostnameType := gatewayv1alpha2.HostnameAddressType
+	ipType := gatewayv1alpha2.IPAddressType
+
+	var addrs []gatewayv1alpha2.GatewayAddress
+	for i := range lbips {
+		if hostname := lbips[i].Hostname; hostname != "" {
+			addrs = append(addrs, gatewayv1alpha2.GatewayAddress{Type: &hostnameType, Value: hostname})
+		}
+		if ip := lbips[i].IP; ip != "" {
+			addrs = append(addrs, gatewayv1alpha2.GatewayAddress{Type: &ipType, Value: ip})
+		}
+	}
+	return addrs
+}
diff --git a/pkg/ingress/gateway/gateway_class.go b/pkg/ingress/gateway/gateway_class.go
new file mode 100644
index 00000000..00d2a4fb
--- /dev/null
+++ b/pkg/ingress/gateway/gateway_class.go
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package gateway
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "go.uber.org/zap"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+ "sigs.k8s.io/gateway-api/apis/v1alpha2"
+
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
+)
+
+// GatewayClassName is the value a GatewayClass's Spec.ControllerName must
+// equal for this controller to claim the class.
+const GatewayClassName = "apisix-ingress-controller"
+
+// gatewayClassController reacts to GatewayClass add and delete events and
+// keeps the owning Provider's set of claimed classes up to date.
+type gatewayClassController struct {
+	// controller is the owning Provider; it supplies the GatewayClass
+	// informer and lister, the gateway client and the metrics collector.
+	controller *Provider
+	// workqueue holds pending *types.Event items and rate-limits retries.
+	workqueue workqueue.RateLimitingInterface
+	// workers is the number of worker goroutines started by run.
+	workers int
+}
+
+// newGatewayClassController builds the GatewayClass controller for c, runs
+// its initial pass over the classes already known to the lister, and then
+// registers its event handlers on the Provider's GatewayClass informer. It
+// returns the error from the initial pass, if any.
+func newGatewayClassController(c *Provider) (*gatewayClassController, error) {
+	queue := workqueue.NewNamedRateLimitingQueue(
+		workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5),
+		"GatewayClass",
+	)
+	ctrl := &gatewayClassController{controller: c, workqueue: queue, workers: 1}
+
+	if err := ctrl.init(); err != nil {
+		return nil, err
+	}
+
+	// TODO: change to event channel
+	handlers := cache.ResourceEventHandlerFuncs{
+		AddFunc:    ctrl.onAdd,
+		UpdateFunc: ctrl.onUpdate,
+		DeleteFunc: ctrl.onDelete,
+	}
+	c.gatewayClassInformer.AddEventHandler(handlers)
+
+	return ctrl, nil
+}
+
+// init lists every GatewayClass known to the lister and marks those whose
+// Spec.ControllerName equals GatewayClassName as updated. It stops at, and
+// returns, the first error.
+func (c *gatewayClassController) init() error {
+	all, err := c.controller.gatewayClassLister.List(labels.Everything())
+	if err != nil {
+		return err
+	}
+
+	for _, class := range all {
+		if class.Spec.ControllerName != GatewayClassName {
+			continue
+		}
+		if err := c.markAsUpdated(class); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// markAsUpdated publishes an Accepted=True condition on the given
+// GatewayClass and then registers the class name with the Provider.
+//
+// It works on a deep copy of gatewayClass. Conditions of other types are kept
+// as they are. An existing Accepted condition that is already True only gets
+// its message refreshed, so LastTransitionTime is preserved; one with another
+// status is replaced; a missing one is appended. The status update uses a
+// 5-second timeout. On failure the error is logged and returned, and the
+// class is not registered.
+func (c *gatewayClassController) markAsUpdated(gatewayClass *v1alpha2.GatewayClass) error {
+	gc := gatewayClass.DeepCopy()
+
+	condition := metav1.Condition{
+		Type:               string(v1alpha2.GatewayClassConditionStatusAccepted),
+		Status:             metav1.ConditionTrue,
+		Reason:             "Updated",
+		Message:            fmt.Sprintf("Updated by apisix-ingress-controller, sync at %v", time.Now()),
+		LastTransitionTime: metav1.Now(),
+	}
+
+	newConditions := make([]metav1.Condition, 0, len(gc.Status.Conditions)+1)
+	found := false
+	for _, cond := range gc.Status.Conditions {
+		if cond.Type != condition.Type {
+			newConditions = append(newConditions, cond)
+			continue
+		}
+		if found {
+			// Keep at most one condition of this type.
+			continue
+		}
+		found = true
+		if cond.Status == condition.Status {
+			// Update message to record last sync time, don't change LastTransitionTime.
+			// cond is a copy, so it must be appended for the change to be kept.
+			cond.Message = condition.Message
+			newConditions = append(newConditions, cond)
+		} else {
+			newConditions = append(newConditions, condition)
+		}
+	}
+	if !found {
+		// A class without this condition (e.g. a new one) must still receive it.
+		newConditions = append(newConditions, condition)
+	}
+
+	gc.Status.Conditions = newConditions
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	_, err := c.controller.gatewayClient.GatewayV1alpha2().GatewayClasses().UpdateStatus(ctx, gc, metav1.UpdateOptions{})
+	if err != nil {
+		log.Errorw("failed to update GatewayClass status",
+			zap.Error(err),
+			zap.String("name", gatewayClass.Name),
+		)
+		return err
+	}
+
+	c.controller.AddGatewayClass(gatewayClass.Name)
+
+	return nil
+}
+
+// run waits for the GatewayClass informer cache to sync, starts c.workers
+// worker goroutines, and blocks until ctx is cancelled. The workqueue is shut
+// down on return, which makes the workers exit.
+func (c *gatewayClassController) run(ctx context.Context) {
+	// These messages previously said "gateway HTTPRoute", which made this
+	// controller indistinguishable from the HTTPRoute controller in logs.
+	log.Info("gateway class controller started")
+	defer log.Info("gateway class controller exited")
+	defer c.workqueue.ShutDown()
+
+	if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayClassInformer.HasSynced) {
+		log.Error("sync GatewayClass cache failed")
+		return
+	}
+
+	for i := 0; i < c.workers; i++ {
+		go c.runWorker(ctx)
+	}
+	<-ctx.Done()
+}
+
+// runWorker takes events off the workqueue one at a time, syncs each, and
+// passes the result to handleSyncErr. It returns when the queue reports
+// shutdown.
+func (c *gatewayClassController) runWorker(ctx context.Context) {
+	for {
+		item, shutdown := c.workqueue.Get()
+		if shutdown {
+			return
+		}
+		ev := item.(*types.Event)
+		syncErr := c.sync(ctx, ev)
+		c.workqueue.Done(item)
+		c.handleSyncErr(item, syncErr)
+	}
+}
+
+// sync handles a single GatewayClass event. For an add event it looks the
+// class up through the lister and, when its Spec.ControllerName equals
+// GatewayClassName, marks it as updated. For a delete event it removes the
+// class name from the Provider. Other event types are ignored.
+func (c *gatewayClassController) sync(ctx context.Context, ev *types.Event) error {
+	switch ev.Type {
+	case types.EventAdd:
+		name := ev.Object.(string)
+		class, err := c.controller.gatewayClassLister.Get(name)
+		if err != nil {
+			return err
+		}
+		if class.Spec.ControllerName != GatewayClassName {
+			return nil
+		}
+		return c.markAsUpdated(class)
+	case types.EventDelete:
+		c.controller.RemoveGatewayClass(ev.Object.(string))
+	}
+
+	return nil
+}
+
+// handleSyncErr decides what happens to a queue item after sync returns.
+//
+// On success the item is forgotten and a success metric is recorded. A
+// NotFound error on a non-delete event is not retried: the item is forgotten
+// and no metric is recorded. Any other error re-queues the item with rate
+// limiting and records a failure metric.
+func (c *gatewayClassController) handleSyncErr(obj interface{}, err error) {
+	if err == nil {
+		c.workqueue.Forget(obj)
+		c.controller.MetricsCollector.IncrSyncOperation("gateway_class", "success")
+		return
+	}
+	event := obj.(*types.Event)
+	if k8serrors.IsNotFound(err) && event.Type != types.EventDelete {
+		// The log text and field key previously referred to HTTPRoute.
+		log.Infow("sync GatewayClass but not found, ignore",
+			zap.String("event_type", event.Type.String()),
+			zap.String("GatewayClass", event.Object.(string)),
+		)
+		c.workqueue.Forget(event)
+		return
+	}
+	log.Warnw("sync GatewayClass failed, will retry",
+		zap.Any("object", obj),
+		zap.Error(err),
+	)
+	c.workqueue.AddRateLimited(obj)
+	c.controller.MetricsCollector.IncrSyncOperation("gateway_class", "failure")
+}
+
+// onAdd is the informer's add handler. It derives the cache key for obj and,
+// if the namespace provider accepts that key, enqueues an add event carrying
+// the key.
+func (c *gatewayClassController) onAdd(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		log.Errorf("found GatewayClass resource with bad meta namespace key: %s", err)
+		return
+	}
+	// NOTE(review): GatewayClass is cluster-scoped, so key has no namespace
+	// part. Confirm that IsWatchingNamespace accepts such keys when namespace
+	// filtering is active; init() applies no such filter.
+	if !c.controller.NamespaceProvider.IsWatchingNamespace(key) {
+		return
+	}
+	log.Debugw("GatewayClass add event arrived",
+		zap.Any("object", obj),
+	)
+
+	c.workqueue.Add(&types.Event{
+		Type:   types.EventAdd,
+		Object: key,
+	})
+}
+
+// onUpdate is the informer's update handler. It is intentionally a no-op and
+// enqueues nothing.
+func (c *gatewayClassController) onUpdate(oldObj, newObj interface{}) {
+	// Ignore update event since ControllerName is immutable
+}
+
+// onDelete is the informer's delete handler. It enqueues a delete event
+// carrying the class name, with the deleted object attached as the tombstone.
+//
+// A missed delete can be delivered as a cache.DeletedFinalStateUnknown
+// wrapper instead of the object itself, so that wrapper is unpacked here
+// rather than letting an unchecked type assertion panic. Objects of any
+// other type are logged and dropped.
+func (c *gatewayClassController) onDelete(obj interface{}) {
+	gatewayClass, ok := obj.(*v1alpha2.GatewayClass)
+	if !ok {
+		tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
+		if !isTombstone {
+			log.Errorf("found GatewayClass delete event with unexpected object type: %T", obj)
+			return
+		}
+		gatewayClass, ok = tombstone.Obj.(*v1alpha2.GatewayClass)
+		if !ok {
+			log.Errorf("found GatewayClass tombstone with unexpected object type: %T", tombstone.Obj)
+			return
+		}
+	}
+
+	c.workqueue.Add(&types.Event{
+		Type:      types.EventDelete,
+		Object:    gatewayClass.Name,
+		Tombstone: gatewayClass,
+	})
+}
diff --git a/pkg/ingress/gateway_httproute.go b/pkg/ingress/gateway/gateway_httproute.go
similarity index 87%
rename from pkg/ingress/gateway_httproute.go
rename to pkg/ingress/gateway/gateway_httproute.go
index acb9c5e3..a7109240 100644
--- a/pkg/ingress/gateway_httproute.go
+++ b/pkg/ingress/gateway/gateway_httproute.go
@@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package ingress
+package gateway
import (
"context"
@@ -24,25 +24,26 @@ import (
"k8s.io/client-go/util/workqueue"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/kube/translation"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
type gatewayHTTPRouteController struct {
- controller *Controller
+ controller *Provider
workqueue workqueue.RateLimitingInterface
workers int
}
-func (c *Controller) newGatewayHTTPRouteController() *gatewayHTTPRouteController {
+func newGatewayHTTPRouteController(c *Provider) *gatewayHTTPRouteController {
ctrl := &gatewayHTTPRouteController{
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "GatewayHTTPRoute"),
workers: 1,
}
- ctrl.controller.gatewayHttpRouteInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ ctrl.controller.gatewayHTTPRouteInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.onAdd,
UpdateFunc: ctrl.onUpdate,
DeleteFunc: ctrl.OnDelete,
@@ -55,7 +56,7 @@ func (c *gatewayHTTPRouteController) run(ctx context.Context) {
defer log.Info("gateway HTTPRoute controller exited")
defer c.workqueue.ShutDown()
- if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayHttpRouteInformer.HasSynced) {
+ if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayHTTPRouteInformer.HasSynced) {
log.Error("sync Gateway HTTPRoute cache failed")
return
}
@@ -91,7 +92,7 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event)
log.Debugw("sync HTTPRoute", zap.String("key", key))
- httpRoute, err := c.controller.gatewayHttpRouteLister.HTTPRoutes(namespace).Get(name)
+ httpRoute, err := c.controller.gatewayHTTPRouteLister.HTTPRoutes(namespace).Get(name)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get Gateway HTTPRoute",
@@ -134,15 +135,15 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event)
zap.Any("routes", tctx.Routes),
zap.Any("upstreams", tctx.Upstreams),
)
- m := &manifest{
- routes: tctx.Routes,
- upstreams: tctx.Upstreams,
+ m := &utils.Manifest{
+ Routes: tctx.Routes,
+ Upstreams: tctx.Upstreams,
}
var (
- added *manifest
- updated *manifest
- deleted *manifest
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
)
if ev.Type == types.EventDelete {
@@ -163,14 +164,14 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event)
return err
}
- om := &manifest{
- routes: oldCtx.Routes,
- upstreams: oldCtx.Upstreams,
+ om := &utils.Manifest{
+ Routes: oldCtx.Routes,
+ Upstreams: oldCtx.Upstreams,
}
- added, updated, deleted = m.diff(om)
+ added, updated, deleted = m.Diff(om)
}
- return c.controller.syncManifests(ctx, added, updated, deleted)
+ return utils.SyncManifests(ctx, c.controller.APISIX, c.controller.APISIXClusterName, added, updated, deleted)
}
func (c *gatewayHTTPRouteController) handleSyncErr(obj interface{}, err error) {
@@ -202,7 +203,7 @@ func (c *gatewayHTTPRouteController) onAdd(obj interface{}) {
log.Errorf("found gateway HTTPRoute resource with bad meta namespace key: %s", err)
return
}
- if !c.controller.isWatchingNamespace(key) {
+ if !c.controller.NamespaceProvider.IsWatchingNamespace(key) {
return
}
log.Debugw("gateway HTTPRoute add event arrived",
diff --git a/pkg/ingress/gateway/provider.go b/pkg/ingress/gateway/provider.go
new file mode 100644
index 00000000..0e87a82c
--- /dev/null
+++ b/pkg/ingress/gateway/provider.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package gateway
+
+import (
+ "context"
+ "sync"
+
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/cache"
+ gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/gateway/versioned"
+ gatewayexternalversions "sigs.k8s.io/gateway-api/pkg/client/informers/gateway/externalversions"
+ gatewaylistersv1alpha2 "sigs.k8s.io/gateway-api/pkg/client/listers/gateway/apis/v1alpha2"
+
+ "github.com/apache/apisix-ingress-controller/pkg/apisix"
+ "github.com/apache/apisix-ingress-controller/pkg/config"
+ gatewaytranslation "github.com/apache/apisix-ingress-controller/pkg/ingress/gateway/translation"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/namespace"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
+ "github.com/apache/apisix-ingress-controller/pkg/kube/translation"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
+)
+
+const (
+ ProviderName = "GatewayAPI"
+)
+
+// Provider bundles the Gateway API client, informers, listers and
+// controllers (Gateway, GatewayClass, HTTPRoute) and runs them together.
+type Provider struct {
+ // name is set to ProviderName by NewGatewayProvider.
+ name string
+
+ // gatewayNamesLock guards gatewayNames.
+ gatewayNamesLock sync.RWMutex
+ // gatewayNames is the set of names recorded through AddGatewayClass and
+ // removed through RemoveGatewayClass.
+ gatewayNames map[string]struct{}
+
+ // ProviderOptions is embedded, so its fields are reachable directly on the Provider.
+ *ProviderOptions
+ // gatewayClient is the Gateway API clientset built from RestConfig.
+ gatewayClient gatewayclientset.Interface
+
+ // translator converts Gateway API resources; it wraps KubeTranslator.
+ translator gatewaytranslation.Translator
+
+ gatewayController *gatewayController
+ gatewayInformer cache.SharedIndexInformer
+ gatewayLister gatewaylistersv1alpha2.GatewayLister
+
+ gatewayClassController *gatewayClassController
+ gatewayClassInformer cache.SharedIndexInformer
+ gatewayClassLister gatewaylistersv1alpha2.GatewayClassLister
+
+ gatewayHTTPRouteController *gatewayHTTPRouteController
+ gatewayHTTPRouteInformer cache.SharedIndexInformer
+ gatewayHTTPRouteLister gatewaylistersv1alpha2.HTTPRouteLister
+}
+
+// ProviderOptions carries the dependencies handed to NewGatewayProvider.
+type ProviderOptions struct {
+ // Cfg supplies the kubeconfig path and the informer resync interval.
+ Cfg *config.Config
+ // APISIX and APISIXClusterName are passed to utils.SyncManifests when
+ // translated resources are pushed.
+ APISIX apisix.APISIX
+ APISIXClusterName string
+ // KubeTranslator is wrapped by the Gateway API translator.
+ KubeTranslator translation.Translator
+ // RestConfig is optional; when nil, NewGatewayProvider builds one from
+ // Cfg.Kubernetes.Kubeconfig and stores it back into this field.
+ RestConfig *rest.Config
+ KubeClient kubernetes.Interface
+ // MetricsCollector records sync operation results.
+ MetricsCollector metrics.Collector
+ // NamespaceProvider decides whether a resource key belongs to a watched namespace.
+ NamespaceProvider namespace.WatchingProvider
+}
+
+func NewGatewayProvider(opts *ProviderOptions) (*Provider, error) {
+ var err error
+ if opts.RestConfig == nil {
+ restConfig, err := kube.BuildRestConfig(opts.Cfg.Kubernetes.Kubeconfig, "")
+ if err != nil {
+ return nil, err
+ }
+
+ opts.RestConfig = restConfig
+ }
+ gatewayKubeClient, err := gatewayclientset.NewForConfig(opts.RestConfig)
+ if err != nil {
+ return nil, err
+ }
+
+ p := &Provider{
+ name: ProviderName,
+
+ ProviderOptions: opts,
+ gatewayClient: gatewayKubeClient,
+
+ translator: gatewaytranslation.NewTranslator(&gatewaytranslation.TranslatorOptions{
+ KubeTranslator: opts.KubeTranslator,
+ }),
+ }
+
+ gatewayFactory := gatewayexternalversions.NewSharedInformerFactory(p.gatewayClient, p.Cfg.Kubernetes.ResyncInterval.Duration)
+
+ p.gatewayLister = gatewayFactory.Gateway().V1alpha2().Gateways().Lister()
+ p.gatewayInformer = gatewayFactory.Gateway().V1alpha2().Gateways().Informer()
+
+ p.gatewayClassLister = gatewayFactory.Gateway().V1alpha2().GatewayClasses().Lister()
+ p.gatewayClassInformer = gatewayFactory.Gateway().V1alpha2().GatewayClasses().Informer()
+
+ p.gatewayHTTPRouteLister = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Lister()
+ p.gatewayHTTPRouteInformer = gatewayFactory.Gateway().V1alpha2().HTTPRoutes().Informer()
+
+ p.gatewayController = newGatewayController(p)
+
+ p.gatewayClassController, err = newGatewayClassController(p)
+ if err != nil {
+ return nil, err
+ }
+
+ p.gatewayHTTPRouteController = newGatewayHTTPRouteController(p)
+
+ return p, nil
+}
+
+// Run hands the three Gateway API informers and the three controllers to a
+// parallel executor, in that order, and returns once the executor's Wait
+// returns. Every task receives ctx (or its Done channel).
+func (p *Provider) Run(ctx context.Context) {
+	tasks := []func(){
+		func() { p.gatewayInformer.Run(ctx.Done()) },
+		func() { p.gatewayClassInformer.Run(ctx.Done()) },
+		func() { p.gatewayHTTPRouteInformer.Run(ctx.Done()) },
+		func() { p.gatewayController.run(ctx) },
+		func() { p.gatewayClassController.run(ctx) },
+		func() { p.gatewayHTTPRouteController.run(ctx) },
+	}
+
+	executor := utils.ParallelExecutor{}
+	for _, task := range tasks {
+		executor.Add(task)
+	}
+	executor.Wait()
+}
+
+// AddGatewayClass records name in the provider's set of GatewayClass names.
+// It is safe for concurrent use.
+func (p *Provider) AddGatewayClass(name string) {
+	p.gatewayNamesLock.Lock()
+	defer p.gatewayNamesLock.Unlock()
+
+	// Writing to a nil map panics; allocate lazily so the method also works
+	// on a Provider whose set has not been initialised yet.
+	if p.gatewayNames == nil {
+		p.gatewayNames = make(map[string]struct{})
+	}
+	p.gatewayNames[name] = struct{}{}
+}
+// RemoveGatewayClass drops name from the provider's set of GatewayClass
+// names while holding the write lock. Removing an unknown name is a no-op.
+func (p *Provider) RemoveGatewayClass(name string) {
+	p.gatewayNamesLock.Lock()
+	delete(p.gatewayNames, name)
+	p.gatewayNamesLock.Unlock()
+}
+
+// HasGatewayClass reports whether name is currently present in the
+// provider's set of GatewayClass names. The lookup is done under the read lock.
+func (p *Provider) HasGatewayClass(name string) bool {
+	p.gatewayNamesLock.RLock()
+	_, found := p.gatewayNames[name]
+	p.gatewayNamesLock.RUnlock()
+
+	return found
+}
diff --git a/pkg/kube/translation/gateway_httproute.go b/pkg/ingress/gateway/translation/gateway_httproute.go
similarity index 92%
rename from pkg/kube/translation/gateway_httproute.go
rename to pkg/ingress/gateway/translation/gateway_httproute.go
index 31a176d4..3868bafb 100644
--- a/pkg/kube/translation/gateway_httproute.go
+++ b/pkg/ingress/gateway/translation/gateway_httproute.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
//
-package translation
+package gateway_translation
import (
"fmt"
@@ -26,12 +26,14 @@ import (
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
"github.com/apache/apisix-ingress-controller/pkg/id"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
+ "github.com/apache/apisix-ingress-controller/pkg/kube/translation"
"github.com/apache/apisix-ingress-controller/pkg/log"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
-func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*translation.TranslateContext, error) {
+ ctx := translation.DefaultEmptyTranslateContext()
var hosts []string
for _, hostname := range httpRoute.Spec.Hostnames {
@@ -82,7 +84,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha
continue
}
- ups, err := t.TranslateUpstream(ns, string(backend.Name), "", int32(*backend.Port))
+ ups, err := t.KubeTranslator.TranslateUpstream(ns, string(backend.Name), "", int32(*backend.Port))
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j))
}
@@ -90,12 +92,12 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha
// APISIX limits max length of label value
// https://github.com/apache/apisix/blob/5b95b85faea3094d5e466ee2d39a52f1f805abbb/apisix/schema_def.lua#L85
- ups.Labels["meta_namespace"] = truncate(ns, 64)
- ups.Labels["meta_backend"] = truncate(string(backend.Name), 64)
+ ups.Labels["meta_namespace"] = utils.TruncateString(ns, 64)
+ ups.Labels["meta_backend"] = utils.TruncateString(string(backend.Name), 64)
ups.Labels["meta_port"] = fmt.Sprintf("%v", int32(*backend.Port))
ups.ID = id.GenID(name)
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
ruleUpstreams = append(ruleUpstreams, ups)
if backend.Weight == nil {
@@ -154,7 +156,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha
}
}
- ctx.addRoute(route)
+ ctx.AddRoute(route)
}
//TODO: Support filters
diff --git a/pkg/kube/translation/gateway_httproute_test.go b/pkg/ingress/gateway/translation/gateway_httproute_test.go
similarity index 97%
rename from pkg/kube/translation/gateway_httproute_test.go
rename to pkg/ingress/gateway/translation/gateway_httproute_test.go
index 9b8d98f2..4f26c454 100644
--- a/pkg/kube/translation/gateway_httproute_test.go
+++ b/pkg/ingress/gateway/translation/gateway_httproute_test.go
@@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package translation
+package gateway_translation
import (
"context"
@@ -30,6 +30,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/kube"
fakeapisix "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/fake"
apisixinformers "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
+ "github.com/apache/apisix-ingress-controller/pkg/kube/translation"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -102,9 +103,11 @@ func mockHTTPRouteTranslator(t *testing.T) (*translator, <-chan struct{}) {
tr := &translator{
&TranslatorOptions{
- EndpointLister: epLister,
- ServiceLister: svcLister,
- ApisixUpstreamLister: apisixInformersFactory.Apisix().V2beta3().ApisixUpstreams().Lister(),
+ KubeTranslator: translation.NewTranslator(&translation.TranslatorOptions{
+ EndpointLister: epLister,
+ ServiceLister: svcLister,
+ ApisixUpstreamLister: apisixInformersFactory.Apisix().V2beta3().ApisixUpstreams().Lister(),
+ }),
},
}
diff --git a/pkg/ingress/gateway/translation/translator.go b/pkg/ingress/gateway/translation/translator.go
new file mode 100644
index 00000000..3a8a5fc3
--- /dev/null
+++ b/pkg/ingress/gateway/translation/translator.go
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package gateway_translation
+
+import (
+ gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+
+ "github.com/apache/apisix-ingress-controller/pkg/kube/translation"
+)
+
+// TranslatorOptions carries the dependencies of the Gateway API translator.
+type TranslatorOptions struct {
+ // KubeTranslator is the shared translator used to translate backend
+ // references into upstreams (via TranslateUpstream).
+ KubeTranslator translation.Translator
+}
+
+// translator is the Translator implementation; it embeds its options so
+// KubeTranslator is reachable directly on the value.
+type translator struct {
+ *TranslatorOptions
+}
+
+// Translator translates Gateway API resources into APISIX resources.
+type Translator interface {
+ // TranslateGatewayHTTPRouteV1Alpha2 translates Gateway API HTTPRoute to APISIX resources
+ TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*translation.TranslateContext, error)
+}
+
+// NewTranslator returns a Gateway API Translator backed by the given options.
+func NewTranslator(opts *TranslatorOptions) Translator {
+	t := new(translator)
+	t.TranslatorOptions = opts
+	return t
+}
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index b99c800c..8e5c0d1e 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -25,6 +25,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
@@ -143,17 +144,17 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("pluginConfigs", tctx.PluginConfigs),
)
- m := &manifest{
- ssl: tctx.SSL,
- routes: tctx.Routes,
- upstreams: tctx.Upstreams,
- pluginConfigs: tctx.PluginConfigs,
+ m := &utils.Manifest{
+ SSLs: tctx.SSL,
+ Routes: tctx.Routes,
+ Upstreams: tctx.Upstreams,
+ PluginConfigs: tctx.PluginConfigs,
}
var (
- added *manifest
- updated *manifest
- deleted *manifest
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
)
if ev.Type == types.EventDelete {
@@ -170,13 +171,13 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
)
return err
}
- om := &manifest{
- routes: oldCtx.Routes,
- upstreams: oldCtx.Upstreams,
- ssl: oldCtx.SSL,
- pluginConfigs: oldCtx.PluginConfigs,
+ om := &utils.Manifest{
+ Routes: oldCtx.Routes,
+ Upstreams: oldCtx.Upstreams,
+ SSLs: oldCtx.SSL,
+ PluginConfigs: oldCtx.PluginConfigs,
}
- added, updated, deleted = m.diff(om)
+ added, updated, deleted = m.Diff(om)
}
if err := c.controller.syncManifests(ctx, added, updated, deleted); err != nil {
log.Errorw("failed to sync ingress artifacts",
diff --git a/pkg/ingress/namespace.go b/pkg/ingress/namespace/namespace.go
similarity index 84%
rename from pkg/ingress/namespace.go
rename to pkg/ingress/namespace/namespace.go
index 1e4a566a..a00acbec 100644
--- a/pkg/ingress/namespace.go
+++ b/pkg/ingress/namespace/namespace.go
@@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package ingress
+package namespace
import (
"context"
@@ -22,7 +22,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -30,13 +29,21 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/types"
)
+// EventHandler declares the add/update/delete callbacks of a resource handler.
+//
+// FIXME: Controller should be the core part;
+// a Provider should act as an "EventHandler" and register its functions with the Controller.
+type EventHandler interface {
+ OnAdd()
+ OnUpdate()
+ OnDelete()
+}
+
type namespaceController struct {
- controller *Controller
+ controller *watchingProvider
workqueue workqueue.RateLimitingInterface
workers int
}
-func (c *Controller) newNamespaceController() *namespaceController {
+func newNamespaceController(c *watchingProvider) *namespaceController {
ctl := &namespaceController{
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Namespace"),
@@ -52,25 +59,6 @@ func (c *Controller) newNamespaceController() *namespaceController {
return ctl
}
-func (c *Controller) initWatchingNamespacesByLabels(ctx context.Context) error {
- labelSelector := metav1.LabelSelector{MatchLabels: c.watchingLabels}
- opts := metav1.ListOptions{
- LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
- }
- namespaces, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
- if err != nil {
- return err
- }
- var nss []string
-
- for _, ns := range namespaces.Items {
- nss = append(nss, ns.Name)
- c.watchingNamespaces.Store(ns.Name, struct{}{})
- }
- log.Infow("label selector watching namespaces", zap.Strings("namespaces", nss))
- return nil
-}
-
func (c *namespaceController) run(ctx context.Context) {
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.namespaceInformer.HasSynced); !ok {
log.Error("namespace informers sync failed")
@@ -99,7 +87,7 @@ func (c *namespaceController) runWorker(ctx context.Context) {
func (c *namespaceController) sync(ctx context.Context, ev *types.Event) error {
if ev.Type != types.EventDelete {
// check the labels of specify namespace
- namespace, err := c.controller.kubeClient.Client.CoreV1().Namespaces().Get(ctx, ev.Object.(string), metav1.GetOptions{})
+ namespace, err := c.controller.kube.Client.CoreV1().Namespaces().Get(ctx, ev.Object.(string), metav1.GetOptions{})
if err != nil {
return err
} else {
diff --git a/pkg/ingress/namespace/provider.go b/pkg/ingress/namespace/provider.go
new file mode 100644
index 00000000..e8f442dd
--- /dev/null
+++ b/pkg/ingress/namespace/provider.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package namespace
+
+import (
+ "context"
+ "strings"
+ "sync"
+
+ "go.uber.org/zap"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ listerscorev1 "k8s.io/client-go/listers/core/v1"
+ "k8s.io/client-go/tools/cache"
+
+ "github.com/apache/apisix-ingress-controller/pkg/api/validation"
+ "github.com/apache/apisix-ingress-controller/pkg/config"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
+)
+
+// WatchingProvider tracks which namespaces the controller is watching.
+type WatchingProvider interface {
+ // Run starts the provider with ctx.
+ Run(ctx context.Context)
+ // IsWatchingNamespace takes a resource key and reports whether the
+ // namespace part of that key is being watched.
+ IsWatchingNamespace(key string) bool
+ // WatchingNamespaces returns the names of the watched namespaces.
+ WatchingNamespaces() []string
+}
+
+// NewWatchingProvider builds the namespace WatchingProvider.
+//
+// The watched set is seeded from cfg.Kubernetes.AppNamespaces unless that
+// list is empty or holds only v1.NamespaceAll. Each entry of
+// cfg.Kubernetes.NamespaceSelector is expected in "key=value" form and
+// contributes one watching label. When neither source yields anything, all
+// namespaces currently present in the cluster are listed and watched.
+// Finally the namespaces matching the watching labels are added.
+//
+// An error is returned when that final label-based namespace listing fails.
+func NewWatchingProvider(ctx context.Context, kube *kube.KubeClient, cfg *config.Config) (WatchingProvider, error) {
+	var (
+		watchingNamespaces = new(sync.Map)
+		watchingLabels     = make(map[string]string)
+	)
+	// Guard the index: an empty AppNamespaces list must not panic and is
+	// treated like "all namespaces" (nothing is stored).
+	appNamespaces := cfg.Kubernetes.AppNamespaces
+	if len(appNamespaces) > 1 || (len(appNamespaces) == 1 && appNamespaces[0] != v1.NamespaceAll) {
+		for _, ns := range appNamespaces {
+			watchingNamespaces.Store(ns, struct{}{})
+		}
+	}
+	// support namespace label-selector
+	for _, selector := range cfg.Kubernetes.NamespaceSelector {
+		labelSlice := strings.Split(selector, "=")
+		if len(labelSlice) < 2 {
+			// A selector without "=" has no value part; indexing it would
+			// panic, so it is reported and skipped instead.
+			log.Errorw("ignored malformed namespace selector, expected key=value",
+				zap.String("selector", selector),
+			)
+			continue
+		}
+		watchingLabels[labelSlice[0]] = labelSlice[1]
+	}
+
+	// watchingNamespaces and watchingLabels are empty means to monitor all namespaces.
+	if !validation.HasValueInSyncMap(watchingNamespaces) && len(watchingLabels) == 0 {
+		opts := metav1.ListOptions{}
+		// list all namespaces
+		nsList, err := kube.Client.CoreV1().Namespaces().List(ctx, opts)
+		if err != nil {
+			// Only logged here: the label-based listing below issues its
+			// own List call and returns the error if it fails as well.
+			log.Error(err.Error())
+		} else {
+			wns := new(sync.Map)
+			for _, v := range nsList.Items {
+				wns.Store(v.Name, struct{}{})
+			}
+			watchingNamespaces = wns
+		}
+	}
+
+	c := &watchingProvider{
+		kube: kube,
+		cfg:  cfg,
+
+		watchingNamespaces: watchingNamespaces,
+		watchingLabels:     watchingLabels,
+	}
+
+	kubeFactory := kube.NewSharedIndexInformerFactory()
+	c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer()
+	c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister()
+
+	c.controller = newNamespaceController(c)
+
+	err := c.initWatchingNamespacesByLabels(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+// watchingProvider is the WatchingProvider implementation backed by a
+// namespace informer and the namespace controller.
+type watchingProvider struct {
+ kube *kube.KubeClient
+ cfg *config.Config
+
+ // watchingNamespaces is used as a set: keys are namespace names, values are empty structs.
+ watchingNamespaces *sync.Map
+ // watchingLabels are the labels used to select namespaces to watch.
+ watchingLabels types.Labels
+
+ namespaceInformer cache.SharedIndexInformer
+ namespaceLister listerscorev1.NamespaceLister
+
+ controller *namespaceController
+}
+
+// initWatchingNamespacesByLabels lists the namespaces selected by the
+// provider's watching labels and adds each of them to the watched set.
+// The listing error, if any, is returned unchanged.
+func (c *watchingProvider) initWatchingNamespacesByLabels(ctx context.Context) error {
+	listOpts := metav1.ListOptions{
+		LabelSelector: labels.Set(c.watchingLabels).String(),
+	}
+	matched, err := c.kube.Client.CoreV1().Namespaces().List(ctx, listOpts)
+	if err != nil {
+		return err
+	}
+
+	var names []string
+	for i := range matched.Items {
+		name := matched.Items[i].Name
+		names = append(names, name)
+		c.watchingNamespaces.Store(name, struct{}{})
+	}
+	log.Infow("label selector watching namespaces", zap.Strings("namespaces", names))
+	return nil
+}
+
+// Run hands the namespace informer and the namespace controller to a
+// parallel executor, in that order, and returns once the executor's Wait
+// returns.
+func (c *watchingProvider) Run(ctx context.Context) {
+	tasks := []func(){
+		func() { c.namespaceInformer.Run(ctx.Done()) },
+		func() { c.controller.run(ctx) },
+	}
+
+	executor := utils.ParallelExecutor{}
+	for _, task := range tasks {
+		executor.Add(task)
+	}
+	executor.Wait()
+}
+
+// WatchingNamespaces returns the names of every namespace currently in the
+// watched set. The result is nil when the set is empty.
+func (c *watchingProvider) WatchingNamespaces() []string {
+	var names []string
+	collect := func(name, _ interface{}) bool {
+		names = append(names, name.(string))
+		// Keep iterating until every entry has been visited.
+		return true
+	}
+	c.watchingNamespaces.Range(collect)
+	return names
+}
+
+// IsWatchingNamespace takes a resource key, extracts its namespace part and
+// reports whether that namespace is in the watched set. When the watched set
+// holds no value at all, every key is accepted. A key that cannot be split
+// is logged and rejected.
+func (c *watchingProvider) IsWatchingNamespace(key string) (ok bool) {
+	if !validation.HasValueInSyncMap(c.watchingNamespaces) {
+		return true
+	}
+
+	ns, _, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		// Ignore resource with invalid key.
+		log.Warnf("resource %s was ignored since: %s", key, err)
+		return false
+	}
+
+	_, ok = c.watchingNamespaces.Load(ns)
+	return ok
+}
diff --git a/pkg/ingress/namespace/provider_mock.go b/pkg/ingress/namespace/provider_mock.go
new file mode 100644
index 00000000..e211d400
--- /dev/null
+++ b/pkg/ingress/namespace/provider_mock.go
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package namespace
+
+import (
+ "context"
+
+ "k8s.io/client-go/tools/cache"
+)
+
+// NewMockWatchingProvider returns a WatchingProvider that treats exactly the
+// given namespaces as watched. The slice is stored as passed, not copied.
+func NewMockWatchingProvider(namespaces []string) WatchingProvider {
+	mock := new(mockWatchingProvider)
+	mock.namespaces = namespaces
+	return mock
+}
+
+// mockWatchingProvider is a WatchingProvider backed by a fixed list of
+// namespace names.
+type mockWatchingProvider struct {
+ // namespaces holds the names reported as watched.
+ namespaces []string
+}
+
+// Run does nothing; the mock has no informer or controller to start.
+func (c *mockWatchingProvider) Run(_ context.Context) {}
+
+// WatchingNamespaces returns the namespace list the mock was created with
+// (the same slice, not a copy).
+func (c *mockWatchingProvider) WatchingNamespaces() []string {
+	watched := c.namespaces
+	return watched
+}
+
+// IsWatchingNamespace splits key into namespace and name and reports
+// whether the namespace part equals one of the mock's namespaces. A key
+// that cannot be split is reported as not watched.
+func (c *mockWatchingProvider) IsWatchingNamespace(key string) (ok bool) {
+	ns, _, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return false
+	}
+
+	for i := range c.namespaces {
+		if c.namespaces[i] == ns {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/ingress/pod_test.go b/pkg/ingress/pod_test.go
index b75886b3..4235703c 100644
--- a/pkg/ingress/pod_test.go
+++ b/pkg/ingress/pod_test.go
@@ -15,7 +15,6 @@
package ingress
import (
- "sync"
"testing"
"time"
@@ -23,18 +22,17 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/namespace"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
func TestPodOnAdd(t *testing.T) {
- watchingNamespace := new(sync.Map)
- watchingNamespace.Store("default", struct{}{})
ctl := &podController{
controller: &Controller{
- watchingNamespaces: watchingNamespace,
- podCache: types.NewPodCache(),
- MetricsCollector: metrics.NewPrometheusCollector(),
+ namespaceProvider: namespace.NewMockWatchingProvider([]string{"default"}),
+ podCache: types.NewPodCache(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
},
}
@@ -70,13 +68,11 @@ func TestPodOnAdd(t *testing.T) {
}
func TestPodOnDelete(t *testing.T) {
- watchingNamespace := new(sync.Map)
- watchingNamespace.Store("default", struct{}{})
ctl := &podController{
controller: &Controller{
- watchingNamespaces: watchingNamespace,
- podCache: types.NewPodCache(),
- MetricsCollector: metrics.NewPrometheusCollector(),
+ namespaceProvider: namespace.NewMockWatchingProvider([]string{"default"}),
+ podCache: types.NewPodCache(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
},
}
@@ -115,13 +111,11 @@ func TestPodOnDelete(t *testing.T) {
}
func TestPodOnUpdate(t *testing.T) {
- watchingNamespace := new(sync.Map)
- watchingNamespace.Store("default", struct{}{})
ctl := &podController{
controller: &Controller{
- watchingNamespaces: watchingNamespace,
- podCache: types.NewPodCache(),
- MetricsCollector: metrics.NewPrometheusCollector(),
+ namespaceProvider: namespace.NewMockWatchingProvider([]string{"default"}),
+ podCache: types.NewPodCache(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
},
}
diff --git a/pkg/ingress/status.go b/pkg/ingress/status.go
index 604227a0..4a1a4975 100644
--- a/pkg/ingress/status.go
+++ b/pkg/ingress/status.go
@@ -17,9 +17,6 @@ package ingress
import (
"context"
- "fmt"
- "net"
- "time"
"go.uber.org/zap"
apiv1 "k8s.io/api/core/v1"
@@ -29,9 +26,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/client-go/tools/cache"
- gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+ "github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -39,9 +35,8 @@ import (
)
const (
- _conditionType = "ResourcesAvailable"
- _commonSuccessMessage = "Sync Successfully"
- _gatewayLBNotReadyMessage = "The LoadBalancer used by the APISIX gateway is not yet ready"
+ _conditionType = "ResourcesAvailable"
+ _commonSuccessMessage = "Sync Successfully"
)
// verifyGeneration verify generation to decide whether to update status
@@ -301,148 +296,13 @@ func (c *Controller) recordStatus(at interface{}, reason string, err error, stat
zap.String("namespace", v.Namespace),
)
}
- case *gatewayv1alpha2.Gateway:
- gatewayCondition := metav1.Condition{
- Type: string(gatewayv1alpha2.ListenerConditionReady),
- Reason: reason,
- Status: status,
- Message: "Gateway's status has been successfully updated",
- ObservedGeneration: generation,
- }
-
- gatewayKubeClient := c.kubeClient.GatewayClient
-
- if v.Status.Conditions == nil {
- conditions := make([]metav1.Condition, 0)
- v.Status.Conditions = conditions
- } else {
- meta.SetStatusCondition(&v.Status.Conditions, gatewayCondition)
- }
-
- lbips, err := c.ingressLBStatusIPs()
- if err != nil {
- log.Errorw("failed to get APISIX gateway external IPs",
- zap.Error(err),
- )
- }
-
- v.Status.Addresses = convLBIPToGatewayAddr(lbips)
- if _, errRecord := gatewayKubeClient.GatewayV1alpha2().Gateways(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
- log.Errorw("failed to record status change for Gateway resource",
- zap.Error(errRecord),
- zap.String("name", v.Name),
- zap.String("namespace", v.Namespace),
- )
- }
default:
// This should not be executed
log.Errorf("unsupported resource record: %s", v)
}
}
-// ingressPublishAddresses get addressed used to expose Ingress
-func (c *Controller) ingressPublishAddresses() ([]string, error) {
- ingressPublishService := c.cfg.IngressPublishService
- ingressStatusAddress := c.cfg.IngressStatusAddress
- addrs := []string{}
-
- // if ingressStatusAddress is specified, it will be used first
- if len(ingressStatusAddress) > 0 {
- addrs = append(addrs, ingressStatusAddress...)
- return addrs, nil
- }
-
- namespace, name, err := cache.SplitMetaNamespaceKey(ingressPublishService)
- if err != nil {
- log.Errorf("invalid ingressPublishService %s: %s", ingressPublishService, err)
- return nil, err
- }
-
- kubeClient := c.kubeClient.Client
- svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
- if err != nil {
- return nil, err
- }
-
- switch svc.Spec.Type {
- case apiv1.ServiceTypeLoadBalancer:
- if len(svc.Status.LoadBalancer.Ingress) < 1 {
- return addrs, fmt.Errorf("%s", _gatewayLBNotReadyMessage)
- }
-
- for _, ip := range svc.Status.LoadBalancer.Ingress {
- if ip.IP == "" {
- // typically AWS load-balancers
- addrs = append(addrs, ip.Hostname)
- } else {
- addrs = append(addrs, ip.IP)
- }
- }
-
- addrs = append(addrs, svc.Spec.ExternalIPs...)
- return addrs, nil
- default:
- return addrs, nil
- }
-
-}
-
// ingressLBStatusIPs organizes the available addresses
func (c *Controller) ingressLBStatusIPs() ([]apiv1.LoadBalancerIngress, error) {
- lbips := []apiv1.LoadBalancerIngress{}
- var ips []string
-
- for {
- var err error
- ips, err = c.ingressPublishAddresses()
- if err != nil {
- if err.Error() == _gatewayLBNotReadyMessage {
- log.Warnf("%s. Provided service: %s", _gatewayLBNotReadyMessage, c.cfg.IngressPublishService)
- time.Sleep(time.Second)
- continue
- }
-
- return nil, err
- }
- break
- }
-
- for _, ip := range ips {
- if net.ParseIP(ip) == nil {
- lbips = append(lbips, apiv1.LoadBalancerIngress{Hostname: ip})
- } else {
- lbips = append(lbips, apiv1.LoadBalancerIngress{IP: ip})
- }
-
- }
-
- return lbips, nil
-}
-
-// convLBIPToGatewayAddr convert LoadBalancerIngress to GatewayAddress format
-func convLBIPToGatewayAddr(lbips []apiv1.LoadBalancerIngress) []gatewayv1alpha2.GatewayAddress {
- gas := []gatewayv1alpha2.GatewayAddress{}
-
- // In the definition, there is also an address type called NamedAddress,
- // which we currently do not implement
- HostnameAddressType := gatewayv1alpha2.HostnameAddressType
- IPAddressType := gatewayv1alpha2.IPAddressType
-
- for _, lbip := range lbips {
- if v := lbip.Hostname; v != "" {
- gas = append(gas, gatewayv1alpha2.GatewayAddress{
- Type: &HostnameAddressType,
- Value: v,
- })
- }
-
- if v := lbip.IP; v != "" {
- gas = append(gas, gatewayv1alpha2.GatewayAddress{
- Type: &IPAddressType,
- Value: v,
- })
- }
- }
-
- return gas
+ return utils.IngressLBStatusIPs(c.cfg.IngressPublishService, c.cfg.IngressStatusAddress, c.kubeClient.Client)
}
diff --git a/pkg/ingress/utils/executor.go b/pkg/ingress/utils/executor.go
new file mode 100644
index 00000000..d4e26461
--- /dev/null
+++ b/pkg/ingress/utils/executor.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package utils
+
+import "sync"
+
+// ParallelExecutor runs handlers concurrently, each in its own goroutine, and
+// collects the errors returned by handlers registered through AddE.
+//
+// The zero value is ready to use. A ParallelExecutor must not be copied after
+// first use because it embeds a sync.WaitGroup and a sync.Mutex.
+type ParallelExecutor struct {
+	wg sync.WaitGroup
+
+	// errorsLock guards errors.
+	errorsLock sync.Mutex
+	errors     []error
+}
+
+// Add starts handler in a new goroutine tracked by Wait.
+func (exec *ParallelExecutor) Add(handler func()) {
+	exec.wg.Add(1)
+	go func() {
+		defer exec.wg.Done()
+		handler()
+	}()
+}
+
+// AddE starts handler in a new goroutine tracked by Wait and records the
+// error it returns, if any, so that it can be retrieved through Errors.
+func (exec *ParallelExecutor) AddE(handler func() error) {
+	exec.wg.Add(1)
+	go func() {
+		defer exec.wg.Done()
+		err := handler()
+		if err == nil {
+			return
+		}
+		exec.errorsLock.Lock()
+		defer exec.errorsLock.Unlock()
+		exec.errors = append(exec.errors, err)
+	}()
+}
+
+// Wait blocks until every handler started by Add or AddE has returned.
+func (exec *ParallelExecutor) Wait() {
+	exec.wg.Wait()
+}
+
+// Errors returns the errors recorded so far by AddE handlers, or nil when
+// there are none.
+//
+// The lock is taken so that calling Errors while handlers are still running
+// does not race with AddE, and a copy is returned so that callers never share
+// the backing array that later appends write into.
+func (exec *ParallelExecutor) Errors() []error {
+	exec.errorsLock.Lock()
+	defer exec.errorsLock.Unlock()
+	if len(exec.errors) == 0 {
+		return nil
+	}
+	snapshot := make([]error, len(exec.errors))
+	copy(snapshot, exec.errors)
+	return snapshot
+}
diff --git a/pkg/ingress/utils/ingress_status.go b/pkg/ingress/utils/ingress_status.go
new file mode 100644
index 00000000..2adb3905
--- /dev/null
+++ b/pkg/ingress/utils/ingress_status.go
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package utils
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "time"
+
+ apiv1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+)
+
+const (
+ // _gatewayLBNotReadyMessage is the error text returned by
+ // IngressPublishAddresses when the LoadBalancer Service has no ingress
+ // entries yet. IngressLBStatusIPs compares it against err.Error() to decide
+ // whether to retry, so the text must stay in sync between the two.
+ _gatewayLBNotReadyMessage = "The LoadBalancer used by the APISIX gateway is not yet ready"
+)
+
+// IngressPublishAddresses returns the addresses used to expose Ingress.
+//
+// When ingressStatusAddress is non-empty it is returned as-is and the Service
+// is never consulted. Otherwise ingressPublishService is parsed as a
+// "namespace/name" key and the Service is fetched through kubeClient: for a
+// LoadBalancer Service the result is the IP (or, when the IP is empty, the
+// hostname) of every load-balancer ingress entry followed by the Service's
+// external IPs; for any other Service type the result is empty.
+//
+// An error carrying _gatewayLBNotReadyMessage is returned while a LoadBalancer
+// Service has no ingress entries yet.
+func IngressPublishAddresses(ingressPublishService string, ingressStatusAddress []string, kubeClient kubernetes.Interface) ([]string, error) {
+	// Explicitly configured status addresses take precedence.
+	if len(ingressStatusAddress) > 0 {
+		return append([]string{}, ingressStatusAddress...), nil
+	}
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(ingressPublishService)
+	if err != nil {
+		log.Errorf("invalid ingressPublishService %s: %s", ingressPublishService, err)
+		return nil, err
+	}
+
+	svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	addrs := []string{}
+	if svc.Spec.Type != apiv1.ServiceTypeLoadBalancer {
+		return addrs, nil
+	}
+
+	lbEntries := svc.Status.LoadBalancer.Ingress
+	if len(lbEntries) < 1 {
+		return addrs, fmt.Errorf("%s", _gatewayLBNotReadyMessage)
+	}
+
+	for _, entry := range lbEntries {
+		if entry.IP != "" {
+			addrs = append(addrs, entry.IP)
+			continue
+		}
+		// typically AWS load-balancers
+		addrs = append(addrs, entry.Hostname)
+	}
+
+	return append(addrs, svc.Spec.ExternalIPs...), nil
+}
+
+// IngressLBStatusIPs resolves the publish addresses through
+// IngressPublishAddresses and converts each one into a LoadBalancerIngress:
+// values that parse as an IP fill the IP field, everything else fills the
+// Hostname field.
+//
+// While the lookup fails with _gatewayLBNotReadyMessage the call logs a
+// warning, sleeps for one second and tries again; any other error is returned
+// immediately.
+func IngressLBStatusIPs(ingressPublishService string, ingressStatusAddress []string, kubeClient kubernetes.Interface) ([]apiv1.LoadBalancerIngress, error) {
+	var (
+		addrs []string
+		err   error
+	)
+	for {
+		addrs, err = IngressPublishAddresses(ingressPublishService, ingressStatusAddress, kubeClient)
+		if err == nil {
+			break
+		}
+		if err.Error() != _gatewayLBNotReadyMessage {
+			return nil, err
+		}
+		// The LoadBalancer has not been provisioned yet: wait and retry.
+		log.Warnf("%s. Provided service: %s", _gatewayLBNotReadyMessage, ingressPublishService)
+		time.Sleep(time.Second)
+	}
+
+	lbips := []apiv1.LoadBalancerIngress{}
+	for _, addr := range addrs {
+		var entry apiv1.LoadBalancerIngress
+		if net.ParseIP(addr) != nil {
+			entry.IP = addr
+		} else {
+			entry.Hostname = addr
+		}
+		lbips = append(lbips, entry)
+	}
+
+	return lbips, nil
+}
diff --git a/pkg/ingress/manifest.go b/pkg/ingress/utils/manifest.go
similarity index 63%
rename from pkg/ingress/manifest.go
rename to pkg/ingress/utils/manifest.go
index bf4edfa6..841248ea 100644
--- a/pkg/ingress/manifest.go
+++ b/pkg/ingress/utils/manifest.go
@@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package ingress
+package utils
import (
"context"
@@ -21,12 +21,13 @@ import (
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
+ "github.com/apache/apisix-ingress-controller/pkg/apisix"
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
-func diffSSL(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ssl) {
+func DiffSSL(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ssl) {
if olds == nil {
return news, nil, nil
}
@@ -58,7 +59,7 @@ func diffSSL(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ss
return
}
-func diffRoutes(olds, news []*apisixv1.Route) (added, updated, deleted []*apisixv1.Route) {
+func DiffRoutes(olds, news []*apisixv1.Route) (added, updated, deleted []*apisixv1.Route) {
if olds == nil {
return news, nil, nil
}
@@ -90,7 +91,7 @@ func diffRoutes(olds, news []*apisixv1.Route) (added, updated, deleted []*apisix
return
}
-func diffUpstreams(olds, news []*apisixv1.Upstream) (added, updated, deleted []*apisixv1.Upstream) {
+func DiffUpstreams(olds, news []*apisixv1.Upstream) (added, updated, deleted []*apisixv1.Upstream) {
oldMap := make(map[string]*apisixv1.Upstream, len(olds))
newMap := make(map[string]*apisixv1.Upstream, len(news))
for _, u := range olds {
@@ -115,7 +116,7 @@ func diffUpstreams(olds, news []*apisixv1.Upstream) (added, updated, deleted []*
return
}
-func diffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, deleted []*apisixv1.StreamRoute) {
+func DiffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, deleted []*apisixv1.StreamRoute) {
oldMap := make(map[string]*apisixv1.StreamRoute, len(olds))
newMap := make(map[string]*apisixv1.StreamRoute, len(news))
for _, sr := range olds {
@@ -140,7 +141,7 @@ func diffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, delet
return
}
-func diffPluginConfigs(olds, news []*apisixv1.PluginConfig) (added, updated, deleted []*apisixv1.PluginConfig) {
+func DiffPluginConfigs(olds, news []*apisixv1.PluginConfig) (added, updated, deleted []*apisixv1.PluginConfig) {
oldMap := make(map[string]*apisixv1.PluginConfig, len(olds))
newMap := make(map[string]*apisixv1.PluginConfig, len(news))
for _, sr := range olds {
@@ -165,73 +166,72 @@ func diffPluginConfigs(olds, news []*apisixv1.PluginConfig) (added, updated, del
return
}
-type manifest struct {
- routes []*apisixv1.Route
- upstreams []*apisixv1.Upstream
- streamRoutes []*apisixv1.StreamRoute
- ssl []*apisixv1.Ssl
- pluginConfigs []*apisixv1.PluginConfig
+type Manifest struct {
+ Routes []*apisixv1.Route
+ Upstreams []*apisixv1.Upstream
+ StreamRoutes []*apisixv1.StreamRoute
+ SSLs []*apisixv1.Ssl
+ PluginConfigs []*apisixv1.PluginConfig
}
-func (m *manifest) diff(om *manifest) (added, updated, deleted *manifest) {
- sa, su, sd := diffSSL(om.ssl, m.ssl)
- ar, ur, dr := diffRoutes(om.routes, m.routes)
- au, uu, du := diffUpstreams(om.upstreams, m.upstreams)
- asr, usr, dsr := diffStreamRoutes(om.streamRoutes, m.streamRoutes)
- apc, upc, dpc := diffPluginConfigs(om.pluginConfigs, m.pluginConfigs)
+func (m *Manifest) Diff(om *Manifest) (added, updated, deleted *Manifest) {
+ sa, su, sd := DiffSSL(om.SSLs, m.SSLs)
+ ar, ur, dr := DiffRoutes(om.Routes, m.Routes)
+ au, uu, du := DiffUpstreams(om.Upstreams, m.Upstreams)
+ asr, usr, dsr := DiffStreamRoutes(om.StreamRoutes, m.StreamRoutes)
+ apc, upc, dpc := DiffPluginConfigs(om.PluginConfigs, m.PluginConfigs)
if ar != nil || au != nil || asr != nil || sa != nil || apc != nil {
- added = &manifest{
- routes: ar,
- upstreams: au,
- streamRoutes: asr,
- ssl: sa,
- pluginConfigs: apc,
+ added = &Manifest{
+ Routes: ar,
+ Upstreams: au,
+ StreamRoutes: asr,
+ SSLs: sa,
+ PluginConfigs: apc,
}
}
if ur != nil || uu != nil || usr != nil || su != nil || upc != nil {
- updated = &manifest{
- routes: ur,
- upstreams: uu,
- streamRoutes: usr,
- ssl: su,
- pluginConfigs: upc,
+ updated = &Manifest{
+ Routes: ur,
+ Upstreams: uu,
+ StreamRoutes: usr,
+ SSLs: su,
+ PluginConfigs: upc,
}
}
if dr != nil || du != nil || dsr != nil || sd != nil || dpc != nil {
- deleted = &manifest{
- routes: dr,
- upstreams: du,
- streamRoutes: dsr,
- ssl: sd,
- pluginConfigs: dpc,
+ deleted = &Manifest{
+ Routes: dr,
+ Upstreams: du,
+ StreamRoutes: dsr,
+ SSLs: sd,
+ PluginConfigs: dpc,
}
}
return
}
-func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted *manifest) error {
+func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string, added, updated, deleted *Manifest) error {
var merr *multierror.Error
- clusterName := c.cfg.APISIX.DefaultClusterName
if deleted != nil {
- for _, ssl := range deleted.ssl {
- if err := c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil {
+ for _, ssl := range deleted.SSLs {
+ if err := apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, r := range deleted.routes {
- if err := c.apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
+ for _, r := range deleted.Routes {
+ if err := apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, sr := range deleted.streamRoutes {
- if err := c.apisix.Cluster(clusterName).StreamRoute().Delete(ctx, sr); err != nil {
+ for _, sr := range deleted.StreamRoutes {
+ if err := apisix.Cluster(clusterName).StreamRoute().Delete(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, u := range deleted.upstreams {
- if err := c.apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil {
+ for _, u := range deleted.Upstreams {
+ if err := apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil {
// Upstream might be referenced by other routes.
if err != cache.ErrStillInUse {
merr = multierror.Append(merr, err)
@@ -243,8 +243,8 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
}
}
}
- for _, pc := range deleted.pluginConfigs {
- if err := c.apisix.Cluster(clusterName).PluginConfig().Delete(ctx, pc); err != nil {
+ for _, pc := range deleted.PluginConfigs {
+ if err := apisix.Cluster(clusterName).PluginConfig().Delete(ctx, pc); err != nil {
// pluginConfig might be referenced by other routes.
if err != cache.ErrStillInUse {
merr = multierror.Append(merr, err)
@@ -259,55 +259,55 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
}
if added != nil {
// Should create upstreams firstly due to the dependencies.
- for _, ssl := range added.ssl {
- if _, err := c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil {
+ for _, ssl := range added.SSLs {
+ if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, u := range added.upstreams {
- if _, err := c.apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
+ for _, u := range added.Upstreams {
+ if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, pc := range added.pluginConfigs {
- if _, err := c.apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil {
+ for _, pc := range added.PluginConfigs {
+ if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, r := range added.routes {
- if _, err := c.apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
+ for _, r := range added.Routes {
+ if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, sr := range added.streamRoutes {
- if _, err := c.apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
+ for _, sr := range added.StreamRoutes {
+ if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if updated != nil {
- for _, ssl := range updated.ssl {
- if _, err := c.apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil {
+ for _, ssl := range updated.SSLs {
+ if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, r := range updated.upstreams {
- if _, err := c.apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
+ for _, r := range updated.Upstreams {
+ if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, pc := range updated.pluginConfigs {
- if _, err := c.apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil {
+ for _, pc := range updated.PluginConfigs {
+ if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, r := range updated.routes {
- if _, err := c.apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
+ for _, r := range updated.Routes {
+ if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
- for _, sr := range updated.streamRoutes {
- if _, err := c.apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
+ for _, sr := range updated.StreamRoutes {
+ if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
diff --git a/pkg/ingress/manifest_test.go b/pkg/ingress/utils/manifest_test.go
similarity index 80%
rename from pkg/ingress/manifest_test.go
rename to pkg/ingress/utils/manifest_test.go
index 9bad3349..0085af8e 100644
--- a/pkg/ingress/manifest_test.go
+++ b/pkg/ingress/utils/manifest_test.go
@@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package ingress
+package utils
import (
"testing"
@@ -36,7 +36,7 @@ func TestDiffRoutes(t *testing.T) {
Methods: []string{"POST"},
},
}
- added, updated, deleted := diffRoutes(nil, news)
+ added, updated, deleted := DiffRoutes(nil, news)
assert.Nil(t, updated)
assert.Nil(t, deleted)
assert.Len(t, added, 2)
@@ -57,7 +57,7 @@ func TestDiffRoutes(t *testing.T) {
Methods: []string{"POST", "PUT"},
},
}
- added, updated, deleted = diffRoutes(olds, nil)
+ added, updated, deleted = DiffRoutes(olds, nil)
assert.Nil(t, updated)
assert.Nil(t, added)
assert.Len(t, deleted, 2)
@@ -65,7 +65,7 @@ func TestDiffRoutes(t *testing.T) {
assert.Equal(t, "3", deleted[1].ID)
assert.Equal(t, []string{"POST", "PUT"}, deleted[1].Methods)
- added, updated, deleted = diffRoutes(olds, news)
+ added, updated, deleted = DiffRoutes(olds, news)
assert.Len(t, added, 1)
assert.Equal(t, "1", added[0].ID)
assert.Len(t, updated, 1)
@@ -85,7 +85,7 @@ func TestDiffStreamRoutes(t *testing.T) {
ServerPort: 8080,
},
}
- added, updated, deleted := diffStreamRoutes(nil, news)
+ added, updated, deleted := DiffStreamRoutes(nil, news)
assert.Nil(t, updated)
assert.Nil(t, deleted)
assert.Len(t, added, 2)
@@ -102,7 +102,7 @@ func TestDiffStreamRoutes(t *testing.T) {
ServerPort: 8081,
},
}
- added, updated, deleted = diffStreamRoutes(olds, nil)
+ added, updated, deleted = DiffStreamRoutes(olds, nil)
assert.Nil(t, updated)
assert.Nil(t, added)
assert.Len(t, deleted, 2)
@@ -110,7 +110,7 @@ func TestDiffStreamRoutes(t *testing.T) {
assert.Equal(t, "3", deleted[1].ID)
assert.Equal(t, int32(8081), deleted[1].ServerPort)
- added, updated, deleted = diffStreamRoutes(olds, news)
+ added, updated, deleted = DiffStreamRoutes(olds, news)
assert.Len(t, added, 1)
assert.Equal(t, "1", added[0].ID)
assert.Len(t, updated, 1)
@@ -135,7 +135,7 @@ func TestDiffUpstreams(t *testing.T) {
Retries: &retries,
},
}
- added, updated, deleted := diffUpstreams(nil, news)
+ added, updated, deleted := DiffUpstreams(nil, news)
assert.Nil(t, updated)
assert.Nil(t, deleted)
assert.Len(t, added, 2)
@@ -160,7 +160,7 @@ func TestDiffUpstreams(t *testing.T) {
},
},
}
- added, updated, deleted = diffUpstreams(olds, nil)
+ added, updated, deleted = DiffUpstreams(olds, nil)
assert.Nil(t, updated)
assert.Nil(t, added)
assert.Len(t, deleted, 2)
@@ -169,7 +169,7 @@ func TestDiffUpstreams(t *testing.T) {
assert.Equal(t, 5, *deleted[1].Retries)
assert.Equal(t, 10, deleted[1].Timeout.Connect)
- added, updated, deleted = diffUpstreams(olds, news)
+ added, updated, deleted = DiffUpstreams(olds, news)
assert.Len(t, added, 1)
assert.Equal(t, "1", added[0].ID)
assert.Len(t, updated, 1)
@@ -196,7 +196,7 @@ func TestDiffPluginConfigs(t *testing.T) {
},
},
}
- added, updated, deleted := diffPluginConfigs(nil, news)
+ added, updated, deleted := DiffPluginConfigs(nil, news)
assert.Nil(t, updated)
assert.Nil(t, deleted)
assert.Len(t, added, 2)
@@ -225,7 +225,7 @@ func TestDiffPluginConfigs(t *testing.T) {
},
},
}
- added, updated, deleted = diffPluginConfigs(olds, nil)
+ added, updated, deleted = DiffPluginConfigs(olds, nil)
assert.Nil(t, updated)
assert.Nil(t, added)
assert.Len(t, deleted, 2)
@@ -233,7 +233,7 @@ func TestDiffPluginConfigs(t *testing.T) {
assert.Equal(t, "3", deleted[1].ID)
assert.Equal(t, olds[1].Plugins, deleted[1].Plugins)
- added, updated, deleted = diffPluginConfigs(olds, news)
+ added, updated, deleted = DiffPluginConfigs(olds, news)
assert.Len(t, added, 1)
assert.Equal(t, "1", added[0].ID)
assert.Len(t, updated, 1)
@@ -245,8 +245,8 @@ func TestDiffPluginConfigs(t *testing.T) {
func TestManifestDiff(t *testing.T) {
retries := 2
- m := &manifest{
- routes: []*apisixv1.Route{
+ m := &Manifest{
+ Routes: []*apisixv1.Route{
{
Metadata: apisixv1.Metadata{
ID: "1",
@@ -259,7 +259,7 @@ func TestManifestDiff(t *testing.T) {
Methods: []string{"GET"},
},
},
- upstreams: []*apisixv1.Upstream{
+ Upstreams: []*apisixv1.Upstream{
{
Metadata: apisixv1.Metadata{
ID: "4",
@@ -267,7 +267,7 @@ func TestManifestDiff(t *testing.T) {
Retries: &retries,
},
},
- pluginConfigs: []*apisixv1.PluginConfig{
+ PluginConfigs: []*apisixv1.PluginConfig{
{
Metadata: apisixv1.Metadata{
ID: "5",
@@ -284,8 +284,8 @@ func TestManifestDiff(t *testing.T) {
},
},
}
- om := &manifest{
- routes: []*apisixv1.Route{
+ om := &Manifest{
+ Routes: []*apisixv1.Route{
{
Metadata: apisixv1.Metadata{
ID: "2",
@@ -300,22 +300,22 @@ func TestManifestDiff(t *testing.T) {
},
}
- added, updated, deleted := m.diff(om)
- assert.Len(t, added.routes, 1)
- assert.Equal(t, "1", added.routes[0].ID)
- assert.Len(t, added.upstreams, 1)
- assert.Equal(t, "4", added.upstreams[0].ID)
- assert.Len(t, added.pluginConfigs, 1)
- assert.Equal(t, "5", added.pluginConfigs[0].ID)
+ added, updated, deleted := m.Diff(om)
+ assert.Len(t, added.Routes, 1)
+ assert.Equal(t, "1", added.Routes[0].ID)
+ assert.Len(t, added.Upstreams, 1)
+ assert.Equal(t, "4", added.Upstreams[0].ID)
+ assert.Len(t, added.PluginConfigs, 1)
+ assert.Equal(t, "5", added.PluginConfigs[0].ID)
- assert.Len(t, updated.routes, 1)
- assert.Equal(t, "3", updated.routes[0].ID)
- assert.Equal(t, []string{"GET"}, updated.routes[0].Methods)
- assert.Nil(t, updated.upstreams)
- assert.Nil(t, updated.pluginConfigs)
+ assert.Len(t, updated.Routes, 1)
+ assert.Equal(t, "3", updated.Routes[0].ID)
+ assert.Equal(t, []string{"GET"}, updated.Routes[0].Methods)
+ assert.Nil(t, updated.Upstreams)
+ assert.Nil(t, updated.PluginConfigs)
- assert.Len(t, deleted.routes, 1)
- assert.Equal(t, "2", deleted.routes[0].ID)
- assert.Nil(t, updated.upstreams)
- assert.Nil(t, updated.pluginConfigs)
+ assert.Len(t, deleted.Routes, 1)
+ assert.Equal(t, "2", deleted.Routes[0].ID)
+ assert.Nil(t, updated.Upstreams)
+ assert.Nil(t, updated.PluginConfigs)
}
diff --git a/pkg/ingress/utils/string.go b/pkg/ingress/utils/string.go
new file mode 100644
index 00000000..0e8c883d
--- /dev/null
+++ b/pkg/ingress/utils/string.go
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package utils
+
+// TruncateString returns s cut down to at most max bytes.
+//
+// s is returned unchanged when max is negative or not smaller than len(s).
+// The cut never lands inside a multi-byte UTF-8 sequence: when the byte at
+// index max is a continuation byte, the cut moves back to the start of that
+// rune, so the result may be up to three bytes shorter than max. If no rune
+// start is found within three bytes (s is not valid UTF-8 around the cut),
+// the plain byte cut at max is used.
+func TruncateString(s string, max int) string {
+	if max < 0 || max >= len(s) {
+		return s
+	}
+
+	// A UTF-8 continuation byte has the bit pattern 10xxxxxx, and a rune has at
+	// most three of them, so stepping back three bytes is always enough to
+	// reach the leading byte of a valid rune.
+	cut := max
+	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
+		cut--
+	}
+	if s[cut]&0xC0 == 0x80 {
+		cut = max
+	}
+	return s[:cut]
+}
diff --git a/pkg/kube/translation/apisix_pluginconfig.go b/pkg/kube/translation/apisix_pluginconfig.go
index 14ce0467..4bf35510 100644
--- a/pkg/kube/translation/apisix_pluginconfig.go
+++ b/pkg/kube/translation/apisix_pluginconfig.go
@@ -25,7 +25,7 @@ import (
)
func (t *translator) TranslatePluginConfigV2beta3(config *configv2beta3.ApisixPluginConfig) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
pluginMap := make(apisixv1.Plugins)
if len(config.Spec.Plugins) > 0 {
for _, plugin := range config.Spec.Plugins {
@@ -51,21 +51,21 @@ func (t *translator) TranslatePluginConfigV2beta3(config *configv2beta3.ApisixPl
pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name)
pc.ID = id.GenID(pc.Name)
pc.Plugins = pluginMap
- ctx.addPluginConfig(pc)
+ ctx.AddPluginConfig(pc)
return ctx, nil
}
func (t *translator) TranslatePluginConfigV2beta3NotStrictly(config *configv2beta3.ApisixPluginConfig) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
pc := apisixv1.NewDefaultPluginConfig()
pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name)
pc.ID = id.GenID(pc.Name)
- ctx.addPluginConfig(pc)
+ ctx.AddPluginConfig(pc)
return ctx, nil
}
func (t *translator) TranslatePluginConfigV2(config *configv2.ApisixPluginConfig) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
pluginMap := make(apisixv1.Plugins)
if len(config.Spec.Plugins) > 0 {
for _, plugin := range config.Spec.Plugins {
@@ -91,15 +91,15 @@ func (t *translator) TranslatePluginConfigV2(config *configv2.ApisixPluginConfig
pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name)
pc.ID = id.GenID(pc.Name)
pc.Plugins = pluginMap
- ctx.addPluginConfig(pc)
+ ctx.AddPluginConfig(pc)
return ctx, nil
}
func (t *translator) TranslatePluginConfigV2NotStrictly(config *configv2.ApisixPluginConfig) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
pc := apisixv1.NewDefaultPluginConfig()
pc.Name = apisixv1.ComposePluginConfigName(config.Namespace, config.Name)
pc.ID = id.GenID(pc.Name)
- ctx.addPluginConfig(pc)
+ ctx.AddPluginConfig(pc)
return ctx, nil
}
diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go
index 5dafd842..7bdfbedc 100644
--- a/pkg/kube/translation/apisix_route.go
+++ b/pkg/kube/translation/apisix_route.go
@@ -30,7 +30,7 @@ import (
)
func (t *translator) TranslateRouteV2beta2(ar *configv2beta2.ApisixRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
if err := t.translateHTTPRouteV2beta2(ctx, ar); err != nil {
return nil, err
@@ -42,7 +42,7 @@ func (t *translator) TranslateRouteV2beta2(ar *configv2beta2.ApisixRoute) (*Tran
}
func (t *translator) TranslateRouteV2beta2NotStrictly(ar *configv2beta2.ApisixRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
if err := t.translateHTTPRouteV2beta2NotStrictly(ctx, ar); err != nil {
return nil, err
@@ -54,7 +54,7 @@ func (t *translator) TranslateRouteV2beta2NotStrictly(ar *configv2beta2.ApisixRo
}
func (t *translator) TranslateRouteV2beta3(ar *configv2beta3.ApisixRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
if err := t.translateHTTPRouteV2beta3(ctx, ar); err != nil {
return nil, err
@@ -66,7 +66,7 @@ func (t *translator) TranslateRouteV2beta3(ar *configv2beta3.ApisixRoute) (*Tran
}
func (t *translator) TranslateRouteV2beta3NotStrictly(ar *configv2beta3.ApisixRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
if err := t.translateHTTPRouteV2beta3NotStrictly(ctx, ar); err != nil {
return nil, err
@@ -78,7 +78,7 @@ func (t *translator) TranslateRouteV2beta3NotStrictly(ar *configv2beta3.ApisixRo
}
func (t *translator) TranslateRouteV2(ar *configv2.ApisixRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
if err := t.translateHTTPRouteV2(ctx, ar); err != nil {
return nil, err
@@ -90,7 +90,7 @@ func (t *translator) TranslateRouteV2(ar *configv2.ApisixRoute) (*TranslateConte
}
func (t *translator) TranslateRouteV2NotStrictly(ar *configv2.ApisixRoute) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
if err := t.translateHTTPRouteV2NotStrictly(ctx, ar); err != nil {
return nil, err
@@ -200,13 +200,13 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *TranslateContext, ar *config
}
route.Plugins["traffic-split"] = plugin
}
- ctx.addRoute(route)
- if !ctx.checkUpstreamExist(upstreamName) {
+ ctx.AddRoute(route)
+ if !ctx.CheckUpstreamExist(upstreamName) {
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -336,13 +336,13 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *TranslateContext, ar *config
}
route.Plugins["traffic-split"] = plugin
}
- ctx.addRoute(route)
- if !ctx.checkUpstreamExist(upstreamName) {
+ ctx.AddRoute(route)
+ if !ctx.CheckUpstreamExist(upstreamName) {
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -472,13 +472,13 @@ func (t *translator) translateHTTPRouteV2(ctx *TranslateContext, ar *configv2.Ap
}
route.Plugins["traffic-split"] = plugin
}
- ctx.addRoute(route)
- if !ctx.checkUpstreamExist(upstreamName) {
+ ctx.AddRoute(route)
+ if !ctx.CheckUpstreamExist(upstreamName) {
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -595,13 +595,13 @@ func (t *translator) translateHTTPRouteV2beta2NotStrictly(ctx *TranslateContext,
route := apisixv1.NewDefaultRoute()
route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
route.ID = id.GenID(route.Name)
- ctx.addRoute(route)
- if !ctx.checkUpstreamExist(upstreamName) {
+ ctx.AddRoute(route)
+ if !ctx.CheckUpstreamExist(upstreamName) {
ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal)
if err != nil {
return err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -654,13 +654,13 @@ func (t *translator) translateHTTPRouteV2beta3NotStrictly(ctx *TranslateContext,
route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName))
}
- ctx.addRoute(route)
- if !ctx.checkUpstreamExist(upstreamName) {
+ ctx.AddRoute(route)
+ if !ctx.CheckUpstreamExist(upstreamName) {
ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal)
if err != nil {
return err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -713,13 +713,13 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *TranslateContext, ar *
route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName))
}
- ctx.addRoute(route)
- if !ctx.checkUpstreamExist(upstreamName) {
+ ctx.AddRoute(route)
+ if !ctx.CheckUpstreamExist(upstreamName) {
ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal)
if err != nil {
return err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -751,9 +751,9 @@ func (t *translator) translateStreamRouteV2beta2(ctx *TranslateContext, ar *conf
return err
}
sr.UpstreamId = ups.ID
- ctx.addStreamRoute(sr)
- if !ctx.checkUpstreamExist(ups.Name) {
- ctx.addUpstream(ups)
+ ctx.AddStreamRoute(sr)
+ if !ctx.CheckUpstreamExist(ups.Name) {
+ ctx.AddUpstream(ups)
}
}
@@ -786,9 +786,9 @@ func (t *translator) translateStreamRouteV2beta3(ctx *TranslateContext, ar *conf
return err
}
sr.UpstreamId = ups.ID
- ctx.addStreamRoute(sr)
- if !ctx.checkUpstreamExist(ups.Name) {
- ctx.addUpstream(ups)
+ ctx.AddStreamRoute(sr)
+ if !ctx.CheckUpstreamExist(ups.Name) {
+ ctx.AddUpstream(ups)
}
}
@@ -821,9 +821,9 @@ func (t *translator) translateStreamRouteV2(ctx *TranslateContext, ar *configv2.
return err
}
sr.UpstreamId = ups.ID
- ctx.addStreamRoute(sr)
- if !ctx.checkUpstreamExist(ups.Name) {
- ctx.addUpstream(ups)
+ ctx.AddStreamRoute(sr)
+ if !ctx.CheckUpstreamExist(ups.Name) {
+ ctx.AddUpstream(ups)
}
}
@@ -843,9 +843,9 @@ func (t *translator) translateStreamRouteNotStrictlyV2beta2(ctx *TranslateContex
return err
}
sr.UpstreamId = ups.ID
- ctx.addStreamRoute(sr)
- if !ctx.checkUpstreamExist(ups.Name) {
- ctx.addUpstream(ups)
+ ctx.AddStreamRoute(sr)
+ if !ctx.CheckUpstreamExist(ups.Name) {
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -864,9 +864,9 @@ func (t *translator) translateStreamRouteNotStrictlyV2beta3(ctx *TranslateContex
return err
}
sr.UpstreamId = ups.ID
- ctx.addStreamRoute(sr)
- if !ctx.checkUpstreamExist(ups.Name) {
- ctx.addUpstream(ups)
+ ctx.AddStreamRoute(sr)
+ if !ctx.CheckUpstreamExist(ups.Name) {
+ ctx.AddUpstream(ups)
}
}
return nil
@@ -885,9 +885,9 @@ func (t *translator) translateStreamRouteNotStrictlyV2(ctx *TranslateContext, ar
return err
}
sr.UpstreamId = ups.ID
- ctx.addStreamRoute(sr)
- if !ctx.checkUpstreamExist(ups.Name) {
- ctx.addUpstream(ups)
+ ctx.AddStreamRoute(sr)
+ if !ctx.CheckUpstreamExist(ups.Name) {
+ ctx.AddUpstream(ups)
}
}
return nil
diff --git a/pkg/kube/translation/context.go b/pkg/kube/translation/context.go
index dc67ef74..10aafb62 100644
--- a/pkg/kube/translation/context.go
+++ b/pkg/kube/translation/context.go
@@ -26,25 +26,25 @@ type TranslateContext struct {
PluginConfigs []*apisix.PluginConfig
}
-func defaultEmptyTranslateContext() *TranslateContext {
+func DefaultEmptyTranslateContext() *TranslateContext {
return &TranslateContext{
upstreamMap: make(map[string]struct{}),
}
}
-func (tc *TranslateContext) addRoute(r *apisix.Route) {
+func (tc *TranslateContext) AddRoute(r *apisix.Route) {
tc.Routes = append(tc.Routes, r)
}
-func (tc *TranslateContext) addSSL(ssl *apisix.Ssl) {
+func (tc *TranslateContext) AddSSL(ssl *apisix.Ssl) {
tc.SSL = append(tc.SSL, ssl)
}
-func (tc *TranslateContext) addStreamRoute(sr *apisix.StreamRoute) {
+func (tc *TranslateContext) AddStreamRoute(sr *apisix.StreamRoute) {
tc.StreamRoutes = append(tc.StreamRoutes, sr)
}
-func (tc *TranslateContext) addUpstream(u *apisix.Upstream) {
+func (tc *TranslateContext) AddUpstream(u *apisix.Upstream) {
if _, ok := tc.upstreamMap[u.Name]; ok {
return
}
@@ -52,11 +52,11 @@ func (tc *TranslateContext) addUpstream(u *apisix.Upstream) {
tc.Upstreams = append(tc.Upstreams, u)
}
-func (tc *TranslateContext) checkUpstreamExist(name string) (ok bool) {
+func (tc *TranslateContext) CheckUpstreamExist(name string) (ok bool) {
_, ok = tc.upstreamMap[name]
return
}
-func (tc *TranslateContext) addPluginConfig(pc *apisix.PluginConfig) {
+func (tc *TranslateContext) AddPluginConfig(pc *apisix.PluginConfig) {
tc.PluginConfigs = append(tc.PluginConfigs, pc)
}
diff --git a/pkg/kube/translation/context_test.go b/pkg/kube/translation/context_test.go
index 78cabc5f..045fa8de 100644
--- a/pkg/kube/translation/context_test.go
+++ b/pkg/kube/translation/context_test.go
@@ -23,7 +23,7 @@ import (
)
func TestTranslateContext(t *testing.T) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
r1 := &apisix.Route{
Metadata: apisix.Metadata{
@@ -65,14 +65,14 @@ func TestTranslateContext(t *testing.T) {
Name: "aaa",
},
}
- ctx.addRoute(r1)
- ctx.addRoute(r2)
- ctx.addStreamRoute(sr1)
- ctx.addStreamRoute(sr2)
- ctx.addUpstream(u1)
- ctx.addUpstream(u2)
- ctx.addPluginConfig(pc1)
- ctx.addPluginConfig(pc2)
+ ctx.AddRoute(r1)
+ ctx.AddRoute(r2)
+ ctx.AddStreamRoute(sr1)
+ ctx.AddStreamRoute(sr2)
+ ctx.AddUpstream(u1)
+ ctx.AddUpstream(u2)
+ ctx.AddPluginConfig(pc1)
+ ctx.AddPluginConfig(pc2)
assert.Len(t, ctx.Routes, 2)
assert.Len(t, ctx.StreamRoutes, 2)
@@ -87,6 +87,6 @@ func TestTranslateContext(t *testing.T) {
assert.Equal(t, pc1, ctx.PluginConfigs[0])
assert.Equal(t, pc2, ctx.PluginConfigs[1])
- assert.Equal(t, true, ctx.checkUpstreamExist("aaa"))
- assert.Equal(t, false, ctx.checkUpstreamExist("bbb"))
+ assert.Equal(t, true, ctx.CheckUpstreamExist("aaa"))
+ assert.Equal(t, false, ctx.CheckUpstreamExist("bbb"))
}
diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go
index 3724981b..39a4e452 100644
--- a/pkg/kube/translation/ingress.go
+++ b/pkg/kube/translation/ingress.go
@@ -40,7 +40,7 @@ const (
)
func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
plugins := t.translateAnnotations(ing.Annotations)
annoExtractor := annotations.NewExtractor(ing.Annotations)
useRegex := annoExtractor.GetBoolAnnotation(annotations.AnnotationsPrefix + "use-regex")
@@ -72,7 +72,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateCo
)
return nil, err
}
- ctx.addSSL(ssl)
+ ctx.AddSSL(ssl)
}
for _, rule := range ing.Spec.Rules {
for _, pathRule := range rule.HTTP.Paths {
@@ -90,7 +90,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateCo
)
return nil, err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
uris := []string{pathRule.Path}
var nginxVars []kubev2.ApisixRouteHTTPMatchExpr
@@ -143,21 +143,21 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*TranslateCo
pluginConfig.Name = composeIngressPluginName(ing.Namespace, pathRule.Backend.Service.Name)
pluginConfig.ID = id.GenID(route.Name)
pluginConfig.Plugins = *(plugins.DeepCopy())
- ctx.addPluginConfig(pluginConfig)
+ ctx.AddPluginConfig(pluginConfig)
route.PluginConfigId = pluginConfig.ID
}
if ups != nil {
route.UpstreamId = ups.ID
}
- ctx.addRoute(route)
+ ctx.AddRoute(route)
}
}
return ctx, nil
}
func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
plugins := t.translateAnnotations(ing.Annotations)
annoExtractor := annotations.NewExtractor(ing.Annotations)
useRegex := annoExtractor.GetBoolAnnotation(annotations.AnnotationsPrefix + "use-regex")
@@ -189,7 +189,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*T
)
return nil, err
}
- ctx.addSSL(ssl)
+ ctx.AddSSL(ssl)
}
for _, rule := range ing.Spec.Rules {
for _, pathRule := range rule.HTTP.Paths {
@@ -207,7 +207,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*T
)
return nil, err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
uris := []string{pathRule.Path}
var nginxVars []kubev2.ApisixRouteHTTPMatchExpr
@@ -260,14 +260,14 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*T
pluginConfig.Name = composeIngressPluginName(ing.Namespace, pathRule.Backend.ServiceName)
pluginConfig.ID = id.GenID(route.Name)
pluginConfig.Plugins = *(plugins.DeepCopy())
- ctx.addPluginConfig(pluginConfig)
+ ctx.AddPluginConfig(pluginConfig)
route.PluginConfigId = pluginConfig.ID
}
if ups != nil {
route.UpstreamId = ups.ID
}
- ctx.addRoute(route)
+ ctx.AddRoute(route)
}
}
return ctx, nil
@@ -305,7 +305,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n
}
func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress) (*TranslateContext, error) {
- ctx := defaultEmptyTranslateContext()
+ ctx := DefaultEmptyTranslateContext()
plugins := t.translateAnnotations(ing.Annotations)
annoExtractor := annotations.NewExtractor(ing.Annotations)
useRegex := annoExtractor.GetBoolAnnotation(annotations.AnnotationsPrefix + "use-regex")
@@ -327,7 +327,7 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In
)
return nil, err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
}
uris := []string{pathRule.Path}
var nginxVars []kubev2.ApisixRouteHTTPMatchExpr
@@ -380,14 +380,14 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In
pluginConfig.Name = composeIngressPluginName(ing.Namespace, pathRule.Backend.ServiceName)
pluginConfig.ID = id.GenID(route.Name)
pluginConfig.Plugins = *(plugins.DeepCopy())
- ctx.addPluginConfig(pluginConfig)
+ ctx.AddPluginConfig(pluginConfig)
route.PluginConfigId = pluginConfig.ID
}
if ups != nil {
route.UpstreamId = ups.ID
}
- ctx.addRoute(route)
+ ctx.AddRoute(route)
}
}
return ctx, nil
diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go
index 0340a2d5..f9236ae8 100644
--- a/pkg/kube/translation/plugin.go
+++ b/pkg/kube/translation/plugin.go
@@ -53,7 +53,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ns strin
if err != nil {
return nil, err
}
- ctx.addUpstream(ups)
+ ctx.AddUpstream(ups)
weight := _defaultWeight
if backend.Weight != nil {
diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go
index 77641449..59b3b1bd 100644
--- a/pkg/kube/translation/translator.go
+++ b/pkg/kube/translation/translator.go
@@ -21,7 +21,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
listerscorev1 "k8s.io/client-go/listers/core/v1"
- gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
"github.com/apache/apisix-ingress-controller/pkg/kube"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
@@ -117,8 +116,6 @@ type Translator interface {
// ExtractKeyPair extracts certificate and private key pair from secret
// Supports APISIX style ("cert" and "key") and Kube style ("tls.crt" and "tls.key)
ExtractKeyPair(s *corev1.Secret, hasPrivateKey bool) ([]byte, []byte, error)
- // TranslateGatewayHTTPRouteV1Alpha2 translates Gateway API HTTPRoute to APISIX resources
- TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha2.HTTPRoute) (*TranslateContext, error)
}
// TranslatorOptions contains options to help Translator
diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go
index cab6d405..2ce379b8 100644
--- a/pkg/kube/translation/util.go
+++ b/pkg/kube/translation/util.go
@@ -261,10 +261,3 @@ func validateRemoteAddrs(remoteAddrs []string) error {
}
return nil
}
-
-func truncate(s string, max int) string {
- if max > len(s) || max < 0 {
- return s
- }
- return s[:max]
-}