You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/05/06 10:14:38 UTC
[apisix-ingress-controller] branch master updated: chore: optimize
the apisix cluster processing (#414)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new a3279e4 chore: optimize the apisix cluster processing (#414)
a3279e4 is described below
commit a3279e4028643799d0d3e5b0e3298f5366435f51
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Thu May 6 18:14:32 2021 +0800
chore: optimize the apisix cluster processing (#414)
* chore: optimize the apisix cluster processing
* fix
---
cmd/ingress/ingress.go | 7 +++++--
conf/config-default.yaml | 15 +++++++++++++--
pkg/apisix/apisix.go | 26 ++++++++++++++++++++++++++
pkg/config/config.go | 26 ++++++++++++++++++++++++--
pkg/config/config_test.go | 14 ++++++++------
pkg/ingress/apisix_upstream.go | 6 ++++--
pkg/ingress/controller.go | 15 ++++++++-------
pkg/ingress/manifest.go | 14 ++++++++------
8 files changed, 96 insertions(+), 27 deletions(-)
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index f90b50f..df931ef 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -147,8 +147,11 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"")
- cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api")
- cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api")
+ cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)")
+ cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)")
+ cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
+ cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, "default-apisix-cluster-admin-key", "", "admin key used for the authorization of admin api / manager api for the default APISIX cluster")
+ cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, "default-apisix-cluster-name", "default", "name of the default apisix cluster")
return cmd
}
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index e255b18..78c4f5d 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -43,7 +43,7 @@ kubernetes:
election_id: "ingress-apisix-leader" # the election id for the controller leader campaign,
# only the leader will watch and deliver resource changes,
# other instances (as candidates) stand by.
- ingress_class: "apisix" # The class of an Ingress object is set using the field
+ ingress_class: "apisix" # the class of an Ingress object is set using the field
# IngressClassName in Kubernetes clusters version v1.18.0
# or higher or the annotation "kubernetes.io/ingress.class"
# (deprecated).
@@ -56,5 +56,16 @@ kubernetes:
# default is "apisix.apache.org/v2alpha1".
# APISIX related configurations.
apisix:
- base_url: "http://127.0.0.1:9080/apisix/admin" # the APISIX admin api / manager api
+ base_url: "http://127.0.0.1:9080/apisix/admin" # (Deprecated, use default_cluster_base_url) the APISIX admin api / manager api
# base url, it's required.
+
+ default_cluster_base_url: "http://127.0.0.1:9080/apisix/admin" # The base url of admin api / manager api
+ # of the default APISIX cluster
+
+ admin_key: "" # (Deprecated, use default_cluster_admin_key) the admin key used for the authentication of
+ # admin api / manager api in the default APISIX cluster, by default this field is unset.
+
+ default_cluster_admin_key: "" # the admin key used for the authentication of admin api / manager api in the
+ # default APISIX cluster, by default this field is unset.
+
+ default_cluster_name: "default" # name of the default APISIX cluster.
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index e407b7e..e6c346c 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -16,6 +16,7 @@ package apisix
import (
"context"
+ "sync"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -26,6 +27,8 @@ type APISIX interface {
Cluster(string) Cluster
// AddCluster adds a new cluster.
AddCluster(*ClusterOptions) error
+ // UpdateCluster updates an existing cluster.
+ UpdateCluster(*ClusterOptions) error
// ListClusters lists all APISIX clusters.
ListClusters() []Cluster
}
@@ -98,6 +101,7 @@ type GlobalRule interface {
}
type apisix struct {
+ mu sync.RWMutex
nonExistentCluster Cluster
clusters map[string]Cluster
}
@@ -112,6 +116,8 @@ func NewClient() (APISIX, error) {
// Cluster implements APISIX.Cluster method.
func (c *apisix) Cluster(name string) Cluster {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
cluster, ok := c.clusters[name]
if !ok {
return c.nonExistentCluster
@@ -121,6 +127,8 @@ func (c *apisix) Cluster(name string) Cluster {
// ListClusters implements APISIX.ListClusters method.
func (c *apisix) ListClusters() []Cluster {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
clusters := make([]Cluster, 0, len(c.clusters))
for _, cluster := range c.clusters {
clusters = append(clusters, cluster)
@@ -130,6 +138,8 @@ func (c *apisix) ListClusters() []Cluster {
// AddCluster implements APISIX.AddCluster method.
func (c *apisix) AddCluster(co *ClusterOptions) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
_, ok := c.clusters[co.Name]
if ok {
return ErrDuplicatedCluster
@@ -144,3 +154,19 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
c.clusters[co.Name] = cluster
return nil
}
+
+func (c *apisix) UpdateCluster(co *ClusterOptions) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.clusters[co.Name]; !ok {
+ return ErrClusterNotExist
+ }
+
+ cluster, err := newCluster(co)
+ if err != nil {
+ return err
+ }
+
+ c.clusters[co.Name] = cluster
+ return nil
+}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 72043be..aa7f681 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -78,8 +78,20 @@ type KubernetesConfig struct {
// APISIXConfig contains all APISIX related config items.
type APISIXConfig struct {
- BaseURL string `json:"base_url" yaml:"base_url"`
+ // DefaultClusterName is the name of default cluster.
+ DefaultClusterName string `json:"default_cluster_name"`
+ // DefaultClusterBaseURL is the base url configuration for the default cluster.
+ DefaultClusterBaseURL string `json:"default_cluster_base_url" yaml:"default_cluster_base_url"`
+ // DefaultClusterAdminKey is the admin key for the default cluster.
// TODO: Obsolete the plain way to specify admin_key, which is insecure.
+ DefaultClusterAdminKey string `json:"default_cluster_admin_key" yaml:"default_cluster_admin_key"`
+ // BaseURL is the same as DefaultClusterBaseURL.
+ // Deprecated: use DefaultClusterBaseURL instead. BaseURL will be removed
+ // once v1.0.0 is released.
+ BaseURL string `json:"base_url" yaml:"base_url"`
+ // AdminKey is the same as DefaultClusterAdminKey.
+ // Deprecated: use DefaultClusterAdminKey instead. AdminKey will be removed
+ // once v1.0.0 is released.
AdminKey string `json:"admin_key" yaml:"admin_key"`
}
@@ -130,7 +142,17 @@ func (cfg *Config) Validate() error {
if cfg.Kubernetes.ResyncInterval.Duration < _minimalResyncInterval {
return errors.New("controller resync interval too small")
}
- if cfg.APISIX.BaseURL == "" {
+ if cfg.APISIX.DefaultClusterAdminKey == "" {
+ cfg.APISIX.DefaultClusterAdminKey = cfg.APISIX.AdminKey
+ }
+ if cfg.APISIX.DefaultClusterBaseURL == "" {
+ cfg.APISIX.DefaultClusterBaseURL = cfg.APISIX.BaseURL
+ }
+ if cfg.APISIX.DefaultClusterName == "" {
+ cfg.APISIX.DefaultClusterName = "default"
+ }
+
+ if cfg.APISIX.DefaultClusterBaseURL == "" {
return errors.New("apisix base url is required")
}
switch cfg.Kubernetes.IngressVersion {
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 6c814f7..93be369 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -42,8 +42,9 @@ func TestNewConfigFromFile(t *testing.T) {
ApisixRouteVersion: ApisixRouteV2alpha1,
},
APISIX: APISIXConfig{
- BaseURL: "http://127.0.0.1:8080/apisix",
- AdminKey: "123456",
+ DefaultClusterName: "default",
+ DefaultClusterBaseURL: "http://127.0.0.1:8080/apisix",
+ DefaultClusterAdminKey: "123456",
},
}
@@ -80,8 +81,8 @@ kubernetes:
ingress_class: apisix
ingress_version: networking/v1
apisix:
- base_url: http://127.0.0.1:8080/apisix
- admin_key: "123456"
+ default_cluster_base_url: http://127.0.0.1:8080/apisix
+ default_cluster_admin_key: "123456"
`
tmpYAML, err := ioutil.TempFile("/tmp", "config-*.yaml")
assert.Nil(t, err, "failed to create temporary yaml configuration file: ", err)
@@ -101,7 +102,7 @@ apisix:
func TestConfigDefaultValue(t *testing.T) {
yamlData := `
apisix:
- base_url: http://127.0.0.1:8080/apisix
+ default_cluster_base_url: http://127.0.0.1:8080/apisix
`
tmpYAML, err := ioutil.TempFile("/tmp", "config-*.yaml")
assert.Nil(t, err, "failed to create temporary yaml configuration file: ", err)
@@ -116,7 +117,8 @@ apisix:
assert.Nil(t, newCfg.Validate(), "failed to validate config")
defaultCfg := NewDefaultConfig()
- defaultCfg.APISIX.BaseURL = "http://127.0.0.1:8080/apisix"
+ defaultCfg.APISIX.DefaultClusterBaseURL = "http://127.0.0.1:8080/apisix"
+ defaultCfg.APISIX.DefaultClusterName = "default"
assert.Equal(t, defaultCfg, newCfg, "bad configuration")
}
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index 634b60d..9c9d8eb 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -128,10 +128,11 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
return err
}
+ clusterName := c.controller.cfg.APISIX.DefaultClusterName
for _, port := range svc.Spec.Ports {
upsName := apisixv1.ComposeUpstreamName(namespace, name, port.Port)
// TODO: multiple cluster
- ups, err := c.controller.apisix.Cluster("").Upstream().Get(ctx, upsName)
+ ups, err := c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName)
if err != nil {
if err == apisixcache.ErrNotFound {
continue
@@ -169,11 +170,12 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Any("upstream", newUps),
zap.Any("ApisixUpstream", au),
)
- if _, err := c.controller.apisix.Cluster("").Upstream().Update(ctx, newUps); err != nil {
+ if _, err := c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil {
log.Errorw("failed to update upstream",
zap.Error(err),
zap.Any("upstream", newUps),
zap.Any("ApisixUpstream", au),
+ zap.String("cluster", clusterName),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 89876e2..7b2c9f9 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -318,9 +318,9 @@ func (c *Controller) run(ctx context.Context) {
c.metricsCollector.ResetLeader(true)
err := c.apisix.AddCluster(&apisix.ClusterOptions{
- Name: "",
- AdminKey: c.cfg.APISIX.AdminKey,
- BaseURL: c.cfg.APISIX.BaseURL,
+ Name: c.cfg.APISIX.DefaultClusterName,
+ AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
+ BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
})
if err != nil && err != apisix.ErrDuplicatedCluster {
// TODO give up the leader role.
@@ -328,7 +328,7 @@ func (c *Controller) run(ctx context.Context) {
return
}
- if err := c.apisix.Cluster("").HasSynced(ctx); err != nil {
+ if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
// TODO give up the leader role.
log.Errorf("failed to wait the default cluster to be ready: %s", err)
return
@@ -400,12 +400,13 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
var (
err error
)
+ clusterName := c.cfg.APISIX.DefaultClusterName
if event == types.EventDelete {
- err = c.apisix.Cluster("").SSL().Delete(ctx, ssl)
+ err = c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl)
} else if event == types.EventUpdate {
- _, err = c.apisix.Cluster("").SSL().Update(ctx, ssl)
+ _, err = c.apisix.Cluster(clusterName).SSL().Update(ctx, ssl)
} else {
- _, err = c.apisix.Cluster("").SSL().Create(ctx, ssl)
+ _, err = c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl)
}
return err
}
diff --git a/pkg/ingress/manifest.go b/pkg/ingress/manifest.go
index 25901d1..8e3034c 100644
--- a/pkg/ingress/manifest.go
+++ b/pkg/ingress/manifest.go
@@ -111,14 +111,16 @@ func (m *manifest) diff(om *manifest) (added, updated, deleted *manifest) {
func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted *manifest) error {
var merr *multierror.Error
+
+ clusterName := c.cfg.APISIX.DefaultClusterName
if deleted != nil {
for _, r := range deleted.routes {
- if err := c.apisix.Cluster("").Route().Delete(ctx, r); err != nil {
+ if err := c.apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, u := range deleted.upstreams {
- if err := c.apisix.Cluster("").Upstream().Delete(ctx, u); err != nil {
+ if err := c.apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil {
merr = multierror.Append(merr, err)
}
}
@@ -126,24 +128,24 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
if added != nil {
// Should create upstreams firstly due to the dependencies.
for _, u := range added.upstreams {
- if _, err := c.apisix.Cluster("").Upstream().Create(ctx, u); err != nil {
+ if _, err := c.apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range added.routes {
- if _, err := c.apisix.Cluster("").Route().Create(ctx, r); err != nil {
+ if _, err := c.apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if updated != nil {
for _, r := range updated.upstreams {
- if _, err := c.apisix.Cluster("").Upstream().Update(ctx, r); err != nil {
+ if _, err := c.apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range updated.routes {
- if _, err := c.apisix.Cluster("").Route().Update(ctx, r); err != nil {
+ if _, err := c.apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}