You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/05/06 10:14:38 UTC

[apisix-ingress-controller] branch master updated: chore: optimize the apisix cluster processing (#414)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new a3279e4  chore: optimize the apisix cluster processing (#414)
a3279e4 is described below

commit a3279e4028643799d0d3e5b0e3298f5366435f51
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Thu May 6 18:14:32 2021 +0800

    chore: optimize the apisix cluster processing (#414)
    
    * chore: optimize the apisix cluster processing
    
    * fix
---
 cmd/ingress/ingress.go         |  7 +++++--
 conf/config-default.yaml       | 15 +++++++++++++--
 pkg/apisix/apisix.go           | 26 ++++++++++++++++++++++++++
 pkg/config/config.go           | 26 ++++++++++++++++++++++++--
 pkg/config/config_test.go      | 14 ++++++++------
 pkg/ingress/apisix_upstream.go |  6 ++++--
 pkg/ingress/controller.go      | 15 ++++++++-------
 pkg/ingress/manifest.go        | 14 ++++++++------
 8 files changed, 96 insertions(+), 27 deletions(-)

diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index f90b50f..df931ef 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -147,8 +147,11 @@ the apisix cluster and others are created`,
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
 	cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"")
-	cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api")
-	cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api")
+	cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)")
+	cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)")
+	cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
+	cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, "default-apisix-cluster-admin-key", "", "admin key used for the authorization of admin api / manager api for the default APISIX cluster")
+	cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, "default-apisix-cluster-name", "default", "name of the default apisix cluster")
 
 	return cmd
 }
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index e255b18..78c4f5d 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -43,7 +43,7 @@ kubernetes:
   election_id: "ingress-apisix-leader" # the election id for the controller leader campaign,
                                        # only the leader will watch and delivery resource changes,
                                        # other instances (as candidates) stand by.
-  ingress_class: "apisix"              # The class of an Ingress object is set using the field
+  ingress_class: "apisix"              # the class of an Ingress object is set using the field
                                        # IngressClassName in Kubernetes clusters version v1.18.0
                                        # or higher or the annotation "kubernetes.io/ingress.class"
                                        # (deprecated).
@@ -56,5 +56,16 @@ kubernetes:
                                                      # default is "apisix.apache.org/v2alpha1".
 # APISIX related configurations.
 apisix:
-  base_url: "http://127.0.0.1:9080/apisix/admin" # the APISIX admin api / manager api
+  base_url: "http://127.0.0.1:9080/apisix/admin" # (Deprecated, use default_cluster_base_url) the APISIX admin api / manager api
                                                  # base url, it's required.
+
+  default_cluster_base_url: "http://127.0.0.1:9080/apisix/admin" # The base url of admin api / manager api
+                                                                 # of the default APISIX cluster
+
+  admin_key: "" # (Deprecated, use default_cluster_admin_key) the admin key used for the authentication of
+                # admin api / manager api in the default APISIX cluster, by default this field is unset.
+
+  default_cluster_admin_key: "" # the admin key used for the authentication of admin api / manager api in the
+                                # default APISIX cluster, by default this field is unset.
+
+  default_cluster_name: "default" # name of the default APISIX cluster.
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index e407b7e..e6c346c 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -16,6 +16,7 @@ package apisix
 
 import (
 	"context"
+	"sync"
 
 	v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
@@ -26,6 +27,8 @@ type APISIX interface {
 	Cluster(string) Cluster
 	// AddCluster adds a new cluster.
 	AddCluster(*ClusterOptions) error
+	// UpdateCluster updates an existing cluster.
+	UpdateCluster(*ClusterOptions) error
 	// ListClusters lists all APISIX clusters.
 	ListClusters() []Cluster
 }
@@ -98,6 +101,7 @@ type GlobalRule interface {
 }
 
 type apisix struct {
+	mu                 sync.RWMutex
 	nonExistentCluster Cluster
 	clusters           map[string]Cluster
 }
@@ -112,6 +116,8 @@ func NewClient() (APISIX, error) {
 
 // Cluster implements APISIX.Cluster method.
 func (c *apisix) Cluster(name string) Cluster {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	cluster, ok := c.clusters[name]
 	if !ok {
 		return c.nonExistentCluster
@@ -121,6 +127,8 @@ func (c *apisix) Cluster(name string) Cluster {
 
 // ListClusters implements APISIX.ListClusters method.
 func (c *apisix) ListClusters() []Cluster {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	clusters := make([]Cluster, 0, len(c.clusters))
 	for _, cluster := range c.clusters {
 		clusters = append(clusters, cluster)
@@ -130,6 +138,8 @@ func (c *apisix) ListClusters() []Cluster {
 
 // AddCluster implements APISIX.AddCluster method.
 func (c *apisix) AddCluster(co *ClusterOptions) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	_, ok := c.clusters[co.Name]
 	if ok {
 		return ErrDuplicatedCluster
@@ -144,3 +154,19 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
 	c.clusters[co.Name] = cluster
 	return nil
 }
+
+func (c *apisix) UpdateCluster(co *ClusterOptions) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if _, ok := c.clusters[co.Name]; !ok {
+		return ErrClusterNotExist
+	}
+
+	cluster, err := newCluster(co)
+	if err != nil {
+		return err
+	}
+
+	c.clusters[co.Name] = cluster
+	return nil
+}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 72043be..aa7f681 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -78,8 +78,20 @@ type KubernetesConfig struct {
 
 // APISIXConfig contains all APISIX related config items.
 type APISIXConfig struct {
-	BaseURL string `json:"base_url" yaml:"base_url"`
+	// DefaultClusterName is the name of default cluster.
+	DefaultClusterName string `json:"default_cluster_name"`
+	// DefaultClusterBaseURL is the base url configuration for the default cluster.
+	DefaultClusterBaseURL string `json:"default_cluster_base_url" yaml:"default_cluster_base_url"`
+	// DefaultClusterAdminKey is the admin key for the default cluster.
 	// TODO: Obsolete the plain way to specify admin_key, which is insecure.
+	DefaultClusterAdminKey string `json:"default_cluster_admin_key" yaml:"default_cluster_admin_key"`
+	// BaseURL is same to DefaultClusterBaseURL.
+	// Deprecated: use DefaultClusterBaseURL instead. BaseURL will be removed
+	// once v1.0.0 is released.
+	BaseURL string `json:"base_url" yaml:"base_url"`
+	// AdminKey is same to DefaultClusterAdminKey.
+	// Deprecated: use DefaultClusterAdminKey instead. AdminKey will be removed
+	//	// once v1.0.0 is released.
 	AdminKey string `json:"admin_key" yaml:"admin_key"`
 }
 
@@ -130,7 +142,17 @@ func (cfg *Config) Validate() error {
 	if cfg.Kubernetes.ResyncInterval.Duration < _minimalResyncInterval {
 		return errors.New("controller resync interval too small")
 	}
-	if cfg.APISIX.BaseURL == "" {
+	if cfg.APISIX.DefaultClusterAdminKey == "" {
+		cfg.APISIX.DefaultClusterAdminKey = cfg.APISIX.AdminKey
+	}
+	if cfg.APISIX.DefaultClusterBaseURL == "" {
+		cfg.APISIX.DefaultClusterBaseURL = cfg.APISIX.BaseURL
+	}
+	if cfg.APISIX.DefaultClusterName == "" {
+		cfg.APISIX.DefaultClusterName = "default"
+	}
+
+	if cfg.APISIX.DefaultClusterBaseURL == "" {
 		return errors.New("apisix base url is required")
 	}
 	switch cfg.Kubernetes.IngressVersion {
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 6c814f7..93be369 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -42,8 +42,9 @@ func TestNewConfigFromFile(t *testing.T) {
 			ApisixRouteVersion: ApisixRouteV2alpha1,
 		},
 		APISIX: APISIXConfig{
-			BaseURL:  "http://127.0.0.1:8080/apisix",
-			AdminKey: "123456",
+			DefaultClusterName:     "default",
+			DefaultClusterBaseURL:  "http://127.0.0.1:8080/apisix",
+			DefaultClusterAdminKey: "123456",
 		},
 	}
 
@@ -80,8 +81,8 @@ kubernetes:
   ingress_class: apisix
   ingress_version: networking/v1
 apisix:
-  base_url: http://127.0.0.1:8080/apisix
-  admin_key: "123456"
+  default_cluster_base_url: http://127.0.0.1:8080/apisix
+  default_cluster_admin_key: "123456"
 `
 	tmpYAML, err := ioutil.TempFile("/tmp", "config-*.yaml")
 	assert.Nil(t, err, "failed to create temporary yaml configuration file: ", err)
@@ -101,7 +102,7 @@ apisix:
 func TestConfigDefaultValue(t *testing.T) {
 	yamlData := `
 apisix:
-  base_url: http://127.0.0.1:8080/apisix
+  default_cluster_base_url: http://127.0.0.1:8080/apisix
 `
 	tmpYAML, err := ioutil.TempFile("/tmp", "config-*.yaml")
 	assert.Nil(t, err, "failed to create temporary yaml configuration file: ", err)
@@ -116,7 +117,8 @@ apisix:
 	assert.Nil(t, newCfg.Validate(), "failed to validate config")
 
 	defaultCfg := NewDefaultConfig()
-	defaultCfg.APISIX.BaseURL = "http://127.0.0.1:8080/apisix"
+	defaultCfg.APISIX.DefaultClusterBaseURL = "http://127.0.0.1:8080/apisix"
+	defaultCfg.APISIX.DefaultClusterName = "default"
 
 	assert.Equal(t, defaultCfg, newCfg, "bad configuration")
 }
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index 634b60d..9c9d8eb 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -128,10 +128,11 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
 		return err
 	}
 
+	clusterName := c.controller.cfg.APISIX.DefaultClusterName
 	for _, port := range svc.Spec.Ports {
 		upsName := apisixv1.ComposeUpstreamName(namespace, name, port.Port)
 		// TODO: multiple cluster
-		ups, err := c.controller.apisix.Cluster("").Upstream().Get(ctx, upsName)
+		ups, err := c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName)
 		if err != nil {
 			if err == apisixcache.ErrNotFound {
 				continue
@@ -169,11 +170,12 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
 			zap.Any("upstream", newUps),
 			zap.Any("ApisixUpstream", au),
 		)
-		if _, err := c.controller.apisix.Cluster("").Upstream().Update(ctx, newUps); err != nil {
+		if _, err := c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil {
 			log.Errorw("failed to update upstream",
 				zap.Error(err),
 				zap.Any("upstream", newUps),
 				zap.Any("ApisixUpstream", au),
+				zap.String("cluster", clusterName),
 			)
 			c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
 			recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 89876e2..7b2c9f9 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -318,9 +318,9 @@ func (c *Controller) run(ctx context.Context) {
 	c.metricsCollector.ResetLeader(true)
 
 	err := c.apisix.AddCluster(&apisix.ClusterOptions{
-		Name:     "",
-		AdminKey: c.cfg.APISIX.AdminKey,
-		BaseURL:  c.cfg.APISIX.BaseURL,
+		Name:     c.cfg.APISIX.DefaultClusterName,
+		AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
+		BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
 	})
 	if err != nil && err != apisix.ErrDuplicatedCluster {
 		// TODO give up the leader role.
@@ -328,7 +328,7 @@ func (c *Controller) run(ctx context.Context) {
 		return
 	}
 
-	if err := c.apisix.Cluster("").HasSynced(ctx); err != nil {
+	if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
 		// TODO give up the leader role.
 		log.Errorf("failed to wait the default cluster to be ready: %s", err)
 		return
@@ -400,12 +400,13 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
 	var (
 		err error
 	)
+	clusterName := c.cfg.APISIX.DefaultClusterName
 	if event == types.EventDelete {
-		err = c.apisix.Cluster("").SSL().Delete(ctx, ssl)
+		err = c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl)
 	} else if event == types.EventUpdate {
-		_, err = c.apisix.Cluster("").SSL().Update(ctx, ssl)
+		_, err = c.apisix.Cluster(clusterName).SSL().Update(ctx, ssl)
 	} else {
-		_, err = c.apisix.Cluster("").SSL().Create(ctx, ssl)
+		_, err = c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl)
 	}
 	return err
 }
diff --git a/pkg/ingress/manifest.go b/pkg/ingress/manifest.go
index 25901d1..8e3034c 100644
--- a/pkg/ingress/manifest.go
+++ b/pkg/ingress/manifest.go
@@ -111,14 +111,16 @@ func (m *manifest) diff(om *manifest) (added, updated, deleted *manifest) {
 
 func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted *manifest) error {
 	var merr *multierror.Error
+
+	clusterName := c.cfg.APISIX.DefaultClusterName
 	if deleted != nil {
 		for _, r := range deleted.routes {
-			if err := c.apisix.Cluster("").Route().Delete(ctx, r); err != nil {
+			if err := c.apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
 				merr = multierror.Append(merr, err)
 			}
 		}
 		for _, u := range deleted.upstreams {
-			if err := c.apisix.Cluster("").Upstream().Delete(ctx, u); err != nil {
+			if err := c.apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil {
 				merr = multierror.Append(merr, err)
 			}
 		}
@@ -126,24 +128,24 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
 	if added != nil {
 		// Should create upstreams firstly due to the dependencies.
 		for _, u := range added.upstreams {
-			if _, err := c.apisix.Cluster("").Upstream().Create(ctx, u); err != nil {
+			if _, err := c.apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
 				merr = multierror.Append(merr, err)
 			}
 		}
 		for _, r := range added.routes {
-			if _, err := c.apisix.Cluster("").Route().Create(ctx, r); err != nil {
+			if _, err := c.apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
 				merr = multierror.Append(merr, err)
 			}
 		}
 	}
 	if updated != nil {
 		for _, r := range updated.upstreams {
-			if _, err := c.apisix.Cluster("").Upstream().Update(ctx, r); err != nil {
+			if _, err := c.apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
 				merr = multierror.Append(merr, err)
 			}
 		}
 		for _, r := range updated.routes {
-			if _, err := c.apisix.Cluster("").Route().Update(ctx, r); err != nil {
+			if _, err := c.apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
 				merr = multierror.Append(merr, err)
 			}
 		}