You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/04/22 06:35:37 UTC
[apisix-ingress-controller] branch master updated: feat: add tcp
route data structures and cache (#398)
This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 3c6f911 feat: add tcp route data structures and cache (#398)
3c6f911 is described below
commit 3c6f911714b344c0993ccddac4a529a8647008d5
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Thu Apr 22 14:35:30 2021 +0800
feat: add tcp route data structures and cache (#398)
---
pkg/apisix/apisix.go | 15 +-
pkg/apisix/cache/cache.go | 8 +
pkg/apisix/cache/memdb.go | 47 ++++-
pkg/apisix/cache/memdb_test.go | 58 ++++++
pkg/apisix/cache/schema.go | 16 ++
pkg/apisix/cluster.go | 9 +-
pkg/apisix/nonexistentclient.go | 68 +++++--
pkg/apisix/resource.go | 15 ++
pkg/apisix/route.go | 3 +-
pkg/apisix/{route.go => stream_route.go} | 109 ++++++-----
pkg/apisix/stream_route_test.go | 200 +++++++++++++++++++++
pkg/kube/apisix/apis/config/v2alpha1/types.go | 30 ++++
.../apis/config/v2alpha1/zz_generated.deepcopy.go | 62 +++++++
pkg/types/apisix/v1/types.go | 11 ++
pkg/types/apisix/v1/zz_generated.deepcopy.go | 23 +++
15 files changed, 587 insertions(+), 87 deletions(-)
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index bec4d25..0e88f43 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -39,10 +39,11 @@ type Cluster interface {
Upstream() Upstream
// SSL returns a SSL interface that can operate SSL resources.
SSL() SSL
+ // StreamRoute returns a StreamRoute interface that can operate StreamRoute resources.
+ StreamRoute() StreamRoute
// String exposes the client information in human readable format.
String() string
- // Ready waits until all resources in APISIX cluster is synced to
- // cache.
+ // HasSynced checks whether all resources in APISIX cluster is synced to cache.
HasSynced(context.Context) error
}
@@ -76,6 +77,16 @@ type Upstream interface {
Update(context.Context, *v1.Upstream) (*v1.Upstream, error)
}
+// StreamRoute is the specific client interface to take over the create, update,
+// list and delete for APISIX's Stream Route resource.
+type StreamRoute interface {
+ Get(context.Context, string) (*v1.StreamRoute, error)
+ List(context.Context) ([]*v1.StreamRoute, error)
+ Create(context.Context, *v1.StreamRoute) (*v1.StreamRoute, error)
+ Delete(context.Context, *v1.StreamRoute) error
+ Update(context.Context, *v1.StreamRoute) (*v1.StreamRoute, error)
+}
+
type apisix struct {
nonExistentCluster Cluster
clusters map[string]Cluster
diff --git a/pkg/apisix/cache/cache.go b/pkg/apisix/cache/cache.go
index 8ea26be..c527157 100644
--- a/pkg/apisix/cache/cache.go
+++ b/pkg/apisix/cache/cache.go
@@ -29,6 +29,8 @@ type Cache interface {
InsertSSL(*v1.Ssl) error
// InsertUpstream adds or updates upstream to cache.
InsertUpstream(*v1.Upstream) error
+ // InsertStreamRoute adds or updates stream_route to cache.
+ InsertStreamRoute(*v1.StreamRoute) error
// GetRoute finds the route from cache according to the primary index (id).
GetRoute(string) (*v1.Route, error)
@@ -36,6 +38,8 @@ type Cache interface {
GetSSL(string) (*v1.Ssl, error)
// GetUpstream finds the upstream from cache according to the primary index (id).
GetUpstream(string) (*v1.Upstream, error)
+ // GetStreamRoute finds the stream_route from cache according to the primary index (id).
+ GetStreamRoute(string) (*v1.StreamRoute, error)
// ListRoutes lists all routes in cache.
ListRoutes() ([]*v1.Route, error)
@@ -43,6 +47,8 @@ type Cache interface {
ListSSL() ([]*v1.Ssl, error)
// ListUpstreams lists all upstreams in cache.
ListUpstreams() ([]*v1.Upstream, error)
+ // ListStreamRoutes lists all stream_route in cache.
+ ListStreamRoutes() ([]*v1.StreamRoute, error)
// DeleteRoute deletes the specified route in cache.
DeleteRoute(*v1.Route) error
@@ -50,4 +56,6 @@ type Cache interface {
DeleteSSL(*v1.Ssl) error
// DeleteUpstream deletes the specified upstream in cache.
DeleteUpstream(*v1.Upstream) error
+ // DeleteStreamRoute deletes the specified stream_route in cache.
+ DeleteStreamRoute(*v1.StreamRoute) error
}
diff --git a/pkg/apisix/cache/memdb.go b/pkg/apisix/cache/memdb.go
index b488e54..481cfe7 100644
--- a/pkg/apisix/cache/memdb.go
+++ b/pkg/apisix/cache/memdb.go
@@ -58,6 +58,10 @@ func (c *dbCache) InsertUpstream(u *v1.Upstream) error {
return c.insert("upstream", u.DeepCopy())
}
+func (c *dbCache) InsertStreamRoute(sr *v1.StreamRoute) error {
+ return c.insert("stream_route", sr.DeepCopy())
+}
+
func (c *dbCache) insert(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
@@ -92,6 +96,14 @@ func (c *dbCache) GetUpstream(id string) (*v1.Upstream, error) {
return obj.(*v1.Upstream).DeepCopy(), nil
}
+func (c *dbCache) GetStreamRoute(id string) (*v1.StreamRoute, error) {
+ obj, err := c.get("stream_route", id)
+ if err != nil {
+ return nil, err
+ }
+ return obj.(*v1.StreamRoute).DeepCopy(), nil
+}
+
func (c *dbCache) get(table, id string) (interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
@@ -144,6 +156,18 @@ func (c *dbCache) ListUpstreams() ([]*v1.Upstream, error) {
return upstreams, nil
}
+func (c *dbCache) ListStreamRoutes() ([]*v1.StreamRoute, error) {
+ raws, err := c.list("stream_route")
+ if err != nil {
+ return nil, err
+ }
+ streamRoutes := make([]*v1.StreamRoute, 0, len(raws))
+ for _, raw := range raws {
+ streamRoutes = append(streamRoutes, raw.(*v1.StreamRoute).DeepCopy())
+ }
+ return streamRoutes, nil
+}
+
func (c *dbCache) list(table string) ([]interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
@@ -173,6 +197,10 @@ func (c *dbCache) DeleteUpstream(u *v1.Upstream) error {
return c.delete("upstream", u)
}
+func (c *dbCache) DeleteStreamRoute(sr *v1.StreamRoute) error {
+ return c.delete("stream_route", sr)
+}
+
func (c *dbCache) delete(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
@@ -191,14 +219,19 @@ func (c *dbCache) checkUpstreamReference(u *v1.Upstream) error {
txn := c.db.Txn(false)
defer txn.Abort()
obj, err := txn.First("route", "upstream_id", u.ID)
- if err != nil {
- if err == memdb.ErrNotFound {
- return nil
- }
+ if err != nil && err != memdb.ErrNotFound {
return err
}
- if obj == nil {
- return nil
+ if obj != nil {
+ return ErrStillInUse
+ }
+
+ obj, err = txn.First("stream_route", "upstream_id", u.ID)
+ if err != nil && err != memdb.ErrNotFound {
+ return err
+ }
+ if obj != nil {
+ return ErrStillInUse
}
- return ErrStillInUse
+ return nil
}
diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go
index da820f2..8e449ea 100644
--- a/pkg/apisix/cache/memdb_test.go
+++ b/pkg/apisix/cache/memdb_test.go
@@ -191,13 +191,71 @@ func TestMemDBCacheReference(t *testing.T) {
Name: "upstream",
},
}
+ u2 := &v1.Upstream{
+ Metadata: v1.Metadata{
+ ID: "2",
+ Name: "upstream",
+ },
+ }
+ sr := &v1.StreamRoute{
+ ID: "1",
+ UpstreamId: "2",
+ }
db, err := NewMemDBCache()
assert.Nil(t, err, "NewMemDBCache")
assert.Nil(t, db.InsertRoute(r))
assert.Nil(t, db.InsertUpstream(u))
+ assert.Nil(t, db.InsertStreamRoute(sr))
+ assert.Nil(t, db.InsertUpstream(u2))
assert.Error(t, ErrStillInUse, db.DeleteUpstream(u))
+ assert.Error(t, ErrStillInUse, db.DeleteUpstream(u2))
assert.Nil(t, db.DeleteRoute(r))
assert.Nil(t, db.DeleteUpstream(u))
+ assert.Nil(t, db.DeleteStreamRoute(sr))
+ assert.Nil(t, db.DeleteUpstream(u2))
+}
+
+func TestMemDBCacheStreamRoute(t *testing.T) {
+ c, err := NewMemDBCache()
+ assert.Nil(t, err, "NewMemDBCache")
+
+ r1 := &v1.StreamRoute{
+ ID: "1",
+ }
+ assert.Nil(t, c.InsertStreamRoute(r1), "inserting stream route 1")
+
+ r, err := c.GetStreamRoute("1")
+ assert.Nil(t, err)
+ assert.Equal(t, r1, r)
+
+ r2 := &v1.StreamRoute{
+ ID: "2",
+ }
+ r3 := &v1.StreamRoute{
+ ID: "3",
+ }
+ assert.Nil(t, c.InsertStreamRoute(r2), "inserting stream route r2")
+ assert.Nil(t, c.InsertStreamRoute(r3), "inserting stream route r3")
+
+ r, err = c.GetStreamRoute("3")
+ assert.Nil(t, err)
+ assert.Equal(t, r3, r)
+
+ assert.Nil(t, c.DeleteStreamRoute(r3), "delete stream route r3")
+
+ routes, err := c.ListStreamRoutes()
+ assert.Nil(t, err, "listing streams routes")
+
+ if routes[0].ID > routes[1].ID {
+ routes[0], routes[1] = routes[1], routes[0]
+ }
+ assert.Equal(t, routes[0], r1)
+ assert.Equal(t, routes[1], r2)
+
+ r4 := &v1.StreamRoute{
+ ID: "4",
+ }
+ assert.Error(t, ErrNotFound, c.DeleteStreamRoute(r4))
}
diff --git a/pkg/apisix/cache/schema.go b/pkg/apisix/cache/schema.go
index 40293f4..9db4ae7 100644
--- a/pkg/apisix/cache/schema.go
+++ b/pkg/apisix/cache/schema.go
@@ -70,6 +70,22 @@ var (
},
},
},
+ "stream_route": {
+ Name: "stream_route",
+ Indexes: map[string]*memdb.IndexSchema{
+ "id": {
+ Name: "id",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "ID"},
+ },
+ "upstream_id": {
+ Name: "upstream_id",
+ Unique: false,
+ Indexer: &memdb.StringFieldIndex{Field: "UpstreamId"},
+ AllowMissing: true,
+ },
+ },
+ },
},
}
)
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index bb6db4b..362d5d5 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -52,7 +52,7 @@ var (
_errReadOnClosedResBody = errors.New("http: read on closed response body")
)
-// Options contains parameters to customize APISIX client.
+// ClusterOptions contains parameters to customize APISIX client.
type ClusterOptions struct {
Name string
AdminKey string
@@ -72,6 +72,7 @@ type cluster struct {
route Route
upstream Upstream
ssl SSL
+ streamRoute StreamRoute
}
func newCluster(o *ClusterOptions) (Cluster, error) {
@@ -101,6 +102,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
c.ssl = newSSLClient(c)
+ c.streamRoute = newStreamRouteClient(c)
go c.syncCache()
@@ -241,6 +243,11 @@ func (c *cluster) SSL() SSL {
return c.ssl
}
+// StreamRoute implements Cluster.StreamRoute method.
+func (c *cluster) StreamRoute() StreamRoute {
+ return c.streamRoute
+}
+
func (s *cluster) applyAuth(req *http.Request) {
if s.adminKey != "" {
req.Header.Set("X-API-Key", s.adminKey)
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index cfc1522..3113bae 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -29,17 +29,19 @@ type nonExistentCluster struct {
func newNonExistentCluster() *nonExistentCluster {
return &nonExistentCluster{
embedDummyResourceImplementer{
- route: &dummyRoute{},
- ssl: &dummySSL{},
- upstream: &dummyUpstream{},
+ route: &dummyRoute{},
+ ssl: &dummySSL{},
+ upstream: &dummyUpstream{},
+ streamRoute: &dummyStreamRoute{},
},
}
}
type embedDummyResourceImplementer struct {
- route Route
- ssl SSL
- upstream Upstream
+ route Route
+ ssl SSL
+ upstream Upstream
+ streamRoute StreamRoute
}
type dummyRoute struct{}
@@ -108,6 +110,28 @@ func (f *dummyUpstream) Update(_ context.Context, _ *v1.Upstream) (*v1.Upstream,
return nil, ErrClusterNotExist
}
+type dummyStreamRoute struct{}
+
+func (f *dummyStreamRoute) Get(_ context.Context, _ string) (*v1.StreamRoute, error) {
+ return nil, ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) List(_ context.Context) ([]*v1.StreamRoute, error) {
+ return nil, ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) Create(_ context.Context, _ *v1.StreamRoute) (*v1.StreamRoute, error) {
+ return nil, ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) Delete(_ context.Context, _ *v1.StreamRoute) error {
+ return ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) Update(_ context.Context, _ *v1.StreamRoute) (*v1.StreamRoute, error) {
+ return nil, ErrClusterNotExist
+}
+
func (nc *nonExistentCluster) Route() Route {
return nc.route
}
@@ -120,6 +144,10 @@ func (nc *nonExistentCluster) Upstream() Upstream {
return nc.upstream
}
+func (nc *nonExistentCluster) StreamRoute() StreamRoute {
+ return nc.streamRoute
+}
+
func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}
@@ -132,15 +160,19 @@ type dummyCache struct{}
var _ cache.Cache = &dummyCache{}
-func (c *dummyCache) InsertRoute(_ *v1.Route) error { return nil }
-func (c *dummyCache) InsertSSL(_ *v1.Ssl) error { return nil }
-func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return nil }
-func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound }
-func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound }
-func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
-func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil }
-func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil }
-func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil }
-func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil }
-func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil }
-func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil }
+func (c *dummyCache) InsertRoute(_ *v1.Route) error { return nil }
+func (c *dummyCache) InsertSSL(_ *v1.Ssl) error { return nil }
+func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return nil }
+func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return nil }
+func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound }
+func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil }
+func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil }
+func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil }
+func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error) { return nil, nil }
+func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil }
+func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil }
+func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil }
+func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil }
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index 6be51fd..5526a66 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -87,6 +87,21 @@ func (i *item) route() (*v1.Route, error) {
return &route, nil
}
+// streamRoute decodes item.Value and converts it to v1.StreamRoute.
+func (i *item) streamRoute() (*v1.StreamRoute, error) {
+ log.Debugf("got stream_route: %s", string(i.Value))
+ list := strings.Split(i.Key, "/")
+ if len(list) < 1 {
+ return nil, fmt.Errorf("bad stream_route config key: %s", i.Key)
+ }
+
+ var streamRoute v1.StreamRoute
+ if err := json.Unmarshal(i.Value, &streamRoute); err != nil {
+ return nil, err
+ }
+ return &streamRoute, nil
+}
+
// upstream decodes item.Value and converts it to v1.Upstream.
func (i *item) upstream() (*v1.Upstream, error) {
log.Debugf("got upstream: %s", string(i.Value))
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
index c724e79..ae0dba1 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/route.go
@@ -140,7 +140,7 @@ func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
log.Debugw("try to create route",
- zap.String("host", obj.Host),
+ zap.Strings("hosts", obj.Hosts),
zap.String("name", obj.Name),
zap.String("cluster", "default"),
zap.String("url", r.url),
@@ -204,7 +204,6 @@ func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, err
if err := r.cluster.HasSynced(ctx); err != nil {
return nil, err
}
- // FIXME use unified v1.Route, removing routeReqBody.
body, err := json.Marshal(obj)
if err != nil {
return nil, err
diff --git a/pkg/apisix/route.go b/pkg/apisix/stream_route.go
similarity index 55%
copy from pkg/apisix/route.go
copy to pkg/apisix/stream_route.go
index c724e79..d46eda4 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/stream_route.go
@@ -12,7 +12,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-
package apisix
import (
@@ -28,39 +27,39 @@ import (
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
-type routeClient struct {
+type streamRouteClient struct {
url string
cluster *cluster
}
-func newRouteClient(c *cluster) Route {
- return &routeClient{
- url: c.baseURL + "/routes",
+func newStreamRouteClient(c *cluster) StreamRoute {
+ return &streamRouteClient{
+ url: c.baseURL + "/stream_routes",
cluster: c,
}
}
-// Get returns the Route.
+// Get returns the StreamRoute.
// FIXME, currently if caller pass a non-existent resource, the Get always passes
// through cache.
-func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {
- log.Debugw("try to look up route",
+func (r *streamRouteClient) Get(ctx context.Context, name string) (*v1.StreamRoute, error) {
+ log.Debugw("try to look up stream_route",
zap.String("name", name),
zap.String("url", r.url),
zap.String("cluster", "default"),
)
rid := id.GenID(name)
- route, err := r.cluster.cache.GetRoute(rid)
+ streamRoute, err := r.cluster.cache.GetStreamRoute(rid)
if err == nil {
- return route, nil
+ return streamRoute, nil
}
if err != cache.ErrNotFound {
- log.Errorw("failed to find route in cache, will try to lookup from APISIX",
+ log.Errorw("failed to find stream_route in cache, will try to lookup from APISIX",
zap.String("name", name),
zap.Error(err),
)
} else {
- log.Debugw("failed to find route in cache, will try to lookup from APISIX",
+ log.Debugw("failed to find stream_route in cache, will try to lookup from APISIX",
zap.String("name", name),
zap.Error(err),
)
@@ -71,13 +70,13 @@ func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {
resp, err := r.cluster.getResource(ctx, url)
if err != nil {
if err == cache.ErrNotFound {
- log.Warnw("route not found",
+ log.Warnw("stream_route not found",
zap.String("name", name),
zap.String("url", url),
zap.String("cluster", "default"),
)
} else {
- log.Errorw("failed to get route from APISIX",
+ log.Errorw("failed to get stream_route from APISIX",
zap.String("name", name),
zap.String("url", url),
zap.String("cluster", "default"),
@@ -87,61 +86,60 @@ func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {
return nil, err
}
- route, err = resp.Item.route()
+ streamRoute, err = resp.Item.streamRoute()
if err != nil {
- log.Errorw("failed to convert route item",
+ log.Errorw("failed to convert stream_route item",
zap.String("url", r.url),
- zap.String("route_key", resp.Item.Key),
- zap.String("route_value", string(resp.Item.Value)),
+ zap.String("stream_route_key", resp.Item.Key),
+ zap.String("stream_route_value", string(resp.Item.Value)),
zap.Error(err),
)
return nil, err
}
- if err := r.cluster.cache.InsertRoute(route); err != nil {
+ if err := r.cluster.cache.InsertStreamRoute(streamRoute); err != nil {
log.Errorf("failed to reflect route create to cache: %s", err)
return nil, err
}
- return route, nil
+ return streamRoute, nil
}
// List is only used in cache warming up. So here just pass through
// to APISIX.
-func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
- log.Debugw("try to list routes in APISIX",
+func (r *streamRouteClient) List(ctx context.Context) ([]*v1.StreamRoute, error) {
+ log.Debugw("try to list stream_routes in APISIX",
zap.String("cluster", "default"),
zap.String("url", r.url),
)
- routeItems, err := r.cluster.listResource(ctx, r.url)
+ streamRouteItems, err := r.cluster.listResource(ctx, r.url)
if err != nil {
- log.Errorf("failed to list routes: %s", err)
+ log.Errorf("failed to list stream_routes: %s", err)
return nil, err
}
- var items []*v1.Route
- for i, item := range routeItems.Node.Items {
- route, err := item.route()
+ var items []*v1.StreamRoute
+ for i, item := range streamRouteItems.Node.Items {
+ streamRoute, err := item.streamRoute()
if err != nil {
- log.Errorw("failed to convert route item",
+ log.Errorw("failed to convert stream_route item",
zap.String("url", r.url),
- zap.String("route_key", item.Key),
- zap.String("route_value", string(item.Value)),
+ zap.String("stream_route_key", item.Key),
+ zap.String("stream_route_value", string(item.Value)),
zap.Error(err),
)
return nil, err
}
- items = append(items, route)
- log.Debugf("list route #%d, body: %s", i, string(item.Value))
+ items = append(items, streamRoute)
+ log.Debugf("list stream_route #%d, body: %s", i, string(item.Value))
}
-
return items, nil
}
-func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
- log.Debugw("try to create route",
- zap.String("host", obj.Host),
- zap.String("name", obj.Name),
+func (r *streamRouteClient) Create(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) {
+ log.Debugw("try to create stream_route",
+ zap.String("id", obj.ID),
+ zap.Int32("server_port", obj.ServerPort),
zap.String("cluster", "default"),
zap.String("url", r.url),
)
@@ -155,28 +153,27 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, err
}
url := r.url + "/" + obj.ID
- log.Debugw("creating route", zap.ByteString("body", data), zap.String("url", url))
+ log.Debugw("creating stream_route", zap.ByteString("body", data), zap.String("url", url))
resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
if err != nil {
- log.Errorf("failed to create route: %s", err)
+ log.Errorf("failed to create stream_route: %s", err)
return nil, err
}
- route, err := resp.Item.route()
+ streamRoute, err := resp.Item.streamRoute()
if err != nil {
return nil, err
}
- if err := r.cluster.cache.InsertRoute(route); err != nil {
- log.Errorf("failed to reflect route create to cache: %s", err)
+ if err := r.cluster.cache.InsertStreamRoute(streamRoute); err != nil {
+ log.Errorf("failed to reflect stream_route create to cache: %s", err)
return nil, err
}
- return route, nil
+ return streamRoute, nil
}
-func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
- log.Debugw("try to delete route",
+func (r *streamRouteClient) Delete(ctx context.Context, obj *v1.StreamRoute) error {
+ log.Debugw("try to delete stream_route",
zap.String("id", obj.ID),
- zap.String("name", obj.Name),
zap.String("cluster", "default"),
zap.String("url", r.url),
)
@@ -187,41 +184,39 @@ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
if err := r.cluster.deleteResource(ctx, url); err != nil {
return err
}
- if err := r.cluster.cache.DeleteRoute(obj); err != nil {
- log.Errorf("failed to reflect route delete to cache: %s", err)
+ if err := r.cluster.cache.DeleteStreamRoute(obj); err != nil {
+ log.Errorf("failed to reflect stream_route delete to cache: %s", err)
return err
}
return nil
}
-func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
- log.Debugw("try to update route",
+func (r *streamRouteClient) Update(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) {
+ log.Debugw("try to update stream_route",
zap.String("id", obj.ID),
- zap.String("name", obj.Name),
zap.String("cluster", "default"),
zap.String("url", r.url),
)
if err := r.cluster.HasSynced(ctx); err != nil {
return nil, err
}
- // FIXME use unified v1.Route, removing routeReqBody.
body, err := json.Marshal(obj)
if err != nil {
return nil, err
}
url := r.url + "/" + obj.ID
- log.Debugw("updating route", zap.ByteString("body", body), zap.String("url", url))
+ log.Debugw("updating stream_route", zap.ByteString("body", body), zap.String("url", url))
resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
if err != nil {
return nil, err
}
- route, err := resp.Item.route()
+ streamRoute, err := resp.Item.streamRoute()
if err != nil {
return nil, err
}
- if err := r.cluster.cache.InsertRoute(route); err != nil {
- log.Errorf("failed to reflect route update to cache: %s", err)
+ if err := r.cluster.cache.InsertStreamRoute(streamRoute); err != nil {
+ log.Errorf("failed to reflect stream_route update to cache: %s", err)
return nil, err
}
- return route, nil
+ return streamRoute, nil
}
diff --git a/pkg/apisix/stream_route_test.go b/pkg/apisix/stream_route_test.go
new file mode 100644
index 0000000..35411ca
--- /dev/null
+++ b/pkg/apisix/stream_route_test.go
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+ "github.com/stretchr/testify/assert"
+ "golang.org/x/net/nettest"
+)
+
+type fakeAPISIXStreamRouteSrv struct {
+ streamRoute map[string]json.RawMessage
+}
+
+func (srv *fakeAPISIXStreamRouteSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+
+ if !strings.HasPrefix(r.URL.Path, "/apisix/admin/stream_routes") {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ if r.Method == http.MethodGet {
+ resp := fakeListResp{
+ Count: strconv.Itoa(len(srv.streamRoute)),
+ Node: fakeNode{
+ Key: "/apisix/stream_routes",
+ },
+ }
+ var keys []string
+ for key := range srv.streamRoute {
+ keys = append(keys, key)
+ }
+ sort.Strings(keys)
+ for _, key := range keys {
+ resp.Node.Items = append(resp.Node.Items, fakeItem{
+ Key: key,
+ Value: srv.streamRoute[key],
+ })
+ }
+ w.WriteHeader(http.StatusOK)
+ data, _ := json.Marshal(resp)
+ _, _ = w.Write(data)
+ return
+ }
+
+ if r.Method == http.MethodDelete {
+ id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/stream_routes/")
+ id = "/apisix/stream_routes/" + id
+ code := http.StatusNotFound
+ if _, ok := srv.streamRoute[id]; ok {
+ delete(srv.streamRoute, id)
+ code = http.StatusOK
+ }
+ w.WriteHeader(code)
+ }
+
+ if r.Method == http.MethodPut {
+ paths := strings.Split(r.URL.Path, "/")
+ key := fmt.Sprintf("/apisix/stream_routes/%s", paths[len(paths)-1])
+ data, _ := ioutil.ReadAll(r.Body)
+ srv.streamRoute[key] = data
+ w.WriteHeader(http.StatusCreated)
+ resp := fakeCreateResp{
+ Action: "create",
+ Node: fakeItem{
+ Key: key,
+ Value: json.RawMessage(data),
+ },
+ }
+ data, _ = json.Marshal(resp)
+ _, _ = w.Write(data)
+ return
+ }
+
+ if r.Method == http.MethodPatch {
+ id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/stream_routes/")
+ id = "/apisix/stream_routes/" + id
+ if _, ok := srv.streamRoute[id]; !ok {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ data, _ := ioutil.ReadAll(r.Body)
+ srv.streamRoute[id] = data
+
+ w.WriteHeader(http.StatusOK)
+ output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data))
+ _, _ = w.Write([]byte(output))
+ return
+ }
+}
+
+func runFakeStreamRouteSrv(t *testing.T) *http.Server {
+ srv := &fakeAPISIXStreamRouteSrv{
+ streamRoute: make(map[string]json.RawMessage),
+ }
+
+ ln, _ := nettest.NewLocalListener("tcp")
+
+ httpSrv := &http.Server{
+ Addr: ln.Addr().String(),
+ Handler: srv,
+ }
+
+ go func() {
+ if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+ t.Errorf("failed to run http server: %s", err)
+ }
+ }()
+
+ return httpSrv
+}
+
+func TestStreamRouteClient(t *testing.T) {
+ srv := runFakeStreamRouteSrv(t)
+ defer func() {
+ assert.Nil(t, srv.Shutdown(context.Background()))
+ }()
+
+ u := url.URL{
+ Scheme: "http",
+ Host: srv.Addr,
+ Path: "/apisix/admin",
+ }
+
+ closedCh := make(chan struct{})
+ close(closedCh)
+ cli := newStreamRouteClient(&cluster{
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ })
+
+ // Create
+ obj, err := cli.Create(context.Background(), &v1.StreamRoute{
+ ID: "1",
+ ServerPort: 8001,
+ UpstreamId: "1",
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, obj.ID, "1")
+
+ obj, err = cli.Create(context.Background(), &v1.StreamRoute{
+ ID: "2",
+ ServerPort: 8002,
+ UpstreamId: "1",
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, obj.ID, "2")
+
+ // List
+ objs, err := cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, 2)
+ assert.Equal(t, objs[0].ID, "1")
+ assert.Equal(t, objs[1].ID, "2")
+
+ // Delete then List
+ assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+ objs, err = cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, 1)
+ assert.Equal(t, "2", objs[0].ID)
+
+ // Patch then List
+ _, err = cli.Update(context.Background(), &v1.StreamRoute{
+ ID: "2",
+ UpstreamId: "112",
+ })
+ assert.Nil(t, err)
+ objs, err = cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, 1)
+ assert.Equal(t, "2", objs[0].ID)
+}
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index d1ad058..3d1bb72 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -71,6 +71,7 @@ type ApisixRoute struct {
// ApisixRouteSpec is the spec definition for ApisixRouteSpec.
type ApisixRouteSpec struct {
HTTP []*ApisixRouteHTTP `json:"http,omitempty" yaml:"http,omitempty"`
+ TCP []*ApisixRouteTCP `json:"tcp,omitempty" yaml:"tcp,omitempty"`
}
// ApisixRouteHTTP represents a single route in for HTTP traffic.
@@ -196,6 +197,35 @@ func (p *ApisixRouteHTTPPluginConfig) DeepCopy() *ApisixRouteHTTPPluginConfig {
return out
}
+// ApisixRouteTCP is the configuration for tcp route.
+type ApisixRouteTCP struct {
+ // The rule name, cannot be empty.
+ Name string `json:"name" yaml:"name"`
+ Match ApisixRouteTCPMatch `json:"match" yaml:"match"`
+ Backend ApisixRouteTCPBackend `json:"backend" yaml:"backend"`
+}
+
+// ApisixRouteTCPMatch represents the match conditions of tcp route.
+type ApisixRouteTCPMatch struct {
+ // IngressPort represents the port listening on the Ingress proxy server.
+ // It should be pre-defined as APISIX doesn't support dynamic listening.
+ IngressPort int32 `json:"ingressPort" yaml:"ingressPort"`
+}
+
+// ApisixRouteTCPBackend represents a TCP backend (a Kubernetes Service).
+type ApisixRouteTCPBackend struct {
+ // The name (short) of the service, note cross namespace is forbidden,
+ // so be sure the ApisixRoute and Service are in the same namespace.
+ ServiceName string `json:"serviceName" yaml:"serviceName"`
+ // The service port, could be the name or the port number.
+ ServicePort intstr.IntOrString `json:"servicePort" yaml:"servicePort"`
+ // The resolve granularity, can be "endpoints" or "service",
+ // when set to "endpoints", the pod ips will be used; other
+ // wise, the service ClusterIP or ExternalIP will be used,
+ // default is endpoints.
+ ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
+}
+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ApisixRouteList struct {
metav1.TypeMeta `json:",inline" yaml:",inline"`
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go
index d3e87e6..f35d721 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go
@@ -275,6 +275,17 @@ func (in *ApisixRouteSpec) DeepCopyInto(out *ApisixRouteSpec) {
}
}
}
+ if in.TCP != nil {
+ in, out := &in.TCP, &out.TCP
+ *out = make([]*ApisixRouteTCP, len(*in))
+ for i := range *in {
+ if (*in)[i] != nil {
+ in, out := &(*in)[i], &(*out)[i]
+ *out = new(ApisixRouteTCP)
+ **out = **in
+ }
+ }
+ }
return
}
@@ -287,3 +298,54 @@ func (in *ApisixRouteSpec) DeepCopy() *ApisixRouteSpec {
in.DeepCopyInto(out)
return out
}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteTCP) DeepCopyInto(out *ApisixRouteTCP) {
+ *out = *in
+ out.Match = in.Match
+ out.Backend = in.Backend
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteTCP.
+func (in *ApisixRouteTCP) DeepCopy() *ApisixRouteTCP {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixRouteTCP)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteTCPBackend) DeepCopyInto(out *ApisixRouteTCPBackend) {
+ *out = *in
+ out.ServicePort = in.ServicePort
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteTCPBackend.
+func (in *ApisixRouteTCPBackend) DeepCopy() *ApisixRouteTCPBackend {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixRouteTCPBackend)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteTCPMatch) DeepCopyInto(out *ApisixRouteTCPMatch) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteTCPMatch.
+func (in *ApisixRouteTCPMatch) DeepCopy() *ApisixRouteTCPMatch {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixRouteTCPMatch)
+ in.DeepCopyInto(out)
+ return out
+}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 1f43705..6ab1aa0 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -323,6 +323,17 @@ type TrafficSplitConfigRuleWeightedUpstream struct {
Weight int `json:"weight"`
}
+// StreamRoute represents the stream route object in APISIX.
+// +k8s:deepcopy-gen=true
+type StreamRoute struct {
+ // TODO metadata should use Metadata type
+ ID string `json:"id,omitempty" yaml:"id,omitempty"`
+ Desc string `json:"desc,omitempty" yaml:"desc,omitempty"`
+ Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
+ ServerPort int32 `json:"server_port,omitempty" yaml:"server_port,omitempty"`
+ UpstreamId string `json:"upstream_id,omitempty" yaml:"upstream_id,omitempty"`
+}
+
// NewDefaultUpstream returns an empty Upstream with default values.
func NewDefaultUpstream() *Upstream {
return &Upstream{
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 50a6f1b..8fa8c11 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -123,6 +123,29 @@ func (in *Ssl) DeepCopy() *Ssl {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StreamRoute) DeepCopyInto(out *StreamRoute) {
+ *out = *in
+ if in.Labels != nil {
+ in, out := &in.Labels, &out.Labels
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamRoute.
+func (in *StreamRoute) DeepCopy() *StreamRoute {
+ if in == nil {
+ return nil
+ }
+ out := new(StreamRoute)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StringOrSlice) DeepCopyInto(out *StringOrSlice) {
*out = *in
if in.SliceVal != nil {