You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/04/22 06:35:37 UTC

[apisix-ingress-controller] branch master updated: feat: add tcp route data structures and cache (#398)

This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c6f911  feat: add tcp route data structures and cache (#398)
3c6f911 is described below

commit 3c6f911714b344c0993ccddac4a529a8647008d5
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Thu Apr 22 14:35:30 2021 +0800

    feat: add tcp route data structures and cache (#398)
---
 pkg/apisix/apisix.go                               |  15 +-
 pkg/apisix/cache/cache.go                          |   8 +
 pkg/apisix/cache/memdb.go                          |  47 ++++-
 pkg/apisix/cache/memdb_test.go                     |  58 ++++++
 pkg/apisix/cache/schema.go                         |  16 ++
 pkg/apisix/cluster.go                              |   9 +-
 pkg/apisix/nonexistentclient.go                    |  68 +++++--
 pkg/apisix/resource.go                             |  15 ++
 pkg/apisix/route.go                                |   3 +-
 pkg/apisix/{route.go => stream_route.go}           | 109 ++++++-----
 pkg/apisix/stream_route_test.go                    | 200 +++++++++++++++++++++
 pkg/kube/apisix/apis/config/v2alpha1/types.go      |  30 ++++
 .../apis/config/v2alpha1/zz_generated.deepcopy.go  |  62 +++++++
 pkg/types/apisix/v1/types.go                       |  11 ++
 pkg/types/apisix/v1/zz_generated.deepcopy.go       |  23 +++
 15 files changed, 587 insertions(+), 87 deletions(-)

diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index bec4d25..0e88f43 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -39,10 +39,11 @@ type Cluster interface {
 	Upstream() Upstream
 	// SSL returns a SSL interface that can operate SSL resources.
 	SSL() SSL
+	// StreamRoute returns a StreamRoute interface that can operate StreamRoute resources.
+	StreamRoute() StreamRoute
 	// String exposes the client information in human readable format.
 	String() string
-	// Ready waits until all resources in APISIX cluster is synced to
-	// cache.
+	// HasSynced checks whether all resources in APISIX cluster is synced to cache.
 	HasSynced(context.Context) error
 }
 
@@ -76,6 +77,16 @@ type Upstream interface {
 	Update(context.Context, *v1.Upstream) (*v1.Upstream, error)
 }
 
+// StreamRoute is the specific client interface to take over the create, update,
+// list and delete for APISIX's Stream Route resource.
+type StreamRoute interface {
+	Get(context.Context, string) (*v1.StreamRoute, error)
+	List(context.Context) ([]*v1.StreamRoute, error)
+	Create(context.Context, *v1.StreamRoute) (*v1.StreamRoute, error)
+	Delete(context.Context, *v1.StreamRoute) error
+	Update(context.Context, *v1.StreamRoute) (*v1.StreamRoute, error)
+}
+
 type apisix struct {
 	nonExistentCluster Cluster
 	clusters           map[string]Cluster
diff --git a/pkg/apisix/cache/cache.go b/pkg/apisix/cache/cache.go
index 8ea26be..c527157 100644
--- a/pkg/apisix/cache/cache.go
+++ b/pkg/apisix/cache/cache.go
@@ -29,6 +29,8 @@ type Cache interface {
 	InsertSSL(*v1.Ssl) error
 	// InsertUpstream adds or updates upstream to cache.
 	InsertUpstream(*v1.Upstream) error
+	// InsertStreamRoute adds or updates stream_route to cache.
+	InsertStreamRoute(*v1.StreamRoute) error
 
 	// GetRoute finds the route from cache according to the primary index (id).
 	GetRoute(string) (*v1.Route, error)
@@ -36,6 +38,8 @@ type Cache interface {
 	GetSSL(string) (*v1.Ssl, error)
 	// GetUpstream finds the upstream from cache according to the primary index (id).
 	GetUpstream(string) (*v1.Upstream, error)
+	// GetStreamRoute finds the stream_route from cache according to the primary index (id).
+	GetStreamRoute(string) (*v1.StreamRoute, error)
 
 	// ListRoutes lists all routes in cache.
 	ListRoutes() ([]*v1.Route, error)
@@ -43,6 +47,8 @@ type Cache interface {
 	ListSSL() ([]*v1.Ssl, error)
 	// ListUpstreams lists all upstreams in cache.
 	ListUpstreams() ([]*v1.Upstream, error)
+	// ListStreamRoutes lists all stream_route in cache.
+	ListStreamRoutes() ([]*v1.StreamRoute, error)
 
 	// DeleteRoute deletes the specified route in cache.
 	DeleteRoute(*v1.Route) error
@@ -50,4 +56,6 @@ type Cache interface {
 	DeleteSSL(*v1.Ssl) error
 	// DeleteUpstream deletes the specified upstream in cache.
 	DeleteUpstream(*v1.Upstream) error
+	// DeleteStreamRoute deletes the specified stream_route in cache.
+	DeleteStreamRoute(*v1.StreamRoute) error
 }
diff --git a/pkg/apisix/cache/memdb.go b/pkg/apisix/cache/memdb.go
index b488e54..481cfe7 100644
--- a/pkg/apisix/cache/memdb.go
+++ b/pkg/apisix/cache/memdb.go
@@ -58,6 +58,10 @@ func (c *dbCache) InsertUpstream(u *v1.Upstream) error {
 	return c.insert("upstream", u.DeepCopy())
 }
 
+func (c *dbCache) InsertStreamRoute(sr *v1.StreamRoute) error {
+	return c.insert("stream_route", sr.DeepCopy())
+}
+
 func (c *dbCache) insert(table string, obj interface{}) error {
 	txn := c.db.Txn(true)
 	defer txn.Abort()
@@ -92,6 +96,14 @@ func (c *dbCache) GetUpstream(id string) (*v1.Upstream, error) {
 	return obj.(*v1.Upstream).DeepCopy(), nil
 }
 
+func (c *dbCache) GetStreamRoute(id string) (*v1.StreamRoute, error) {
+	obj, err := c.get("stream_route", id)
+	if err != nil {
+		return nil, err
+	}
+	return obj.(*v1.StreamRoute).DeepCopy(), nil
+}
+
 func (c *dbCache) get(table, id string) (interface{}, error) {
 	txn := c.db.Txn(false)
 	defer txn.Abort()
@@ -144,6 +156,18 @@ func (c *dbCache) ListUpstreams() ([]*v1.Upstream, error) {
 	return upstreams, nil
 }
 
+func (c *dbCache) ListStreamRoutes() ([]*v1.StreamRoute, error) {
+	raws, err := c.list("stream_route")
+	if err != nil {
+		return nil, err
+	}
+	streamRoutes := make([]*v1.StreamRoute, 0, len(raws))
+	for _, raw := range raws {
+		streamRoutes = append(streamRoutes, raw.(*v1.StreamRoute).DeepCopy())
+	}
+	return streamRoutes, nil
+}
+
 func (c *dbCache) list(table string) ([]interface{}, error) {
 	txn := c.db.Txn(false)
 	defer txn.Abort()
@@ -173,6 +197,10 @@ func (c *dbCache) DeleteUpstream(u *v1.Upstream) error {
 	return c.delete("upstream", u)
 }
 
+func (c *dbCache) DeleteStreamRoute(sr *v1.StreamRoute) error {
+	return c.delete("stream_route", sr)
+}
+
 func (c *dbCache) delete(table string, obj interface{}) error {
 	txn := c.db.Txn(true)
 	defer txn.Abort()
@@ -191,14 +219,19 @@ func (c *dbCache) checkUpstreamReference(u *v1.Upstream) error {
 	txn := c.db.Txn(false)
 	defer txn.Abort()
 	obj, err := txn.First("route", "upstream_id", u.ID)
-	if err != nil {
-		if err == memdb.ErrNotFound {
-			return nil
-		}
+	if err != nil && err != memdb.ErrNotFound {
 		return err
 	}
-	if obj == nil {
-		return nil
+	if obj != nil {
+		return ErrStillInUse
+	}
+
+	obj, err = txn.First("stream_route", "upstream_id", u.ID)
+	if err != nil && err != memdb.ErrNotFound {
+		return err
+	}
+	if obj != nil {
+		return ErrStillInUse
 	}
-	return ErrStillInUse
+	return nil
 }
diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go
index da820f2..8e449ea 100644
--- a/pkg/apisix/cache/memdb_test.go
+++ b/pkg/apisix/cache/memdb_test.go
@@ -191,13 +191,71 @@ func TestMemDBCacheReference(t *testing.T) {
 			Name: "upstream",
 		},
 	}
+	u2 := &v1.Upstream{
+		Metadata: v1.Metadata{
+			ID:   "2",
+			Name: "upstream",
+		},
+	}
+	sr := &v1.StreamRoute{
+		ID:         "1",
+		UpstreamId: "2",
+	}
 
 	db, err := NewMemDBCache()
 	assert.Nil(t, err, "NewMemDBCache")
 	assert.Nil(t, db.InsertRoute(r))
 	assert.Nil(t, db.InsertUpstream(u))
+	assert.Nil(t, db.InsertStreamRoute(sr))
+	assert.Nil(t, db.InsertUpstream(u2))
 
 	assert.Error(t, ErrStillInUse, db.DeleteUpstream(u))
+	assert.Error(t, ErrStillInUse, db.DeleteUpstream(u2))
 	assert.Nil(t, db.DeleteRoute(r))
 	assert.Nil(t, db.DeleteUpstream(u))
+	assert.Nil(t, db.DeleteStreamRoute(sr))
+	assert.Nil(t, db.DeleteUpstream(u2))
+}
+
+func TestMemDBCacheStreamRoute(t *testing.T) {
+	c, err := NewMemDBCache()
+	assert.Nil(t, err, "NewMemDBCache")
+
+	r1 := &v1.StreamRoute{
+		ID: "1",
+	}
+	assert.Nil(t, c.InsertStreamRoute(r1), "inserting stream route 1")
+
+	r, err := c.GetStreamRoute("1")
+	assert.Nil(t, err)
+	assert.Equal(t, r1, r)
+
+	r2 := &v1.StreamRoute{
+		ID: "2",
+	}
+	r3 := &v1.StreamRoute{
+		ID: "3",
+	}
+	assert.Nil(t, c.InsertStreamRoute(r2), "inserting stream route r2")
+	assert.Nil(t, c.InsertStreamRoute(r3), "inserting stream route r3")
+
+	r, err = c.GetStreamRoute("3")
+	assert.Nil(t, err)
+	assert.Equal(t, r3, r)
+
+	assert.Nil(t, c.DeleteStreamRoute(r3), "delete stream route r3")
+
+	routes, err := c.ListStreamRoutes()
+	assert.Nil(t, err, "listing streams routes")
+
+	if routes[0].ID > routes[1].ID {
+		routes[0], routes[1] = routes[1], routes[0]
+	}
+	assert.Equal(t, routes[0], r1)
+	assert.Equal(t, routes[1], r2)
+
+	r4 := &v1.StreamRoute{
+		ID: "4",
+	}
+	assert.Error(t, ErrNotFound, c.DeleteStreamRoute(r4))
 }
diff --git a/pkg/apisix/cache/schema.go b/pkg/apisix/cache/schema.go
index 40293f4..9db4ae7 100644
--- a/pkg/apisix/cache/schema.go
+++ b/pkg/apisix/cache/schema.go
@@ -70,6 +70,22 @@ var (
 					},
 				},
 			},
+			"stream_route": {
+				Name: "stream_route",
+				Indexes: map[string]*memdb.IndexSchema{
+					"id": {
+						Name:    "id",
+						Unique:  true,
+						Indexer: &memdb.StringFieldIndex{Field: "ID"},
+					},
+					"upstream_id": {
+						Name:         "upstream_id",
+						Unique:       false,
+						Indexer:      &memdb.StringFieldIndex{Field: "UpstreamId"},
+						AllowMissing: true,
+					},
+				},
+			},
 		},
 	}
 )
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index bb6db4b..362d5d5 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -52,7 +52,7 @@ var (
 	_errReadOnClosedResBody = errors.New("http: read on closed response body")
 )
 
-// Options contains parameters to customize APISIX client.
+// ClusterOptions contains parameters to customize APISIX client.
 type ClusterOptions struct {
 	Name     string
 	AdminKey string
@@ -72,6 +72,7 @@ type cluster struct {
 	route        Route
 	upstream     Upstream
 	ssl          SSL
+	streamRoute  StreamRoute
 }
 
 func newCluster(o *ClusterOptions) (Cluster, error) {
@@ -101,6 +102,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 	c.route = newRouteClient(c)
 	c.upstream = newUpstreamClient(c)
 	c.ssl = newSSLClient(c)
+	c.streamRoute = newStreamRouteClient(c)
 
 	go c.syncCache()
 
@@ -241,6 +243,11 @@ func (c *cluster) SSL() SSL {
 	return c.ssl
 }
 
+// StreamRoute implements Cluster.StreamRoute method.
+func (c *cluster) StreamRoute() StreamRoute {
+	return c.streamRoute
+}
+
 func (s *cluster) applyAuth(req *http.Request) {
 	if s.adminKey != "" {
 		req.Header.Set("X-API-Key", s.adminKey)
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index cfc1522..3113bae 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -29,17 +29,19 @@ type nonExistentCluster struct {
 func newNonExistentCluster() *nonExistentCluster {
 	return &nonExistentCluster{
 		embedDummyResourceImplementer{
-			route:    &dummyRoute{},
-			ssl:      &dummySSL{},
-			upstream: &dummyUpstream{},
+			route:       &dummyRoute{},
+			ssl:         &dummySSL{},
+			upstream:    &dummyUpstream{},
+			streamRoute: &dummyStreamRoute{},
 		},
 	}
 }
 
 type embedDummyResourceImplementer struct {
-	route    Route
-	ssl      SSL
-	upstream Upstream
+	route       Route
+	ssl         SSL
+	upstream    Upstream
+	streamRoute StreamRoute
 }
 
 type dummyRoute struct{}
@@ -108,6 +110,28 @@ func (f *dummyUpstream) Update(_ context.Context, _ *v1.Upstream) (*v1.Upstream,
 	return nil, ErrClusterNotExist
 }
 
+type dummyStreamRoute struct{}
+
+func (f *dummyStreamRoute) Get(_ context.Context, _ string) (*v1.StreamRoute, error) {
+	return nil, ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) List(_ context.Context) ([]*v1.StreamRoute, error) {
+	return nil, ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) Create(_ context.Context, _ *v1.StreamRoute) (*v1.StreamRoute, error) {
+	return nil, ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) Delete(_ context.Context, _ *v1.StreamRoute) error {
+	return ErrClusterNotExist
+}
+
+func (f *dummyStreamRoute) Update(_ context.Context, _ *v1.StreamRoute) (*v1.StreamRoute, error) {
+	return nil, ErrClusterNotExist
+}
+
 func (nc *nonExistentCluster) Route() Route {
 	return nc.route
 }
@@ -120,6 +144,10 @@ func (nc *nonExistentCluster) Upstream() Upstream {
 	return nc.upstream
 }
 
+func (nc *nonExistentCluster) StreamRoute() StreamRoute {
+	return nc.streamRoute
+}
+
 func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
 	return nil
 }
@@ -132,15 +160,19 @@ type dummyCache struct{}
 
 var _ cache.Cache = &dummyCache{}
 
-func (c *dummyCache) InsertRoute(_ *v1.Route) error              { return nil }
-func (c *dummyCache) InsertSSL(_ *v1.Ssl) error                  { return nil }
-func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error        { return nil }
-func (c *dummyCache) GetRoute(_ string) (*v1.Route, error)       { return nil, cache.ErrNotFound }
-func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error)           { return nil, cache.ErrNotFound }
-func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
-func (c *dummyCache) ListRoutes() ([]*v1.Route, error)           { return nil, nil }
-func (c *dummyCache) ListSSL() ([]*v1.Ssl, error)                { return nil, nil }
-func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error)     { return nil, nil }
-func (c *dummyCache) DeleteRoute(_ *v1.Route) error              { return nil }
-func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error                  { return nil }
-func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error        { return nil }
+func (c *dummyCache) InsertRoute(_ *v1.Route) error                    { return nil }
+func (c *dummyCache) InsertSSL(_ *v1.Ssl) error                        { return nil }
+func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error              { return nil }
+func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error        { return nil }
+func (c *dummyCache) GetRoute(_ string) (*v1.Route, error)             { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error)                 { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error)       { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound }
+func (c *dummyCache) ListRoutes() ([]*v1.Route, error)                 { return nil, nil }
+func (c *dummyCache) ListSSL() ([]*v1.Ssl, error)                      { return nil, nil }
+func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error)           { return nil, nil }
+func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error)     { return nil, nil }
+func (c *dummyCache) DeleteRoute(_ *v1.Route) error                    { return nil }
+func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error                        { return nil }
+func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error              { return nil }
+func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error        { return nil }
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index 6be51fd..5526a66 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -87,6 +87,21 @@ func (i *item) route() (*v1.Route, error) {
 	return &route, nil
 }
 
+// streamRoute decodes item.Value and converts it to v1.StreamRoute.
+func (i *item) streamRoute() (*v1.StreamRoute, error) {
+	log.Debugf("got stream_route: %s", string(i.Value))
+	list := strings.Split(i.Key, "/")
+	if len(list) < 1 {
+		return nil, fmt.Errorf("bad stream_route config key: %s", i.Key)
+	}
+
+	var streamRoute v1.StreamRoute
+	if err := json.Unmarshal(i.Value, &streamRoute); err != nil {
+		return nil, err
+	}
+	return &streamRoute, nil
+}
+
 // upstream decodes item.Value and converts it to v1.Upstream.
 func (i *item) upstream() (*v1.Upstream, error) {
 	log.Debugf("got upstream: %s", string(i.Value))
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
index c724e79..ae0dba1 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/route.go
@@ -140,7 +140,7 @@ func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
 
 func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
 	log.Debugw("try to create route",
-		zap.String("host", obj.Host),
+		zap.Strings("hosts", obj.Hosts),
 		zap.String("name", obj.Name),
 		zap.String("cluster", "default"),
 		zap.String("url", r.url),
@@ -204,7 +204,6 @@ func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, err
 	if err := r.cluster.HasSynced(ctx); err != nil {
 		return nil, err
 	}
-	// FIXME use unified v1.Route, removing routeReqBody.
 	body, err := json.Marshal(obj)
 	if err != nil {
 		return nil, err
diff --git a/pkg/apisix/route.go b/pkg/apisix/stream_route.go
similarity index 55%
copy from pkg/apisix/route.go
copy to pkg/apisix/stream_route.go
index c724e79..d46eda4 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/stream_route.go
@@ -12,7 +12,6 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 package apisix
 
 import (
@@ -28,39 +27,39 @@ import (
 	v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
-type routeClient struct {
+type streamRouteClient struct {
 	url     string
 	cluster *cluster
 }
 
-func newRouteClient(c *cluster) Route {
-	return &routeClient{
-		url:     c.baseURL + "/routes",
+func newStreamRouteClient(c *cluster) StreamRoute {
+	return &streamRouteClient{
+		url:     c.baseURL + "/stream_routes",
 		cluster: c,
 	}
 }
 
-// Get returns the Route.
+// Get returns the StreamRoute.
 // FIXME, currently if caller pass a non-existent resource, the Get always passes
 // through cache.
-func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {
-	log.Debugw("try to look up route",
+func (r *streamRouteClient) Get(ctx context.Context, name string) (*v1.StreamRoute, error) {
+	log.Debugw("try to look up stream_route",
 		zap.String("name", name),
 		zap.String("url", r.url),
 		zap.String("cluster", "default"),
 	)
 	rid := id.GenID(name)
-	route, err := r.cluster.cache.GetRoute(rid)
+	streamRoute, err := r.cluster.cache.GetStreamRoute(rid)
 	if err == nil {
-		return route, nil
+		return streamRoute, nil
 	}
 	if err != cache.ErrNotFound {
-		log.Errorw("failed to find route in cache, will try to lookup from APISIX",
+		log.Errorw("failed to find stream_route in cache, will try to lookup from APISIX",
 			zap.String("name", name),
 			zap.Error(err),
 		)
 	} else {
-		log.Debugw("failed to find route in cache, will try to lookup from APISIX",
+		log.Debugw("failed to find stream_route in cache, will try to lookup from APISIX",
 			zap.String("name", name),
 			zap.Error(err),
 		)
@@ -71,13 +70,13 @@ func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {
 	resp, err := r.cluster.getResource(ctx, url)
 	if err != nil {
 		if err == cache.ErrNotFound {
-			log.Warnw("route not found",
+			log.Warnw("stream_route not found",
 				zap.String("name", name),
 				zap.String("url", url),
 				zap.String("cluster", "default"),
 			)
 		} else {
-			log.Errorw("failed to get route from APISIX",
+			log.Errorw("failed to get stream_route from APISIX",
 				zap.String("name", name),
 				zap.String("url", url),
 				zap.String("cluster", "default"),
@@ -87,61 +86,60 @@ func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {
 		return nil, err
 	}
 
-	route, err = resp.Item.route()
+	streamRoute, err = resp.Item.streamRoute()
 	if err != nil {
-		log.Errorw("failed to convert route item",
+		log.Errorw("failed to convert stream_route item",
 			zap.String("url", r.url),
-			zap.String("route_key", resp.Item.Key),
-			zap.String("route_value", string(resp.Item.Value)),
+			zap.String("stream_route_key", resp.Item.Key),
+			zap.String("stream_route_value", string(resp.Item.Value)),
 			zap.Error(err),
 		)
 		return nil, err
 	}
 
-	if err := r.cluster.cache.InsertRoute(route); err != nil {
+	if err := r.cluster.cache.InsertStreamRoute(streamRoute); err != nil {
 		log.Errorf("failed to reflect route create to cache: %s", err)
 		return nil, err
 	}
-	return route, nil
+	return streamRoute, nil
 }
 
 // List is only used in cache warming up. So here just pass through
 // to APISIX.
-func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
-	log.Debugw("try to list routes in APISIX",
+func (r *streamRouteClient) List(ctx context.Context) ([]*v1.StreamRoute, error) {
+	log.Debugw("try to list stream_routes in APISIX",
 		zap.String("cluster", "default"),
 		zap.String("url", r.url),
 	)
-	routeItems, err := r.cluster.listResource(ctx, r.url)
+	streamRouteItems, err := r.cluster.listResource(ctx, r.url)
 	if err != nil {
-		log.Errorf("failed to list routes: %s", err)
+		log.Errorf("failed to list stream_routes: %s", err)
 		return nil, err
 	}
 
-	var items []*v1.Route
-	for i, item := range routeItems.Node.Items {
-		route, err := item.route()
+	var items []*v1.StreamRoute
+	for i, item := range streamRouteItems.Node.Items {
+		streamRoute, err := item.streamRoute()
 		if err != nil {
-			log.Errorw("failed to convert route item",
+			log.Errorw("failed to convert stream_route item",
 				zap.String("url", r.url),
-				zap.String("route_key", item.Key),
-				zap.String("route_value", string(item.Value)),
+				zap.String("stream_route_key", item.Key),
+				zap.String("stream_route_value", string(item.Value)),
 				zap.Error(err),
 			)
 			return nil, err
 		}
 
-		items = append(items, route)
-		log.Debugf("list route #%d, body: %s", i, string(item.Value))
+		items = append(items, streamRoute)
+		log.Debugf("list stream_route #%d, body: %s", i, string(item.Value))
 	}
-
 	return items, nil
 }
 
-func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
-	log.Debugw("try to create route",
-		zap.String("host", obj.Host),
-		zap.String("name", obj.Name),
+func (r *streamRouteClient) Create(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) {
+	log.Debugw("try to create stream_route",
+		zap.String("id", obj.ID),
+		zap.Int32("server_port", obj.ServerPort),
 		zap.String("cluster", "default"),
 		zap.String("url", r.url),
 	)
@@ -155,28 +153,27 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, err
 	}
 
 	url := r.url + "/" + obj.ID
-	log.Debugw("creating route", zap.ByteString("body", data), zap.String("url", url))
+	log.Debugw("creating stream_route", zap.ByteString("body", data), zap.String("url", url))
 	resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
 	if err != nil {
-		log.Errorf("failed to create route: %s", err)
+		log.Errorf("failed to create stream_route: %s", err)
 		return nil, err
 	}
 
-	route, err := resp.Item.route()
+	streamRoute, err := resp.Item.streamRoute()
 	if err != nil {
 		return nil, err
 	}
-	if err := r.cluster.cache.InsertRoute(route); err != nil {
-		log.Errorf("failed to reflect route create to cache: %s", err)
+	if err := r.cluster.cache.InsertStreamRoute(streamRoute); err != nil {
+		log.Errorf("failed to reflect stream_route create to cache: %s", err)
 		return nil, err
 	}
-	return route, nil
+	return streamRoute, nil
 }
 
-func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
-	log.Debugw("try to delete route",
+func (r *streamRouteClient) Delete(ctx context.Context, obj *v1.StreamRoute) error {
+	log.Debugw("try to delete stream_route",
 		zap.String("id", obj.ID),
-		zap.String("name", obj.Name),
 		zap.String("cluster", "default"),
 		zap.String("url", r.url),
 	)
@@ -187,41 +184,39 @@ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
 	if err := r.cluster.deleteResource(ctx, url); err != nil {
 		return err
 	}
-	if err := r.cluster.cache.DeleteRoute(obj); err != nil {
-		log.Errorf("failed to reflect route delete to cache: %s", err)
+	if err := r.cluster.cache.DeleteStreamRoute(obj); err != nil {
+		log.Errorf("failed to reflect stream_route delete to cache: %s", err)
 		return err
 	}
 	return nil
 }
 
-func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
-	log.Debugw("try to update route",
+func (r *streamRouteClient) Update(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) {
+	log.Debugw("try to update stream_route",
 		zap.String("id", obj.ID),
-		zap.String("name", obj.Name),
 		zap.String("cluster", "default"),
 		zap.String("url", r.url),
 	)
 	if err := r.cluster.HasSynced(ctx); err != nil {
 		return nil, err
 	}
-	// FIXME use unified v1.Route, removing routeReqBody.
 	body, err := json.Marshal(obj)
 	if err != nil {
 		return nil, err
 	}
 	url := r.url + "/" + obj.ID
-	log.Debugw("updating route", zap.ByteString("body", body), zap.String("url", url))
+	log.Debugw("updating stream_route", zap.ByteString("body", body), zap.String("url", url))
 	resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
 	if err != nil {
 		return nil, err
 	}
-	route, err := resp.Item.route()
+	streamRoute, err := resp.Item.streamRoute()
 	if err != nil {
 		return nil, err
 	}
-	if err := r.cluster.cache.InsertRoute(route); err != nil {
-		log.Errorf("failed to reflect route update to cache: %s", err)
+	if err := r.cluster.cache.InsertStreamRoute(streamRoute); err != nil {
+		log.Errorf("failed to reflect stream_route update to cache: %s", err)
 		return nil, err
 	}
-	return route, nil
+	return streamRoute, nil
 }
diff --git a/pkg/apisix/stream_route_test.go b/pkg/apisix/stream_route_test.go
new file mode 100644
index 0000000..35411ca
--- /dev/null
+++ b/pkg/apisix/stream_route_test.go
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"sort"
+	"strconv"
+	"strings"
+	"testing"
+
+	v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+	"github.com/stretchr/testify/assert"
+	"golang.org/x/net/nettest"
+)
+
+type fakeAPISIXStreamRouteSrv struct {
+	streamRoute map[string]json.RawMessage
+}
+
+func (srv *fakeAPISIXStreamRouteSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	if !strings.HasPrefix(r.URL.Path, "/apisix/admin/stream_routes") {
+		w.WriteHeader(http.StatusNotFound)
+		return
+	}
+
+	if r.Method == http.MethodGet {
+		resp := fakeListResp{
+			Count: strconv.Itoa(len(srv.streamRoute)),
+			Node: fakeNode{
+				Key: "/apisix/stream_routes",
+			},
+		}
+		var keys []string
+		for key := range srv.streamRoute {
+			keys = append(keys, key)
+		}
+		sort.Strings(keys)
+		for _, key := range keys {
+			resp.Node.Items = append(resp.Node.Items, fakeItem{
+				Key:   key,
+				Value: srv.streamRoute[key],
+			})
+		}
+		w.WriteHeader(http.StatusOK)
+		data, _ := json.Marshal(resp)
+		_, _ = w.Write(data)
+		return
+	}
+
+	if r.Method == http.MethodDelete {
+		id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/stream_routes/")
+		id = "/apisix/stream_routes/" + id
+		code := http.StatusNotFound
+		if _, ok := srv.streamRoute[id]; ok {
+			delete(srv.streamRoute, id)
+			code = http.StatusOK
+		}
+		w.WriteHeader(code)
+	}
+
+	if r.Method == http.MethodPut {
+		paths := strings.Split(r.URL.Path, "/")
+		key := fmt.Sprintf("/apisix/stream_routes/%s", paths[len(paths)-1])
+		data, _ := ioutil.ReadAll(r.Body)
+		srv.streamRoute[key] = data
+		w.WriteHeader(http.StatusCreated)
+		resp := fakeCreateResp{
+			Action: "create",
+			Node: fakeItem{
+				Key:   key,
+				Value: json.RawMessage(data),
+			},
+		}
+		data, _ = json.Marshal(resp)
+		_, _ = w.Write(data)
+		return
+	}
+
+	if r.Method == http.MethodPatch {
+		id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/stream_routes/")
+		id = "/apisix/stream_routes/" + id
+		if _, ok := srv.streamRoute[id]; !ok {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+
+		data, _ := ioutil.ReadAll(r.Body)
+		srv.streamRoute[id] = data
+
+		w.WriteHeader(http.StatusOK)
+		output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data))
+		_, _ = w.Write([]byte(output))
+		return
+	}
+}
+
+func runFakeStreamRouteSrv(t *testing.T) *http.Server {
+	srv := &fakeAPISIXStreamRouteSrv{
+		streamRoute: make(map[string]json.RawMessage),
+	}
+
+	ln, _ := nettest.NewLocalListener("tcp")
+
+	httpSrv := &http.Server{
+		Addr:    ln.Addr().String(),
+		Handler: srv,
+	}
+
+	go func() {
+		if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+			t.Errorf("failed to run http server: %s", err)
+		}
+	}()
+
+	return httpSrv
+}
+
+func TestStreamRouteClient(t *testing.T) {
+	srv := runFakeStreamRouteSrv(t)
+	defer func() {
+		assert.Nil(t, srv.Shutdown(context.Background()))
+	}()
+
+	u := url.URL{
+		Scheme: "http",
+		Host:   srv.Addr,
+		Path:   "/apisix/admin",
+	}
+
+	closedCh := make(chan struct{})
+	close(closedCh)
+	cli := newStreamRouteClient(&cluster{
+		baseURL:     u.String(),
+		cli:         http.DefaultClient,
+		cache:       &dummyCache{},
+		cacheSynced: closedCh,
+	})
+
+	// Create
+	obj, err := cli.Create(context.Background(), &v1.StreamRoute{
+		ID:         "1",
+		ServerPort: 8001,
+		UpstreamId: "1",
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, obj.ID, "1")
+
+	obj, err = cli.Create(context.Background(), &v1.StreamRoute{
+		ID:         "2",
+		ServerPort: 8002,
+		UpstreamId: "1",
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, obj.ID, "2")
+
+	// List
+	objs, err := cli.List(context.Background())
+	assert.Nil(t, err)
+	assert.Len(t, objs, 2)
+	assert.Equal(t, objs[0].ID, "1")
+	assert.Equal(t, objs[1].ID, "2")
+
+	// Delete then List
+	assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+	objs, err = cli.List(context.Background())
+	assert.Nil(t, err)
+	assert.Len(t, objs, 1)
+	assert.Equal(t, "2", objs[0].ID)
+
+	// Patch then List
+	_, err = cli.Update(context.Background(), &v1.StreamRoute{
+		ID:         "2",
+		UpstreamId: "112",
+	})
+	assert.Nil(t, err)
+	objs, err = cli.List(context.Background())
+	assert.Nil(t, err)
+	assert.Len(t, objs, 1)
+	assert.Equal(t, "2", objs[0].ID)
+}
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index d1ad058..3d1bb72 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -71,6 +71,7 @@ type ApisixRoute struct {
 // ApisixRouteSpec is the spec definition for ApisixRouteSpec.
 type ApisixRouteSpec struct {
 	HTTP []*ApisixRouteHTTP `json:"http,omitempty" yaml:"http,omitempty"`
+	TCP  []*ApisixRouteTCP  `json:"tcp,omitempty" yaml:"tcp,omitempty"`
 }
 
 // ApisixRouteHTTP represents a single route in for HTTP traffic.
@@ -196,6 +197,35 @@ func (p *ApisixRouteHTTPPluginConfig) DeepCopy() *ApisixRouteHTTPPluginConfig {
 	return out
 }
 
+// ApisixRouteTCP is the configuration for tcp route.
+type ApisixRouteTCP struct {
+	// The rule name, cannot be empty.
+	Name    string                `json:"name" yaml:"name"`
+	Match   ApisixRouteTCPMatch   `json:"match" yaml:"match"`
+	Backend ApisixRouteTCPBackend `json:"backend" yaml:"backend"`
+}
+
+// ApisixRouteTCPMatch represents the match conditions of tcp route.
+type ApisixRouteTCPMatch struct {
+	// IngressPort represents the port listening on the Ingress proxy server.
+	// It should be pre-defined as APISIX doesn't support dynamic listening.
+	IngressPort int32 `json:"ingressPort" yaml:"ingressPort"`
+}
+
+// ApisixRouteTCPBackend represents a TCP backend (a Kubernetes Service).
+type ApisixRouteTCPBackend struct {
+	// The name (short) of the service, note cross namespace is forbidden,
+	// so be sure the ApisixRoute and Service are in the same namespace.
+	ServiceName string `json:"serviceName" yaml:"serviceName"`
+	// The service port, could be the name or the port number.
+	ServicePort intstr.IntOrString `json:"servicePort" yaml:"servicePort"`
+	// The resolve granularity, can be "endpoints" or "service",
+	// when set to "endpoints", the pod ips will be used; other
+	// wise, the service ClusterIP or ExternalIP will be used,
+	// default is endpoints.
+	ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
+}
+
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 type ApisixRouteList struct {
 	metav1.TypeMeta `json:",inline" yaml:",inline"`
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go
index d3e87e6..f35d721 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go
@@ -275,6 +275,17 @@ func (in *ApisixRouteSpec) DeepCopyInto(out *ApisixRouteSpec) {
 			}
 		}
 	}
+	if in.TCP != nil {
+		in, out := &in.TCP, &out.TCP
+		*out = make([]*ApisixRouteTCP, len(*in))
+		for i := range *in {
+			if (*in)[i] != nil {
+				in, out := &(*in)[i], &(*out)[i]
+				*out = new(ApisixRouteTCP)
+				**out = **in
+			}
+		}
+	}
 	return
 }
 
@@ -287,3 +298,54 @@ func (in *ApisixRouteSpec) DeepCopy() *ApisixRouteSpec {
 	in.DeepCopyInto(out)
 	return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteTCP) DeepCopyInto(out *ApisixRouteTCP) {
+	*out = *in
+	out.Match = in.Match
+	out.Backend = in.Backend
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteTCP.
+func (in *ApisixRouteTCP) DeepCopy() *ApisixRouteTCP {
+	if in == nil {
+		return nil
+	}
+	out := new(ApisixRouteTCP)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteTCPBackend) DeepCopyInto(out *ApisixRouteTCPBackend) {
+	*out = *in
+	out.ServicePort = in.ServicePort
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteTCPBackend.
+func (in *ApisixRouteTCPBackend) DeepCopy() *ApisixRouteTCPBackend {
+	if in == nil {
+		return nil
+	}
+	out := new(ApisixRouteTCPBackend)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRouteTCPMatch) DeepCopyInto(out *ApisixRouteTCPMatch) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteTCPMatch.
+func (in *ApisixRouteTCPMatch) DeepCopy() *ApisixRouteTCPMatch {
+	if in == nil {
+		return nil
+	}
+	out := new(ApisixRouteTCPMatch)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 1f43705..6ab1aa0 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -323,6 +323,17 @@ type TrafficSplitConfigRuleWeightedUpstream struct {
 	Weight     int    `json:"weight"`
 }
 
+// StreamRoute represents the stream route object in APISIX.
+// +k8s:deepcopy-gen=true
+type StreamRoute struct {
+	// TODO metadata should use Metadata type
+	ID         string            `json:"id,omitempty" yaml:"id,omitempty"`
+	Desc       string            `json:"desc,omitempty" yaml:"desc,omitempty"`
+	Labels     map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
+	ServerPort int32             `json:"server_port,omitempty" yaml:"server_port,omitempty"`
+	UpstreamId string            `json:"upstream_id,omitempty" yaml:"upstream_id,omitempty"`
+}
+
 // NewDefaultUpstream returns an empty Upstream with default values.
 func NewDefaultUpstream() *Upstream {
 	return &Upstream{
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 50a6f1b..8fa8c11 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -123,6 +123,29 @@ func (in *Ssl) DeepCopy() *Ssl {
 }
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StreamRoute) DeepCopyInto(out *StreamRoute) {
+	*out = *in
+	if in.Labels != nil {
+		in, out := &in.Labels, &out.Labels
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamRoute.
+func (in *StreamRoute) DeepCopy() *StreamRoute {
+	if in == nil {
+		return nil
+	}
+	out := new(StreamRoute)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *StringOrSlice) DeepCopyInto(out *StringOrSlice) {
 	*out = *in
 	if in.SliceVal != nil {