You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by we...@apache.org on 2020/12/25 07:16:42 UTC
[apisix-ingress-controller] branch master updated: chore:
integrated api7/seven (#128)
This is an automated email from the ASF dual-hosted git repository.
wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new cf99f1f chore: integrated api7/seven (#128)
cf99f1f is described below
commit cf99f1f57a4072fc743ce361053171ba9ad95c67
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Fri Dec 25 15:16:37 2020 +0800
chore: integrated api7/seven (#128)
This commit integrated github.com/api7/seven module, previously the
release it not so convenient.
What's more, we have the follow changes:
* Apache License V2 header is added in each of file;
* Apply gofmt;
* Change the reference from github.com/api7/seven to github.com/api7/ingress-controller/pkg/seven
---
cmd/ingress/ingress.go | 2 +-
cmd/ingress/ingress_test.go | 2 +-
go.mod | 11 +-
go.sum | 2 -
pkg/ingress/apisix/annotation.go | 6 +-
pkg/ingress/apisix/plugin.go | 3 +-
pkg/ingress/apisix/route.go | 8 +-
pkg/ingress/apisix/service.go | 8 +-
pkg/ingress/apisix/tls.go | 5 +-
pkg/ingress/apisix/upstream.go | 8 +-
pkg/ingress/controller/apisix_route.go | 4 +-
pkg/ingress/controller/apisix_service.go | 2 +-
pkg/ingress/controller/apisix_tls.go | 7 +-
pkg/ingress/controller/apisix_upstream.go | 2 +-
pkg/ingress/controller/endpoint.go | 6 +-
pkg/ingress/controller/watch.go | 10 +-
.../apisix/plugin.go => seven/apisix/client.go} | 49 ++--
pkg/seven/apisix/error.go | 22 ++
pkg/seven/apisix/event.go | 23 ++
pkg/seven/apisix/plugins.go | 115 +++++++++
pkg/seven/apisix/route.go | 246 +++++++++++++++++++
pkg/seven/apisix/route_test.go | 104 ++++++++
pkg/seven/apisix/service.go | 215 ++++++++++++++++
pkg/seven/apisix/service_test.go | 80 ++++++
pkg/seven/apisix/ssl.go | 139 +++++++++++
pkg/seven/apisix/ssl_test.go | 80 ++++++
pkg/seven/apisix/upstream.go | 269 +++++++++++++++++++++
pkg/seven/apisix/upstream_test.go | 84 +++++++
pkg/seven/conf/conf.go | 36 +++
pkg/seven/conf/conf_test.go | 24 ++
pkg/seven/db/route.go | 110 +++++++++
pkg/seven/db/service.go | 97 ++++++++
pkg/seven/db/store.go | 43 ++++
pkg/seven/db/upstream.go | 103 ++++++++
pkg/seven/state/builder.go | 244 +++++++++++++++++++
.../plugin.go => seven/state/builder_test.go} | 52 ++--
pkg/seven/state/diff.go | 15 ++
.../apisix/plugin.go => seven/state/event.go} | 48 ++--
pkg/seven/state/route_worker.go | 61 +++++
pkg/seven/state/service_worker.go | 195 +++++++++++++++
pkg/seven/state/solver.go | 129 ++++++++++
pkg/seven/state/sync.go | 15 ++
pkg/seven/state/upstream_worker.go | 23 ++
.../apisix/plugin.go => seven/utils/diff.go} | 61 ++---
pkg/seven/utils/http.go | 88 +++++++
pkg/seven/utils/types.go | 15 ++
46 files changed, 2731 insertions(+), 140 deletions(-)
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 5cae8a5..fb41553 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -24,7 +24,6 @@ import (
"time"
api6Informers "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions"
- "github.com/gxthrj/seven/conf"
"github.com/spf13/cobra"
"github.com/api7/ingress-controller/pkg/api"
@@ -32,6 +31,7 @@ import (
"github.com/api7/ingress-controller/pkg/ingress/controller"
"github.com/api7/ingress-controller/pkg/kube"
"github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
)
func dief(template string, args ...interface{}) {
diff --git a/cmd/ingress/ingress_test.go b/cmd/ingress/ingress_test.go
index 144f753..108fee1 100644
--- a/cmd/ingress/ingress_test.go
+++ b/cmd/ingress/ingress_test.go
@@ -25,11 +25,11 @@ import (
"testing"
"time"
- "github.com/gxthrj/seven/conf"
"github.com/stretchr/testify/assert"
"github.com/api7/ingress-controller/pkg/config"
"github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/types"
)
diff --git a/go.mod b/go.mod
index 261896d..f4195b0 100644
--- a/go.mod
+++ b/go.mod
@@ -8,11 +8,20 @@ require (
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/gxthrj/apisix-ingress-types v0.1.3
github.com/gxthrj/apisix-types v0.1.3
- github.com/gxthrj/seven v0.2.7
+ github.com/hashicorp/go-memdb v1.0.4
+ github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
+ github.com/mattn/go-colorable v0.1.4 // indirect
+ github.com/onsi/ginkgo v1.11.0 // indirect
+ github.com/onsi/gomega v1.8.1 // indirect
github.com/prometheus/client_golang v0.9.3
+ github.com/sergi/go-diff v1.1.0 // indirect
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.4.0
+ github.com/yudai/gojsondiff v1.0.0
+ github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
+ github.com/yudai/pp v2.0.1+incompatible // indirect
go.uber.org/zap v1.13.0
+ gopkg.in/resty.v1 v1.12.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.0.0-20190819141258-3544db3b9e44
k8s.io/apimachinery v0.0.0-20190817020851-f2f3a405f61d
diff --git a/go.sum b/go.sum
index 32e6a93..096a026 100644
--- a/go.sum
+++ b/go.sum
@@ -112,8 +112,6 @@ github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:Fecb
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
-github.com/gxthrj/seven v0.2.7 h1:DNRi3HGXiTEC2O87jq9MqEMHjwf7eHvYQXhJxv1Qa5E=
-github.com/gxthrj/seven v0.2.7/go.mod h1:SYs/veqEMdwRF5BL3nf/nxfypoDMO2E6Odgp17m+J9U=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
diff --git a/pkg/ingress/apisix/annotation.go b/pkg/ingress/apisix/annotation.go
index cead443..0bf0459 100644
--- a/pkg/ingress/apisix/annotation.go
+++ b/pkg/ingress/apisix/annotation.go
@@ -15,9 +15,11 @@
package apisix
import (
- apisix "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- seven "github.com/gxthrj/seven/apisix"
"strconv"
+
+ apisix "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ seven "github.com/api7/ingress-controller/pkg/seven/apisix"
)
// BuildAnnotation return plugins and group
diff --git a/pkg/ingress/apisix/plugin.go b/pkg/ingress/apisix/plugin.go
index 4feb097..4518324 100644
--- a/pkg/ingress/apisix/plugin.go
+++ b/pkg/ingress/apisix/plugin.go
@@ -15,8 +15,9 @@
package apisix
import (
- "github.com/gxthrj/seven/apisix"
"strconv"
+
+ "github.com/api7/ingress-controller/pkg/seven/apisix"
)
type CorsYaml struct {
diff --git a/pkg/ingress/apisix/route.go b/pkg/ingress/apisix/route.go
index fe4d0aa..d40734a 100644
--- a/pkg/ingress/apisix/route.go
+++ b/pkg/ingress/apisix/route.go
@@ -15,11 +15,13 @@
package apisix
import (
- "github.com/api7/ingress-controller/pkg/ingress/endpoint"
+ "strconv"
+
ingress "github.com/gxthrj/apisix-ingress-types/pkg/apis/config/v1"
apisix "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/conf"
- "strconv"
+
+ "github.com/api7/ingress-controller/pkg/ingress/endpoint"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
)
const (
diff --git a/pkg/ingress/apisix/service.go b/pkg/ingress/apisix/service.go
index b3e00b9..62d11e4 100644
--- a/pkg/ingress/apisix/service.go
+++ b/pkg/ingress/apisix/service.go
@@ -15,11 +15,13 @@
package apisix
import (
- "github.com/api7/ingress-controller/pkg/ingress/endpoint"
+ "strconv"
+
ingress "github.com/gxthrj/apisix-ingress-types/pkg/apis/config/v1"
apisix "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/conf"
- "strconv"
+
+ "github.com/api7/ingress-controller/pkg/ingress/endpoint"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
)
const (
diff --git a/pkg/ingress/apisix/tls.go b/pkg/ingress/apisix/tls.go
index ec3888c..10c3201 100644
--- a/pkg/ingress/apisix/tls.go
+++ b/pkg/ingress/apisix/tls.go
@@ -15,12 +15,13 @@
package apisix
import (
- ingressConf "github.com/api7/ingress-controller/pkg/kube"
ingress "github.com/gxthrj/apisix-ingress-types/pkg/apis/config/v1"
apisix "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/conf"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ ingressConf "github.com/api7/ingress-controller/pkg/kube"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
)
const (
diff --git a/pkg/ingress/apisix/upstream.go b/pkg/ingress/apisix/upstream.go
index c5c08e6..358ff81 100644
--- a/pkg/ingress/apisix/upstream.go
+++ b/pkg/ingress/apisix/upstream.go
@@ -15,11 +15,13 @@
package apisix
import (
- "github.com/api7/ingress-controller/pkg/ingress/endpoint"
+ "strconv"
+
ingress "github.com/gxthrj/apisix-ingress-types/pkg/apis/config/v1"
apisix "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/conf"
- "strconv"
+
+ "github.com/api7/ingress-controller/pkg/ingress/endpoint"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
)
const (
diff --git a/pkg/ingress/controller/apisix_route.go b/pkg/ingress/controller/apisix_route.go
index 73d34bf..0759f20 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -16,14 +16,12 @@ package controller
import (
"fmt"
- "github.com/api7/ingress-controller/pkg/ingress/apisix"
api6V1 "github.com/gxthrj/apisix-ingress-types/pkg/apis/config/v1"
clientSet "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
api6Scheme "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
api6Informers "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
"github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
apisixV1 "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/state"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -33,7 +31,9 @@ import (
"k8s.io/client-go/util/workqueue"
"time"
+ "github.com/api7/ingress-controller/pkg/ingress/apisix"
"github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/state"
)
type ApisixRouteController struct {
diff --git a/pkg/ingress/controller/apisix_service.go b/pkg/ingress/controller/apisix_service.go
index a6b2005..474f5de 100644
--- a/pkg/ingress/controller/apisix_service.go
+++ b/pkg/ingress/controller/apisix_service.go
@@ -23,7 +23,6 @@ import (
apisixScheme "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
informers "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
"github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
- "github.com/gxthrj/seven/state"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -34,6 +33,7 @@ import (
"github.com/api7/ingress-controller/pkg/ingress/apisix"
"github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/state"
)
type ApisixServiceController struct {
diff --git a/pkg/ingress/controller/apisix_tls.go b/pkg/ingress/controller/apisix_tls.go
index 4efeb05..0510d14 100644
--- a/pkg/ingress/controller/apisix_tls.go
+++ b/pkg/ingress/controller/apisix_tls.go
@@ -18,14 +18,11 @@ import (
"fmt"
"time"
- "github.com/api7/ingress-controller/pkg/ingress/apisix"
- "github.com/api7/ingress-controller/pkg/log"
apisixV1 "github.com/gxthrj/apisix-ingress-types/pkg/apis/config/v1"
clientSet "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
apisixScheme "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
informers "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
"github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
- "github.com/gxthrj/seven/state"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -33,6 +30,10 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
+
+ "github.com/api7/ingress-controller/pkg/ingress/apisix"
+ "github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/state"
)
type ApisixTlsController struct {
diff --git a/pkg/ingress/controller/apisix_upstream.go b/pkg/ingress/controller/apisix_upstream.go
index 092f63e..bc08e82 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -23,7 +23,6 @@ import (
apisixScheme "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
informers "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
"github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
- "github.com/gxthrj/seven/state"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -35,6 +34,7 @@ import (
"github.com/api7/ingress-controller/pkg/ingress/apisix"
"github.com/api7/ingress-controller/pkg/ingress/endpoint"
"github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/state"
)
type ApisixUpstreamController struct {
diff --git a/pkg/ingress/controller/endpoint.go b/pkg/ingress/controller/endpoint.go
index 7c42a22..b21cfbf 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -18,9 +18,6 @@ import (
"fmt"
"github.com/golang/glog"
apisixType "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/apisix"
- sevenConf "github.com/gxthrj/seven/conf"
- "github.com/gxthrj/seven/state"
CoreV1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
@@ -34,6 +31,9 @@ import (
"github.com/api7/ingress-controller/pkg/kube"
"github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/apisix"
+ sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
+ "github.com/api7/ingress-controller/pkg/seven/state"
)
type EndpointController struct {
diff --git a/pkg/ingress/controller/watch.go b/pkg/ingress/controller/watch.go
index ce4a2ca..6738e19 100644
--- a/pkg/ingress/controller/watch.go
+++ b/pkg/ingress/controller/watch.go
@@ -15,14 +15,16 @@
package controller
import (
+ "strconv"
+
"github.com/api7/ingress-controller/pkg/kube"
"github.com/golang/glog"
apisixType "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
- "github.com/gxthrj/seven/apisix"
- sevenConf "github.com/gxthrj/seven/conf"
- "github.com/gxthrj/seven/state"
"k8s.io/api/core/v1"
- "strconv"
+
+ "github.com/api7/ingress-controller/pkg/seven/apisix"
+ sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
+ "github.com/api7/ingress-controller/pkg/seven/state"
)
const (
diff --git a/pkg/ingress/apisix/plugin.go b/pkg/seven/apisix/client.go
similarity index 50%
copy from pkg/ingress/apisix/plugin.go
copy to pkg/seven/apisix/client.go
index 4feb097..fcb2466 100644
--- a/pkg/ingress/apisix/plugin.go
+++ b/pkg/seven/apisix/client.go
@@ -15,37 +15,28 @@
package apisix
import (
- "github.com/gxthrj/seven/apisix"
- "strconv"
+ "fmt"
+ "net/http"
+ "time"
+
+ "gopkg.in/resty.v1"
)
-type CorsYaml struct {
- Enable bool `json:"enable,omitempty"`
- AllowOrigin string `json:"allow_origin,omitempty"`
- AllowHeaders string `json:"allow_headers,omitempty"`
- AllowMethods string `json:"allow_methods,omitempty"`
-}
+const (
+ timeout = 3000
+)
-func (c *CorsYaml) SetEnable(enable string) {
- if b, err := strconv.ParseBool(enable); err != nil {
- c.Enable = false
- } else {
- c.Enable = b
+func Get(url string) ([]byte, error) {
+ r := resty.New().
+ SetTimeout(time.Duration(timeout)*time.Millisecond).
+ R().
+ SetHeader("content-type", "application/json")
+ resp, err := r.Get(url)
+ if err != nil {
+ return nil, err
}
-}
-
-func (c *CorsYaml) SetOrigin(origin string) {
- c.AllowOrigin = origin
-}
-
-func (c *CorsYaml) SetHeaders(headers string) {
- c.AllowHeaders = headers
-}
-func (c *CorsYaml) SetMethods(methods string) {
- c.AllowMethods = methods
-}
-
-func (c *CorsYaml) Build() *apisix.Cors {
- maxAge := int64(3600)
- return apisix.BuildCors(c.Enable, &c.AllowOrigin, &c.AllowHeaders, &c.AllowMethods, &maxAge)
+ if resp.StatusCode() != http.StatusOK {
+ return nil, fmt.Errorf("status: %d, body: %s", resp.StatusCode(), resp.Body())
+ }
+ return resp.Body(), nil
}
diff --git a/pkg/seven/apisix/error.go b/pkg/seven/apisix/error.go
new file mode 100644
index 0000000..d832854
--- /dev/null
+++ b/pkg/seven/apisix/error.go
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+type ErrorResponse struct {
+ Cause string `json:"cause"`
+ Index int64 `json:"index"`
+ ErrorCode int64 `json:"errorCode"`
+ Message string `json:"message"`
+}
diff --git a/pkg/seven/apisix/event.go b/pkg/seven/apisix/event.go
new file mode 100644
index 0000000..64bd3d9
--- /dev/null
+++ b/pkg/seven/apisix/event.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+// define event for workflow
+
+type Event struct {
+ Method string // ADD UPDATE DELETE
+ Kind string // route service upstream
+ Func func(...interface{}) // callback
+}
diff --git a/pkg/seven/apisix/plugins.go b/pkg/seven/apisix/plugins.go
new file mode 100644
index 0000000..f8eefc8
--- /dev/null
+++ b/pkg/seven/apisix/plugins.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import "strings"
+
+// ip-restrictio
+type IpRestriction struct {
+ Whitelist []string `json:"whitelist,omitempty"`
+ Blacklist []string `json:"blacklist,omitempty"`
+}
+
+// Convert2IpRestriction build IpRestriction
+func BuildIpRestriction(whites, blacks *string) *IpRestriction {
+ result := &IpRestriction{}
+ if whites != nil {
+ whiteIps := strings.Split(*whites, ",")
+ result.Whitelist = whiteIps
+ }
+ if blacks != nil {
+ blackIps := strings.Split(*blacks, ",")
+ result.Blacklist = blackIps
+ }
+ return result
+}
+
+// Cors
+type Cors struct {
+ Origins []string `json:"origins,omitempty"`
+ Headers []string `json:"headers,omitempty"`
+ Methods []string `json:"methods,omitempty"`
+ MaxAge int64 `json:"max_age,omitempty"`
+}
+
+// BuildCors
+func BuildCors(enable bool, originStr, headerStr, methodStr *string, maxAge *int64) *Cors {
+ result := &Cors{}
+ if enable {
+ if originStr != nil {
+ origins := strings.Split(*originStr, ",")
+ result.Origins = origins
+ }
+ if headerStr != nil {
+ headers := strings.Split(*headerStr, ",")
+ result.Headers = headers
+ }
+ if methodStr != nil {
+ methods := strings.Split(*methodStr, ",")
+ result.Methods = methods
+ }
+ if maxAge != nil {
+ result.MaxAge = *maxAge
+ }
+ return result
+ } else {
+ return nil
+ }
+}
+
+// routex
+type Routex struct {
+ Rules []Rule `json:"rules,inline"`
+}
+
+type Rule struct {
+ Priority int64 `json:"priority,omitempty"`
+ Upstream string `json:"upstream"`
+ Desc string `json:"desc"`
+ Matches []Match `json:"matchs,omitempty"`
+}
+
+type Match struct {
+ Host string `json:"host,omitempty"`
+ Uri string `json:"uri,omitempty"`
+ Use string `json:"use"`
+ Key string `json:"key"`
+ Values []string `json:"values,omitempty"`
+}
+
+// BuildRoutex
+func BuildRoutex(enable bool, rules []Rule) *Routex {
+ if enable {
+ result := &Routex{Rules: rules}
+ return result
+ } else {
+ return nil
+ }
+}
+
+// token
+type Token struct {
+ IgnoreUri []string `json:"ignore_uri,omitempty"`
+}
+
+// BuildToken
+func BuildToken(enable bool, ignoreUris []string) *Token {
+ if enable {
+ result := &Token{IgnoreUri: ignoreUris}
+ return result
+ } else {
+ return nil
+ }
+}
diff --git a/pkg/seven/apisix/route.go b/pkg/seven/apisix/route.go
new file mode 100644
index 0000000..3ad709f
--- /dev/null
+++ b/pkg/seven/apisix/route.go
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/conf"
+ sevendb "github.com/api7/ingress-controller/pkg/seven/db"
+ "github.com/api7/ingress-controller/pkg/seven/utils"
+)
+
+// FindCurrentRoute find current route in memDB
+func FindCurrentRoute(route *v1.Route) (*v1.Route, error) {
+ db := &sevendb.RouteRequest{Group: *route.Group, Name: *route.Name, FullName: *route.FullName}
+ currentRoute, _ := db.FindByName()
+ if currentRoute != nil {
+ return currentRoute, nil
+ } else {
+ // find from apisix
+ if routes, err := ListRoute(*route.Group); err != nil {
+ return nil, fmt.Errorf("list routes from etcd failed, err: %+v", err)
+ } else {
+ for _, r := range routes {
+ if r.Name != nil && *r.Name == *route.Name {
+ // insert to memDB
+ db := &sevendb.RouteDB{Routes: []*v1.Route{r}}
+ db.Insert()
+ // return
+ return r, nil
+ }
+ }
+ }
+
+ }
+ return nil, fmt.Errorf("NOT FOUND")
+}
+
+// ListRoute list route from etcd , convert to v1.Route
+func ListRoute(group string) ([]*v1.Route, error) {
+ baseUrl := conf.FindUrl(group)
+ url := baseUrl + "/routes"
+ ret, err := Get(url)
+ if err != nil {
+ return nil, fmt.Errorf("http get failed, url: %s, err: %+v", url, err)
+ }
+ var routesResponse RoutesResponse
+ if err := json.Unmarshal(ret, &routesResponse); err != nil {
+ return nil, fmt.Errorf("json unmarshal failed, err: %+v", err)
+ } else {
+ routes := make([]*v1.Route, 0)
+ for _, u := range routesResponse.Routes.Routes {
+ if n, err := u.convert(group); err == nil {
+ routes = append(routes, n)
+ } else {
+ return nil, fmt.Errorf("upstream: %s 转换失败, %s", *u.Value.Desc, err.Error())
+ }
+ }
+ return routes, nil
+ }
+}
+
+func AddRoute(route *v1.Route) (*RouteResponse, error) {
+ baseUrl := conf.FindUrl(*route.Group)
+ url := fmt.Sprintf("%s/routes", baseUrl)
+ rr := convert2RouteRequest(route)
+ if b, err := json.Marshal(rr); err != nil {
+ return nil, err
+ } else {
+ if res, err := utils.Post(url, b); err != nil {
+ return nil, err
+ } else {
+ var routeResp RouteResponse
+ if err = json.Unmarshal(res, &routeResp); err != nil {
+ return nil, err
+ } else {
+ if routeResp.Route.Key != nil {
+ return &routeResp, nil
+ } else {
+ return nil, fmt.Errorf("apisix route not expected response")
+ }
+
+ }
+ }
+ }
+}
+
+func UpdateRoute(route *v1.Route) error {
+ baseUrl := conf.FindUrl(*route.Group)
+ url := fmt.Sprintf("%s/routes/%s", baseUrl, *route.ID)
+ rr := convert2RouteRequest(route)
+ if b, err := json.Marshal(rr); err != nil {
+ return err
+ } else {
+ if _, err := utils.Patch(url, b); err != nil {
+ return err
+ } else {
+ return nil
+ }
+ }
+}
+
+func DeleteRoute(route *v1.Route) error {
+ baseUrl := conf.FindUrl(*route.Group)
+ url := fmt.Sprintf("%s/routes/%s", baseUrl, *route.ID)
+ if _, err := utils.Delete(url); err != nil {
+ return err
+ } else {
+ return nil
+ }
+}
+
+type Redirect struct {
+ RetCode int64 `json:"ret_code"`
+ Uri string `json:"uri"`
+}
+
+func convert2RouteRequest(route *v1.Route) *RouteRequest {
+ return &RouteRequest{
+ Desc: *route.Name,
+ Host: *route.Host,
+ Uri: *route.Path,
+ ServiceId: *route.ServiceId,
+ Plugins: route.Plugins,
+ }
+}
+
+// convert apisix RouteResponse -> apisix-types v1.Route
+func (r *Route) convert(group string) (*v1.Route, error) {
+ // id
+ key := r.Key
+ ks := strings.Split(*key, "/")
+ id := ks[len(ks)-1]
+ // name
+ name := r.Value.Desc
+ // host
+ host := r.Value.Host
+ // path
+ path := r.Value.Uri
+ // method
+ methods := r.Value.Methods
+ // upstreamId
+ upstreamId := r.Value.UpstreamId
+ // serviceId
+ serviceId := r.Value.ServiceId
+ // plugins
+ var plugins v1.Plugins
+ plugins = r.Value.Plugins
+
+ // fullName
+ fullName := "unknown"
+ if name != nil {
+ fullName = *name
+ }
+ if group != "" {
+ fullName = group + "_" + fullName
+ }
+
+ return &v1.Route{
+ ID: &id,
+ Group: &group,
+ FullName: &fullName,
+ Name: name,
+ Host: host,
+ Path: path,
+ Methods: methods,
+ UpstreamId: upstreamId,
+ ServiceId: serviceId,
+ Plugins: &plugins,
+ }, nil
+}
+
+type RoutesResponse struct {
+ Routes Routes `json:"node"`
+}
+
+type Routes struct {
+ Key string `json:"key"`
+ Routes RouteSet `json:"nodes"`
+}
+
+type RouteSet []Route
+
+// RouteSet.UnmarshalJSON implements json.Unmarshaler interface.
+// lua-cjson doesn't distinguish empty array and table,
+// and by default empty array will be encoded as '{}'.
+// We have to maintain the compatibility.
+func (set *RouteSet) UnmarshalJSON(p []byte) error {
+ if p[0] == '{' {
+ if len(p) != 2 {
+ return errors.New("unexpected non-empty object")
+ }
+ return nil
+ }
+ var route []Route
+ if err := json.Unmarshal(p, &route); err != nil {
+ return err
+ }
+ *set = route
+ return nil
+}
+
+type RouteResponse struct {
+ Action string `json:"action"`
+ Route Route `json:"node"`
+}
+
+type Route struct {
+ Key *string `json:"key"` // route key
+ Value Value `json:"value"` // route content
+}
+
+type Value struct {
+ UpstreamId *string `json:"upstream_id"`
+ ServiceId *string `json:"service_id"`
+ Plugins map[string]interface{} `json:"plugins"`
+ Host *string `json:"host,omitempty"`
+ Uri *string `json:"uri"`
+ Desc *string `json:"desc"`
+ Methods []*string `json:"methods,omitempty"`
+}
+
+type RouteRequest struct {
+ Desc string `json:"desc,omitempty"`
+ Uri string `json:"uri,omitempty"`
+ Host string `json:"host,omitempty"`
+ ServiceId string `json:"service_id,omitempty"`
+ Plugins *v1.Plugins `json:"plugins,omitempty"`
+}
diff --git a/pkg/seven/apisix/route_test.go b/pkg/seven/apisix/route_test.go
new file mode 100644
index 0000000..9fba34e
--- /dev/null
+++ b/pkg/seven/apisix/route_test.go
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestRouteUnmarshalJSON(t *testing.T) {
+ var route Routes
+ emptyData := `
+{
+ "key": "test",
+ "nodes": {}
+}
+`
+ err := json.Unmarshal([]byte(emptyData), &route)
+ assert.Nil(t, err)
+
+ emptyData = `
+{
+ "key": "test",
+ "nodes": {"a": "b", "c": "d"}
+}
+`
+ err = json.Unmarshal([]byte(emptyData), &route)
+ assert.Equal(t, err.Error(), "unexpected non-empty object")
+
+ emptyArray := `
+{
+ "key": "test",
+ "nodes": []
+}
+`
+ err = json.Unmarshal([]byte(emptyArray), &route)
+ assert.Nil(t, err)
+
+ normalData := `
+{
+ "key": "test",
+ "nodes": [
+ {
+ "key": "route 1",
+ "value": {
+ "desc": "test route 1",
+ "upstream_id": "123",
+ "service_id": "12345",
+ "host": "foo.com",
+ "uri": "/bar/baz",
+ "methods": ["GET", "POST"]
+ }
+ }
+ ]
+}
+`
+ err = json.Unmarshal([]byte(normalData), &route)
+ assert.Nil(t, err)
+ assert.Equal(t, route.Key, "test")
+ assert.Equal(t, len(route.Routes), 1)
+
+ key := *route.Routes[0].Key
+ assert.Equal(t, key, "route 1")
+ desc := *route.Routes[0].Value.Desc
+ assert.Equal(t, desc, "test route 1")
+ upstreamId := *route.Routes[0].Value.UpstreamId
+ assert.Equal(t, upstreamId, "123")
+ svcId := *route.Routes[0].Value.ServiceId
+ assert.Equal(t, svcId, "12345")
+ assert.Equal(t, *route.Routes[0].Value.Host, "foo.com")
+ assert.Equal(t, *route.Routes[0].Value.Uri, "/bar/baz")
+ assert.Equal(t, *route.Routes[0].Value.Methods[0], "GET")
+ assert.Equal(t, *route.Routes[0].Value.Methods[1], "POST")
+}
+
+func TestRouteConvertWithoutDesc(t *testing.T) {
+ upsId := "1"
+ svcId := "2"
+ key := "foo/bar"
+ r := &Route{
+ Key: &key,
+ Value: Value{
+ UpstreamId: &upsId,
+ ServiceId: &svcId,
+ Host: nil,
+ },
+ }
+ _, err := r.convert("mygroup")
+ assert.Nil(t, err)
+}
diff --git a/pkg/seven/apisix/service.go b/pkg/seven/apisix/service.go
new file mode 100644
index 0000000..a036e8e
--- /dev/null
+++ b/pkg/seven/apisix/service.go
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/conf"
+ sevendb "github.com/api7/ingress-controller/pkg/seven/db"
+ "github.com/api7/ingress-controller/pkg/seven/utils"
+)
+
+// FindCurrentService find service from memDB,
+// if Not Found, find service from apisix
+func FindCurrentService(group, name, fullName string) (*v1.Service, error) {
+ db := sevendb.ServiceRequest{Group: group, Name: name, FullName: fullName}
+ currentService, _ := db.FindByName()
+ if currentService != nil {
+ return currentService, nil
+ } else {
+ // find service from apisix
+ if services, err := ListService(group); err != nil {
+ glog.Errorf("list services in etcd failed, group: %s, err: %+v", group, err)
+ return nil, fmt.Errorf("list services failed, err: %+v", err)
+ } else {
+ for _, s := range services {
+ if s.Name != nil && *(s.Name) == name {
+ // and save to memDB
+ db := &sevendb.ServiceDB{Services: []*v1.Service{s}}
+ db.Insert()
+ // return
+ return s, nil
+ }
+ }
+ }
+ }
+ return nil, nil
+}
+
+// ListUpstream list upstream from etcd , convert to v1.Upstream
+func ListService(group string) ([]*v1.Service, error) {
+ baseUrl := conf.FindUrl(group)
+ url := baseUrl + "/services"
+ ret, err := Get(url)
+ if err != nil {
+ return nil, fmt.Errorf("http get failed, url: %s, err: %+v", url, err)
+ }
+ var servicesResponse ServicesResponse
+ if err := json.Unmarshal(ret, &servicesResponse); err != nil {
+ return nil, fmt.Errorf("json unmarshal failed, err: %+v", err)
+ } else {
+ result := make([]*v1.Service, 0)
+ for _, u := range servicesResponse.Services.Services {
+ if n, err := u.convert(group); err == nil {
+ result = append(result, n)
+ } else {
+ return nil, fmt.Errorf("service : %+v 转换失败, %s", u.ServiceValue, err.Error())
+ }
+ }
+ return result, nil
+ }
+}
+
+// convert convert Service from etcd to v1.Service
+func (u *Service) convert(group string) (*v1.Service, error) {
+ // id
+ keys := strings.Split(*u.Key, "/")
+ id := keys[len(keys)-1]
+ // Name
+ name := u.ServiceValue.Desc
+ // upstreamId
+ upstreamId := u.ServiceValue.UpstreamId
+ // plugins
+ plugins := &v1.Plugins{}
+ for k, v := range u.ServiceValue.Plugins {
+ (*plugins)[k] = v
+ }
+ fullName := *name
+ if group != "" {
+ fullName = group + "_" + *name
+ }
+ return &v1.Service{ID: &id, FullName: &fullName, Group: &group, Name: name, UpstreamId: upstreamId, Plugins: plugins}, nil
+}
+
+func AddService(service *v1.Service) (*ServiceResponse, error) {
+ baseUrl := conf.FindUrl(*service.Group)
+ url := fmt.Sprintf("%s/services", baseUrl)
+ ur := convert2ServiceRequest(service)
+ if b, err := json.Marshal(ur); err != nil {
+ return nil, err
+ } else {
+ if res, err := utils.Post(url, b); err != nil {
+ return nil, fmt.Errorf("http post failed, err: %+v", err)
+ } else {
+ var uRes ServiceResponse
+ if err = json.Unmarshal(res, &uRes); err != nil {
+ return nil, err
+ } else {
+ if uRes.Service.Key != nil {
+ return &uRes, nil
+ } else {
+ return nil, fmt.Errorf("apisix service not expected response")
+ }
+
+ }
+ }
+ }
+}
+
+func UpdateService(service *v1.Service) (*ServiceResponse, error) {
+ baseUrl := conf.FindUrl(*service.Group)
+ url := fmt.Sprintf("%s/services/%s", baseUrl, *service.ID)
+ ur := convert2ServiceRequest(service)
+ if b, err := json.Marshal(ur); err != nil {
+ return nil, err
+ } else {
+ if res, err := utils.Patch(url, b); err != nil {
+ return nil, err
+ } else {
+ var uRes ServiceResponse
+ if err = json.Unmarshal(res, &uRes); err != nil {
+ return nil, err
+ } else {
+ if uRes.Service.Key != nil {
+ return &uRes, nil
+ } else {
+ var errResp ErrorResponse
+ json.Unmarshal(res, &errResp)
+ glog.Error(errResp.Message)
+ return nil, fmt.Errorf("apisix service not expected response %s", errResp.Message)
+ }
+ }
+ }
+ }
+}
+
+func convert2ServiceRequest(service *v1.Service) *ServiceRequest {
+ request := &ServiceRequest{
+ Desc: service.Name,
+ UpstreamId: service.UpstreamId,
+ Plugins: service.Plugins,
+ }
+ glog.V(2).Info(*request.Desc)
+ return request
+}
+
+type ServiceRequest struct {
+ Desc *string `json:"desc,omitempty"`
+ UpstreamId *string `json:"upstream_id"`
+ Plugins *v1.Plugins `json:"plugins,omitempty"`
+}
+
+type ServicesResponse struct {
+ Services Services `json:"node"`
+}
+
+type Services struct {
+ Key string `json:"key"` // 用来定位upstreams 列表
+ Services ServiceSet `json:"nodes"`
+}
+
+type ServiceSet []Service
+
+// UpstreamSet.UnmarshalJSON implements json.Unmarshaler interface.
+// lua-cjson doesn't distinguish empty array and table,
+// and by default empty array will be encoded as '{}'.
+// We have to maintain the compatibility.
+func (set *ServiceSet) UnmarshalJSON(p []byte) error {
+ if p[0] == '{' {
+ if len(p) != 2 {
+ return errors.New("unexpected non-empty object")
+ }
+ return nil
+ }
+ var svcs []Service
+ if err := json.Unmarshal(p, &svcs); err != nil {
+ return err
+ }
+ *set = svcs
+ return nil
+}
+
+type ServiceResponse struct {
+ Action string `json:"action"`
+ Service Service `json:"node"`
+}
+
+type Service struct {
+ Key *string `json:"key"` // service key
+ ServiceValue ServiceValue `json:"value,omitempty"`
+}
+
+type ServiceValue struct {
+ UpstreamId *string `json:"upstream_id,omitempty"`
+ Plugins map[string]interface{} `json:"plugins"`
+ Desc *string `json:"desc,omitempty"`
+}
diff --git a/pkg/seven/apisix/service_test.go b/pkg/seven/apisix/service_test.go
new file mode 100644
index 0000000..4309e4d
--- /dev/null
+++ b/pkg/seven/apisix/service_test.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestServiceUnmarshalJSON(t *testing.T) {
+ var svc Services
+ emptyData := `
+{
+ "key": "test",
+ "nodes": {}
+}
+`
+ err := json.Unmarshal([]byte(emptyData), &svc)
+ assert.Nil(t, err)
+
+ emptyData = `
+{
+ "key": "test",
+ "nodes": {"a": "b", "c": "d"}
+}
+`
+ err = json.Unmarshal([]byte(emptyData), &svc)
+ assert.Equal(t, err.Error(), "unexpected non-empty object")
+
+ emptyArray := `
+{
+ "key": "test",
+ "nodes": []
+}
+`
+ err = json.Unmarshal([]byte(emptyArray), &svc)
+ assert.Nil(t, err)
+
+ normalData := `
+{
+ "key": "test",
+ "nodes": [
+ {
+ "key": "svc1",
+ "value": {
+ "desc": "test service 1",
+ "upstream_id": "123",
+ "plugins": {}
+ }
+ }
+ ]
+}
+`
+ err = json.Unmarshal([]byte(normalData), &svc)
+ assert.Nil(t, err)
+ assert.Equal(t, svc.Key, "test")
+ assert.Equal(t, len(svc.Services), 1)
+
+ key := *svc.Services[0].Key
+ assert.Equal(t, key, "svc1")
+ desc := *svc.Services[0].ServiceValue.Desc
+ assert.Equal(t, desc, "test service 1")
+
+ upstreamId := *svc.Services[0].ServiceValue.UpstreamId
+ assert.Equal(t, upstreamId, "123")
+}
diff --git a/pkg/seven/apisix/ssl.go b/pkg/seven/apisix/ssl.go
new file mode 100644
index 0000000..4933f80
--- /dev/null
+++ b/pkg/seven/apisix/ssl.go
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/conf"
+ "github.com/api7/ingress-controller/pkg/seven/utils"
+)
+
+// ListSsl list ssl from etcd , convert to v1.Upstream
+func ListSsl(group string) ([]*v1.Ssl, error) {
+ baseUrl := conf.FindUrl(group)
+ url := baseUrl + "/ssl"
+ ret, err := Get(url)
+ if err != nil {
+ return nil, fmt.Errorf("http get failed, url: %s, err: %+v", url, err)
+ }
+ var sslsResponse SslsResponse
+ if err := json.Unmarshal(ret, &sslsResponse); err != nil {
+ return nil, fmt.Errorf("json transform error")
+ } else {
+ ssls := make([]*v1.Ssl, 0)
+ for _, s := range sslsResponse.SslList.SslNodes {
+ id := strings.ReplaceAll(*s.Key, "/apisix/ssl/", "")
+ ssl := &v1.Ssl{
+ ID: &id,
+ Snis: s.Ssl.Snis,
+ Cert: s.Ssl.Cert,
+ Key: s.Ssl.Key,
+ Status: s.Ssl.Status,
+ Group: &group,
+ }
+ ssls = append(ssls, ssl)
+ }
+ return ssls, nil
+ }
+}
+
+func AddOrUpdateSsl(ssl *v1.Ssl) (*SslResponse, error) {
+ baseUrl := conf.FindUrl(*ssl.Group)
+ url := fmt.Sprintf("%s/ssl/%s", baseUrl, *ssl.ID)
+ glog.V(2).Info(url)
+ ur := &v1.Ssl{
+ Snis: ssl.Snis,
+ Cert: ssl.Cert,
+ Key: ssl.Key,
+ Status: ssl.Status,
+ }
+ if b, err := json.Marshal(ur); err != nil {
+ return nil, err
+ } else {
+ if res, err := utils.Put(url, b); err != nil {
+ return nil, fmt.Errorf("http put failed, url: %s, err: %+v", url, err)
+ } else {
+ var uRes SslResponse
+ if err = json.Unmarshal(res, &uRes); err != nil {
+ glog.Errorf("json Unmarshal error: %s", err.Error())
+ return nil, err
+ } else {
+ glog.V(2).Info(uRes)
+ if uRes.Ssl.Key != nil {
+ return &uRes, nil
+ } else {
+ return nil, fmt.Errorf("apisix ssl not expected response")
+ }
+ }
+ }
+ }
+}
+
+func DeleteSsl(ssl *v1.Ssl) error {
+ baseUrl := conf.FindUrl(*ssl.Group)
+ url := fmt.Sprintf("%s/ssl/%s", baseUrl, *ssl.ID)
+ if _, err := utils.Delete(url); err != nil {
+ return fmt.Errorf("http delete failed, url: %s, err: %+v", url, err)
+ } else {
+ return nil
+ }
+}
+
+type SslResponse struct {
+ Action string `json:"action"`
+ Ssl SslNode `json:"node"`
+}
+
+type SslsResponse struct {
+ Action string `json:"action"`
+ SslList SslList `json:"node"`
+}
+
+type SslList struct {
+ SslNodes SslSet `json:"nodes"`
+}
+
+type SslNode struct {
+ Key *string `json:"key"`
+ Ssl *v1.Ssl `json:"value"`
+}
+
+type SslSet []SslNode
+
+// SslSet.UnmarshalJSON implements json.Unmarshaler interface.
+// lua-cjson doesn't distinguish empty array and table,
+// and by default empty array will be encoded as '{}'.
+// We have to maintain the compatibility.
+func (set *SslSet) UnmarshalJSON(p []byte) error {
+ if p[0] == '{' {
+ if len(p) != 2 {
+ return errors.New("unexpected non-empty object")
+ }
+ return nil
+ }
+ var ssls []SslNode
+ if err := json.Unmarshal(p, &ssls); err != nil {
+ return err
+ }
+ *set = ssls
+ return nil
+}
diff --git a/pkg/seven/apisix/ssl_test.go b/pkg/seven/apisix/ssl_test.go
new file mode 100644
index 0000000..dbb8d30
--- /dev/null
+++ b/pkg/seven/apisix/ssl_test.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestSslUnmarshalJSON(t *testing.T) {
+ var sslList SslList
+ emptyData := `
+{
+ "key": "test",
+ "nodes": {}
+}
+`
+ err := json.Unmarshal([]byte(emptyData), &sslList)
+ assert.Nil(t, err)
+
+ notEmptyObject := `
+{
+ "key": "test",
+ "nodes": {"a": "b", "c": "d"}
+}
+`
+ err = json.Unmarshal([]byte(notEmptyObject), &sslList)
+ assert.Equal(t, err.Error(), "unexpected non-empty object")
+
+ emptyArray := `
+{
+ "key": "test",
+ "nodes": []
+}
+`
+ err = json.Unmarshal([]byte(emptyArray), &sslList)
+ assert.Nil(t, err)
+
+ normalData := `
+{
+ "key": "test",
+ "nodes": [
+ {
+ "key": "ssl id",
+ "value": {
+ "snis": ["test.apisix.org"],
+ "cert": "root",
+ "key": "123456",
+ "status": 1
+ }
+ }
+ ]
+}
+`
+ err = json.Unmarshal([]byte(normalData), &sslList)
+ assert.Nil(t, err)
+ assert.Equal(t, len(sslList.SslNodes), 1)
+
+ key := *sslList.SslNodes[0].Key
+ assert.Equal(t, key, "ssl id")
+ cert := *sslList.SslNodes[0].Ssl.Cert
+ assert.Equal(t, cert, "root")
+ sslKey := *sslList.SslNodes[0].Ssl.Key
+ assert.Equal(t, sslKey, "123456")
+ sni := *sslList.SslNodes[0].Ssl.Snis[0]
+ assert.Equal(t, sni, "test.apisix.org")
+}
diff --git a/pkg/seven/apisix/upstream.go b/pkg/seven/apisix/upstream.go
new file mode 100644
index 0000000..5d9b372
--- /dev/null
+++ b/pkg/seven/apisix/upstream.go
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/conf"
+ "github.com/api7/ingress-controller/pkg/seven/db"
+ "github.com/api7/ingress-controller/pkg/seven/utils"
+)
+
+// FindCurrentUpstream find upstream from memDB,
+// if Not Found, find upstream from apisix
+func FindCurrentUpstream(group, name, fullName string) (*v1.Upstream, error) {
+ ur := &db.UpstreamRequest{Group: group, Name: name, FullName: fullName}
+ currentUpstream, _ := ur.FindByName()
+ if currentUpstream != nil {
+ return currentUpstream, nil
+ } else {
+ // find upstream from apisix
+ if upstreams, err := ListUpstream(group); err != nil {
+ glog.Errorf("list upstreams in etcd failed, group: %s, err: %+v", group, err)
+ return nil, fmt.Errorf("list upstreams failed, err: %+v", err)
+ } else {
+ for _, upstream := range upstreams {
+ if upstream.Name != nil && *(upstream.Name) == name {
+ // and save to memDB
+ upstreamDB := &db.UpstreamDB{Upstreams: []*v1.Upstream{upstream}}
+ upstreamDB.InsertUpstreams()
+ //InsertUpstreams([]*v1.Upstream{upstream})
+ // return
+ return upstream, nil
+ }
+ }
+ }
+
+ }
+ return nil, nil
+}
+
+// ListUpstream list upstream from etcd , convert to v1.Upstream
+func ListUpstream(group string) ([]*v1.Upstream, error) {
+ baseUrl := conf.FindUrl(group)
+ url := baseUrl + "/upstreams"
+ ret, err := Get(url)
+ if err != nil {
+ return nil, fmt.Errorf("http get failed, url: %s, err: %+v", url, err)
+ }
+ var upstreamsResponse UpstreamsResponse
+ if err := json.Unmarshal(ret, &upstreamsResponse); err != nil {
+ return nil, fmt.Errorf("json转换失败")
+ } else {
+ upstreams := make([]*v1.Upstream, 0)
+ for _, u := range upstreamsResponse.Upstreams.Upstreams {
+ if n, err := u.convert(group); err == nil {
+ upstreams = append(upstreams, n)
+ } else {
+ return nil, fmt.Errorf("upstream: %s 转换失败, %s", *u.UpstreamNodes.Desc, err.Error())
+ }
+ }
+ return upstreams, nil
+ }
+}
+
+//func IsExist(name string) (bool, error) {
+// if upstreams, err := ListUpstream(); err != nil {
+// return false, err
+// } else {
+// for _, upstream := range upstreams {
+// if *upstream.Name == name {
+// return true, nil
+// }
+// }
+// return false, nil
+// }
+//}
+
+func AddUpstream(upstream *v1.Upstream) (*UpstreamResponse, error) {
+ baseUrl := conf.FindUrl(*upstream.Group)
+ url := fmt.Sprintf("%s/upstreams", baseUrl)
+ glog.V(2).Info(url)
+ ur := convert2UpstreamRequest(upstream)
+ if b, err := json.Marshal(ur); err != nil {
+ return nil, err
+ } else {
+ if res, err := utils.Post(url, b); err != nil {
+ return nil, fmt.Errorf("http post failed, url: %s, err: %+v", url, err)
+ } else {
+ var uRes UpstreamResponse
+ if err = json.Unmarshal(res, &uRes); err != nil {
+ glog.Errorf("json Unmarshal error: %s", err.Error())
+ return nil, err
+ } else {
+ glog.V(2).Info(uRes)
+ if uRes.Upstream.Key != nil {
+ return &uRes, nil
+ } else {
+ return nil, fmt.Errorf("apisix upstream not expected response")
+ }
+ }
+ }
+ }
+}
+
+func UpdateUpstream(upstream *v1.Upstream) error {
+ baseUrl := conf.FindUrl(*upstream.Group)
+ url := fmt.Sprintf("%s/upstreams/%s", baseUrl, *upstream.ID)
+ ur := convert2UpstreamRequest(upstream)
+ if b, err := json.Marshal(ur); err != nil {
+ return err
+ } else {
+ if _, err := utils.Patch(url, b); err != nil {
+ return fmt.Errorf("http patch failed, url: %s, err: %+v", url, err)
+ } else {
+ return nil
+ }
+ }
+}
+
+func PatchNodes(upstream *v1.Upstream, nodes []*v1.Node) error {
+ baseUrl := conf.FindUrl(*upstream.Group)
+ url := fmt.Sprintf("%s/upstreams/%s/nodes", baseUrl, *upstream.ID)
+ nodeMap := convertNodes(nodes)
+ if b, err := json.Marshal(nodeMap); err != nil {
+ return err
+ } else {
+ if _, err := utils.Patch(url, b); err != nil {
+ return fmt.Errorf("http patch failed, url: %s, err: %+v", url, err)
+ } else {
+ return nil
+ }
+ }
+}
+
+func DeleteUpstream(upstream *v1.Upstream) error {
+ baseUrl := conf.FindUrl(*upstream.Group)
+ url := fmt.Sprintf("%s/upstreams/%s", baseUrl, *upstream.ID)
+ if _, err := utils.Delete(url); err != nil {
+ return fmt.Errorf("http delete failed, url: %s, err: %+v", url, err)
+ } else {
+ return nil
+ }
+}
+
+func convert2UpstreamRequest(upstream *v1.Upstream) *UpstreamRequest {
+ nodes := convertNodes(upstream.Nodes)
+ return &UpstreamRequest{
+ LBType: *upstream.Type,
+ HashOn: upstream.HashOn,
+ Key: upstream.Key,
+ Desc: *upstream.Name,
+ Nodes: nodes,
+ }
+}
+
+func convertNodes(nodes []*v1.Node) map[string]int64 {
+ result := make(map[string]int64)
+ for _, u := range nodes {
+ result[*u.IP+":"+strconv.Itoa(*u.Port)] = int64(*u.Weight)
+ }
+ return result
+}
+
+// convert convert Upstream from etcd to v1.Upstream
+func (u *Upstream) convert(group string) (*v1.Upstream, error) {
+ // id
+ keys := strings.Split(*u.Key, "/")
+ id := keys[len(keys)-1]
+ // Name
+ name := u.UpstreamNodes.Desc
+ // type
+ LBType := u.UpstreamNodes.LBType
+ // key
+ key := u.Key
+ // nodes
+ nodes := make([]*v1.Node, 0)
+ for k, v := range u.UpstreamNodes.Nodes {
+ ks := strings.Split(k, ":")
+ ip := ks[0]
+ port := 80
+ if len(ks) > 1 {
+ port, _ = strconv.Atoi(ks[1])
+ }
+ weight := int(v)
+ node := &v1.Node{IP: &ip, Port: &port, Weight: &weight}
+ nodes = append(nodes, node)
+ }
+ // fullName
+ fullName := *name
+ if group != "" {
+ fullName = group + "_" + *name
+ }
+ return &v1.Upstream{ID: &id, FullName: &fullName, Group: &group, Name: name, Type: LBType, Key: key, Nodes: nodes}, nil
+}
+
+type UpstreamsResponse struct {
+ Upstreams Upstreams `json:"node"`
+}
+
+type UpstreamResponse struct {
+ Action string `json:"action"`
+ Upstream Upstream `json:"node"`
+}
+
+type Upstreams struct {
+ Key string `json:"key"` // 用来定位upstreams 列表
+ Upstreams UpstreamSet `json:"nodes"`
+}
+
+type UpstreamSet []Upstream
+
+// UpstreamSet.UnmarshalJSON implements json.Unmarshaler interface.
+// lua-cjson doesn't distinguish empty array and table,
+// and by default empty array will be encoded as '{}'.
+// We have to maintain the compatibility.
+func (set *UpstreamSet) UnmarshalJSON(p []byte) error {
+ if p[0] == '{' {
+ if len(p) != 2 {
+ return errors.New("unexpected non-empty object")
+ }
+ return nil
+ }
+ var ups []Upstream
+ if err := json.Unmarshal(p, &ups); err != nil {
+ return err
+ }
+ *set = ups
+ return nil
+}
+
+type Upstream struct {
+ Key *string `json:"key"` // upstream key
+ UpstreamNodes UpstreamNodes `json:"value"`
+}
+
+type UpstreamNodes struct {
+ Nodes map[string]int64 `json:"nodes"`
+ Desc *string `json:"desc"` // upstream name = k8s svc
+ LBType *string `json:"type"` // 负载均衡类型
+}
+
+//{"type":"roundrobin","nodes":{"10.244.10.11:8080":100},"desc":"somesvc"}
+type UpstreamRequest struct {
+ LBType string `json:"type"`
+ HashOn *string `json:"hash_on,omitempty"`
+ Key *string `json:"key,omitempty"`
+ Nodes map[string]int64 `json:"nodes"`
+ Desc string `json:"desc"`
+}
diff --git a/pkg/seven/apisix/upstream_test.go b/pkg/seven/apisix/upstream_test.go
new file mode 100644
index 0000000..1aee995
--- /dev/null
+++ b/pkg/seven/apisix/upstream_test.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestUpstreamsUnmarshalJSON(t *testing.T) {
+ var ups Upstreams
+ emptyData := `
+{
+ "key": "test",
+ "nodes": {}
+}
+`
+ err := json.Unmarshal([]byte(emptyData), &ups)
+ assert.Nil(t, err)
+
+ emptyData = `
+{
+ "key": "test",
+ "nodes": {"a": "b", "c": "d"}
+}
+`
+ err = json.Unmarshal([]byte(emptyData), &ups)
+ assert.Equal(t, err.Error(), "unexpected non-empty object")
+
+ emptyArray := `
+{
+ "key": "test",
+ "nodes": []
+}
+`
+ err = json.Unmarshal([]byte(emptyArray), &ups)
+ assert.Nil(t, err)
+
+ normalData := `
+{
+ "key": "test",
+ "nodes": [
+ {
+ "key": "ups1",
+ "value": {
+ "desc": "test upstream 1",
+ "type": "rr",
+ "nodes": {
+ "192.168.12.12": 100
+ }
+ }
+ }
+ ]
+}
+`
+ err = json.Unmarshal([]byte(normalData), &ups)
+ assert.Nil(t, err)
+ assert.Equal(t, ups.Key, "test")
+ assert.Equal(t, len(ups.Upstreams), 1)
+
+ key := *ups.Upstreams[0].Key
+ assert.Equal(t, key, "ups1")
+ desc := *ups.Upstreams[0].UpstreamNodes.Desc
+ assert.Equal(t, desc, "test upstream 1")
+ lb := *ups.Upstreams[0].UpstreamNodes.LBType
+ assert.Equal(t, lb, "rr")
+
+ assert.Equal(t, len(ups.Upstreams[0].UpstreamNodes.Nodes), 1)
+ assert.Equal(t, ups.Upstreams[0].UpstreamNodes.Nodes["192.168.12.12"], int64(100))
+}
diff --git a/pkg/seven/conf/conf.go b/pkg/seven/conf/conf.go
new file mode 100644
index 0000000..1f016c6
--- /dev/null
+++ b/pkg/seven/conf/conf.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package conf
+
+var BaseUrl = "http://172.16.20.90:30116/apisix/admin"
+var UrlGroup = make(map[string]string)
+
+func SetBaseUrl(url string) {
+ BaseUrl = url
+}
+
+func AddGroup(group string) {
+ if group != "" {
+ UrlGroup[group] = "http://" + group + "/apisix/admin"
+ }
+}
+
+func FindUrl(group string) string {
+ if group != "" && UrlGroup[group] != "" {
+ return UrlGroup[group]
+ } else {
+ return BaseUrl
+ }
+}
diff --git a/pkg/seven/conf/conf_test.go b/pkg/seven/conf/conf_test.go
new file mode 100644
index 0000000..1540cca
--- /dev/null
+++ b/pkg/seven/conf/conf_test.go
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package conf
+
+import "testing"
+
+func Test_map(t *testing.T) {
+ m1 := make(map[string]string)
+ m1["a"] = "aa"
+ m1["b"] = "bb"
+ t.Log(m1["c"] == "")
+}
diff --git a/pkg/seven/db/route.go b/pkg/seven/db/route.go
new file mode 100644
index 0000000..d6c143e
--- /dev/null
+++ b/pkg/seven/db/route.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package db
+
+import (
+ "fmt"
+
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+ "github.com/hashicorp/go-memdb"
+)
+
+const (
+ Route = "Route"
+)
+
+type RouteRequest struct {
+ Group string
+ Name string
+ FullName string
+}
+
+func (rr *RouteRequest) FindByName() (*v1.Route, error) {
+ txn := DB.Txn(false)
+ defer txn.Abort()
+ if raw, err := txn.First(Route, "id", rr.FullName); err != nil {
+ return nil, err
+ } else {
+ if raw != nil {
+ currentRoute := raw.(*v1.Route)
+ return currentRoute, nil
+ }
+ return nil, fmt.Errorf("NOT FOUND")
+ }
+}
+
+type RouteDB struct {
+ Routes []*v1.Route
+}
+
+// InsertRoute insert route to memDB
+func (db *RouteDB) Insert() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, r := range db.Routes {
+ if err := txn.Insert(Route, r); err != nil {
+ return err
+ }
+ }
+ txn.Commit()
+ return nil
+}
+
+func (db *RouteDB) UpdateRoute() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, r := range db.Routes {
+ // 1. delete
+ if _, err := txn.DeleteAll(Route, "id", *(r.FullName)); err != nil {
+ return err
+ }
+ // 2. insert
+ if err := txn.Insert(Route, r); err != nil {
+ return err
+ }
+ }
+ txn.Commit()
+ return nil
+}
+
+func (db *RouteDB) DeleteRoute() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, r := range db.Routes {
+ //if _, err := txn.DeleteAll(Route, "id", *(r.ID)); err != nil {
+ if _, err := txn.DeleteAll(Route, "id", *(r.FullName)); err != nil {
+ return err
+ }
+ }
+ txn.Commit()
+ return nil
+}
+
+var routeSchema = &memdb.TableSchema{
+ Name: Route,
+ Indexes: map[string]*memdb.IndexSchema{
+ "id": {
+ Name: "id",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "FullName"},
+ },
+ "name": {
+ Name: "name",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "Name"},
+ AllowMissing: true,
+ },
+ },
+}
diff --git a/pkg/seven/db/service.go b/pkg/seven/db/service.go
new file mode 100644
index 0000000..d47bd29
--- /dev/null
+++ b/pkg/seven/db/service.go
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package db
+
+import (
+ "fmt"
+
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+ "github.com/hashicorp/go-memdb"
+)
+
+const (
+ Service = "Service"
+)
+
+type ServiceRequest struct {
+ Group string
+ Name string
+ FullName string
+}
+
+func (sr *ServiceRequest) FindByName() (*v1.Service, error) {
+ txn := DB.Txn(false)
+ defer txn.Abort()
+ if raw, err := txn.First(Service, "id", sr.FullName); err != nil {
+ return nil, err
+ } else {
+ if raw != nil {
+ currentService := raw.(*v1.Service)
+ return currentService, nil
+ }
+ return nil, fmt.Errorf("NOT FOUND")
+ }
+}
+
+func (db *ServiceDB) Insert() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, s := range db.Services {
+ if err := txn.Insert(Service, s); err != nil {
+ return err
+ }
+ }
+ txn.Commit()
+ return nil
+}
+
+type ServiceDB struct {
+ Services []*v1.Service
+}
+
+func (db *ServiceDB) UpdateService() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, s := range db.Services {
+ // 1. delete
+ if _, err := txn.DeleteAll(Service, "id", *(s.FullName)); err != nil {
+ return err
+ }
+ // 2. insert
+ if err := txn.Insert(Service, s); err != nil {
+ return err
+ }
+ }
+
+ txn.Commit()
+ return nil
+}
+
+var serviceSchema = &memdb.TableSchema{
+ Name: Service,
+ Indexes: map[string]*memdb.IndexSchema{
+ "id": {
+ Name: "id",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "FullName"},
+ },
+ "name": {
+ Name: "name",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "Name"},
+ AllowMissing: true,
+ },
+ },
+}
diff --git a/pkg/seven/db/store.go b/pkg/seven/db/store.go
new file mode 100644
index 0000000..9d71355
--- /dev/null
+++ b/pkg/seven/db/store.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package db
+
+import "github.com/hashicorp/go-memdb"
+
+var DB *memdb.MemDB
+
+func init() {
+ if db, err := NewDB(); err != nil {
+ panic(err)
+ } else {
+ DB = db
+ }
+}
+
+func NewDB() (*memdb.MemDB, error) {
+ var schema = &memdb.DBSchema{
+ Tables: map[string]*memdb.TableSchema{
+ Service: serviceSchema,
+ Route: routeSchema,
+ Upstream: upstreamSchema,
+ },
+ }
+
+ if memDB, err := memdb.NewMemDB(schema); err != nil {
+ return nil, err
+ } else {
+ return memDB, nil
+ }
+}
diff --git a/pkg/seven/db/upstream.go b/pkg/seven/db/upstream.go
new file mode 100644
index 0000000..361565c
--- /dev/null
+++ b/pkg/seven/db/upstream.go
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package db
+
+import (
+ "fmt"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+ "github.com/hashicorp/go-memdb"
+)
+
+const (
+ Upstream = "Upstream"
+)
+
+type UpstreamDB struct {
+ Upstreams []*v1.Upstream
+}
+
+type UpstreamRequest struct {
+ Group string
+ Name string
+ FullName string
+}
+
+func (ur *UpstreamRequest) FindByName() (*v1.Upstream, error) {
+ txn := DB.Txn(false)
+ defer txn.Abort()
+ if raw, err := txn.First(Upstream, "id", ur.FullName); err != nil {
+ return nil, err
+ } else {
+ if raw != nil {
+ currentUpstream := raw.(*v1.Upstream)
+ return currentUpstream, nil
+ }
+ return nil, fmt.Errorf("NOT FOUND")
+ }
+}
+
+// insertUpstream insert upstream to memDB
+func (upstreamDB *UpstreamDB) InsertUpstreams() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, u := range upstreamDB.Upstreams {
+ if err := txn.Insert(Upstream, u); err != nil {
+ return err
+ }
+ }
+ txn.Commit()
+ return nil
+}
+
+func (upstreamDB *UpstreamDB) UpdateUpstreams() error {
+ txn := DB.Txn(true)
+ defer txn.Abort()
+ for _, u := range upstreamDB.Upstreams {
+ // delete
+ if _, err := txn.DeleteAll(Upstream, "id", *(u.FullName)); err != nil {
+ return err
+ }
+ // insert
+ if err := txn.Insert(Upstream, u); err != nil {
+ return err
+ }
+ }
+ txn.Commit()
+ return nil
+}
+
+var upstreamSchema = &memdb.TableSchema{
+ Name: Upstream,
+ Indexes: map[string]*memdb.IndexSchema{
+ "id": {
+ Name: "id",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "FullName"},
+ },
+ "name": {
+ Name: "name",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "Name"},
+ AllowMissing: true,
+ },
+ },
+}
+
+//func indexer() *memdb.CompoundMultiIndex{
+// var idx = make([]memdb.Indexer, 0)
+// idx = append(idx, &memdb.StringFieldIndex{Field: "Group"})
+// idx = append(idx, &memdb.StringFieldIndex{Field: "Name"})
+// return &memdb.CompoundMultiIndex{Indexes: idx, AllowMissing: false}
+//}
diff --git a/pkg/seven/state/builder.go b/pkg/seven/state/builder.go
new file mode 100644
index 0000000..a300b55
--- /dev/null
+++ b/pkg/seven/state/builder.go
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
+
+import (
+ "strconv"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/seven/db"
+ "github.com/api7/ingress-controller/pkg/seven/utils"
+)
+
+const (
+ ApisixUpstream = "ApisixUpstream"
+ WatchFromKind = "watch"
+)
+
+//// InitDB insert object into memDB first time
+//func InitDB(){
+// routes, _ := apisix.ListRoute()
+// upstreams, _ := apisix.ListUpstream()
+// apisix.InsertRoute(routes)
+// apisix.InsertUpstreams(upstreams)
+//}
+//
+//// LoadTargetState load targetState from ... maybe k8s CRD
+//func LoadTargetState(routes []*v1.Route, upstreams []*v1.Upstream){
+//
+// // 1.diff
+// // 2.send event
+//}
+
+// paddingRoute padding route from memDB
+func paddingRoute(route *v1.Route, currentRoute *v1.Route) {
+ // padding object, just id
+ if currentRoute == nil {
+ // NOT FOUND : set Id = 0
+ id := strconv.Itoa(0)
+ route.ID = &id
+ } else {
+ route.ID = currentRoute.ID
+ }
+}
+
+// padding service from memDB
+func paddingService(service *v1.Service, currentService *v1.Service) {
+ if currentService == nil {
+ id := strconv.Itoa(0)
+ service.ID = &id
+ } else {
+ service.ID = currentService.ID
+ }
+}
+
+// paddingUpstream padding upstream from memDB
+func paddingUpstream(upstream *v1.Upstream, currentUpstream *v1.Upstream) {
+ // padding id
+ if currentUpstream == nil {
+ // NOT FOUND : set Id = 0
+ id := strconv.Itoa(0)
+ upstream.ID = &id
+ } else {
+ upstream.ID = currentUpstream.ID
+ }
+ // todo padding nodes ? or sync nodes from crd ?
+}
+
+// NewRouteWorkers make routeWrokers group by service per CRD
+// 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
+// 2.route is listenning Event from the ready of 1_2_3;
+func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+ rwg := make(RouteWorkerGroup)
+ for _, r := range routes {
+ quit := make(chan Quit)
+ rw := &routeWorker{Route: r, Quit: quit}
+ rw.start()
+ rwg.Add(*r.ServiceName, rw)
+ }
+ return rwg
+}
+
+// 3.route get the Event and trigger a padding for object,then diff,sync;
+func (r *routeWorker) trigger(event Event) error {
+ defer close(r.Quit)
+ // consumer Event
+ service := event.Obj.(*v1.Service)
+ r.ServiceId = service.ID
+ glog.V(2).Infof("trigger routeWorker %s from %s, %s", *r.Name, event.Op, *service.Name)
+
+ // padding
+ currentRoute, _ := apisix.FindCurrentRoute(r.Route)
+ paddingRoute(r.Route, currentRoute)
+ // diff
+ hasDiff, err := utils.HasDiff(r.Route, currentRoute)
+ // sync
+ if err != nil {
+ return err
+ }
+ if hasDiff {
+ r.sync()
+ }
+ // todo broadcast
+
+ return nil
+}
+
+// sync
+func (r *routeWorker) sync() {
+ if *r.Route.ID != strconv.Itoa(0) {
+ // 1. sync memDB
+ db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
+ if err := db.UpdateRoute(); err != nil {
+ glog.Errorf("update route failed, route: %#v, err: %+v", r.Route, err)
+ return
+ }
+ // 2. sync apisix
+ apisix.UpdateRoute(r.Route)
+ glog.V(2).Infof("update route %s, %s", *r.Name, *r.ServiceId)
+ } else {
+ // 1. sync apisix and get id
+ if res, err := apisix.AddRoute(r.Route); err != nil {
+ glog.Errorf("add route failed, route: %#v, err: %+v", r.Route, err)
+ return
+ } else {
+ key := res.Route.Key
+ tmp := strings.Split(*key, "/")
+ *r.ID = tmp[len(tmp)-1]
+ }
+ // 2. sync memDB
+ db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
+ db.Insert()
+ glog.V(2).Infof("create route %s, %s", *r.Name, *r.ServiceId)
+ }
+}
+
+// service
+func NewServiceWorkers(services []*v1.Service, rwg *RouteWorkerGroup) ServiceWorkerGroup {
+ swg := make(ServiceWorkerGroup)
+ for _, s := range services {
+ quit := make(chan Quit)
+ rw := &serviceWorker{Service: s, Quit: quit}
+ rw.start(rwg)
+ swg.Add(*s.UpstreamName, rw)
+ }
+ return swg
+}
+
+// upstream
+func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup) {
+ for _, u := range upstreams {
+ op := Update
+ if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, *u.Name, *u.FullName); err != nil {
+ glog.Errorf("solver upstream failed, find upstream from etcd failed, upstream: %+v, err: %+v", u, err)
+ return
+ } else {
+ paddingUpstream(u, currentUpstream)
+ // diff
+ hasDiff, _ := utils.HasDiff(u, currentUpstream)
+ if hasDiff {
+ if *u.ID != strconv.Itoa(0) {
+ op = Update
+ // 0.field check
+ needToUpdate := true
+ if currentUpstream.FromKind != nil && *(currentUpstream.FromKind) == ApisixUpstream { // update from ApisixUpstream
+ if u.FromKind == nil || (u.FromKind != nil && *(u.FromKind) != ApisixUpstream) {
+ // currentUpstream > u
+ // set lb && health check
+ needToUpdate = false
+ }
+ }
+ if needToUpdate {
+ // 1.sync memDB
+ upstreamDB := &db.UpstreamDB{Upstreams: []*v1.Upstream{u}}
+ if err := upstreamDB.UpdateUpstreams(); err != nil {
+ glog.Errorf("solver upstream failed, update upstream to local db failed, err: %s", err.Error())
+ return
+ }
+ // 2.sync apisix
+ if err = apisix.UpdateUpstream(u); err != nil {
+ glog.Errorf("solver upstream failed, update upstream to etcd failed, err: %+v", err)
+ return
+ }
+ }
+ // if fromKind == WatchFromKind
+ if u.FromKind != nil && *u.FromKind == WatchFromKind {
+ // 1.update nodes
+ if err = apisix.PatchNodes(u, u.Nodes); err != nil {
+ glog.Errorf("solver upstream failed, patch node info to etcd failed, err: %+v", err)
+ return
+ }
+ // 2. sync memDB
+ us := []*v1.Upstream{u}
+ if !needToUpdate {
+ currentUpstream.Nodes = u.Nodes
+ us = []*v1.Upstream{currentUpstream}
+ }
+ upstreamDB := &db.UpstreamDB{Upstreams: us}
+ if err := upstreamDB.UpdateUpstreams(); err != nil {
+ glog.Errorf("solver upstream failed, update upstream to local db failed, err: %s", err.Error())
+ return
+ }
+ }
+ } else {
+ op = Create
+ // 1.sync apisix and get response
+ if upstreamResponse, err := apisix.AddUpstream(u); err != nil {
+ glog.Errorf("solver upstream failed, update upstream to etcd failed, err: %+v", err)
+ return
+ } else {
+ tmp := strings.Split(*upstreamResponse.Upstream.Key, "/")
+ *u.ID = tmp[len(tmp)-1]
+ }
+ // 2.sync memDB
+ //apisix.InsertUpstreams([]*v1.Upstream{u})
+ upstreamDB := &db.UpstreamDB{Upstreams: []*v1.Upstream{u}}
+ upstreamDB.InsertUpstreams()
+ }
+ }
+ }
+ glog.V(2).Infof("solver upstream %s:%s", op, *u.Name)
+ // anyway, broadcast to service
+ serviceWorkers := swg[*u.Name]
+ for _, sw := range serviceWorkers {
+ event := &Event{Kind: UpstreamKind, Op: op, Obj: u}
+ sw.Event <- *event
+ }
+ }
+}
diff --git a/pkg/ingress/apisix/plugin.go b/pkg/seven/state/builder_test.go
similarity index 50%
copy from pkg/ingress/apisix/plugin.go
copy to pkg/seven/state/builder_test.go
index 4feb097..7550236 100644
--- a/pkg/ingress/apisix/plugin.go
+++ b/pkg/seven/state/builder_test.go
@@ -12,40 +12,40 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package apisix
+package state
import (
- "github.com/gxthrj/seven/apisix"
- "strconv"
+ "testing"
+
+ "github.com/api7/ingress-controller/pkg/seven/utils"
)
-type CorsYaml struct {
- Enable bool `json:"enable,omitempty"`
- AllowOrigin string `json:"allow_origin,omitempty"`
- AllowHeaders string `json:"allow_headers,omitempty"`
- AllowMethods string `json:"allow_methods,omitempty"`
+type school struct {
+ *province
+ Name string `json:"name"`
+ Address string `json:"address"`
}
-func (c *CorsYaml) SetEnable(enable string) {
- if b, err := strconv.ParseBool(enable); err != nil {
- c.Enable = false
- } else {
- c.Enable = b
- }
+type province struct {
+ Location string `json:"location"`
}
-func (c *CorsYaml) SetOrigin(origin string) {
- c.AllowOrigin = origin
-}
+func Test_diff(t *testing.T) {
+ //p1 := &province{Location: "jiangsu"}
+ p2 := &province{Location: "zh"}
+ s1 := &school{Name: "hello", Address: "this is a address"}
+ s2 := &school{Name: "hello", Address: "this is a address", province: p2}
+ t.Log(s1)
+ t.Log(s2)
+ if d, err := utils.Diff(s1, s2); err != nil {
+ t.Log(err.Error())
+ } else {
+ //t.Logf("s1 vs s2 hasDiff ? %v", d)
+ t.Log(d)
+ for _, delta := range d.Deltas() {
+ t.Log(delta.Similarity())
+ }
-func (c *CorsYaml) SetHeaders(headers string) {
- c.AllowHeaders = headers
-}
-func (c *CorsYaml) SetMethods(methods string) {
- c.AllowMethods = methods
-}
+ }
-func (c *CorsYaml) Build() *apisix.Cors {
- maxAge := int64(3600)
- return apisix.BuildCors(c.Enable, &c.AllowOrigin, &c.AllowHeaders, &c.AllowMethods, &maxAge)
}
diff --git a/pkg/seven/state/diff.go b/pkg/seven/state/diff.go
new file mode 100644
index 0000000..2de068d
--- /dev/null
+++ b/pkg/seven/state/diff.go
@@ -0,0 +1,15 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
diff --git a/pkg/ingress/apisix/plugin.go b/pkg/seven/state/event.go
similarity index 50%
copy from pkg/ingress/apisix/plugin.go
copy to pkg/seven/state/event.go
index 4feb097..026b99e 100644
--- a/pkg/ingress/apisix/plugin.go
+++ b/pkg/seven/state/event.go
@@ -12,40 +12,38 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package apisix
+package state
import (
- "github.com/gxthrj/seven/apisix"
- "strconv"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
)
-type CorsYaml struct {
- Enable bool `json:"enable,omitempty"`
- AllowOrigin string `json:"allow_origin,omitempty"`
- AllowHeaders string `json:"allow_headers,omitempty"`
- AllowMethods string `json:"allow_methods,omitempty"`
+type ApisixCombination struct {
+ Routes []*v1.Route
+ Services []*v1.Service
+ Upstreams []*v1.Upstream
}
-func (c *CorsYaml) SetEnable(enable string) {
- if b, err := strconv.ParseBool(enable); err != nil {
- c.Enable = false
- } else {
- c.Enable = b
- }
+type RouteCompare struct {
+ OldRoutes []*v1.Route
+ NewRoutes []*v1.Route
}
-func (c *CorsYaml) SetOrigin(origin string) {
- c.AllowOrigin = origin
+type Quit struct {
+ Name string
}
-func (c *CorsYaml) SetHeaders(headers string) {
- c.AllowHeaders = headers
-}
-func (c *CorsYaml) SetMethods(methods string) {
- c.AllowMethods = methods
-}
+const (
+ RouteKind = "route"
+ ServiceKind = "service"
+ UpstreamKind = "upstream"
+ Create = "create"
+ Update = "update"
+ Delete = "delete"
+)
-func (c *CorsYaml) Build() *apisix.Cors {
- maxAge := int64(3600)
- return apisix.BuildCors(c.Enable, &c.AllowOrigin, &c.AllowHeaders, &c.AllowMethods, &maxAge)
+type Event struct {
+ Kind string // route/service/upstream
+ Op string // create update delete
+ Obj interface{} // the obj of kind
}
diff --git a/pkg/seven/state/route_worker.go b/pkg/seven/state/route_worker.go
new file mode 100644
index 0000000..1ee0c31
--- /dev/null
+++ b/pkg/seven/state/route_worker.go
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
+
+import "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+type routeWorker struct {
+ *v1.Route
+ Event chan Event
+ Quit chan Quit
+}
+
+// RouteWorkerGroup for broadcast from service to route
+type RouteWorkerGroup map[string][]*routeWorker
+
+// start start watch event
+func (w *routeWorker) start() {
+ w.Event = make(chan Event)
+ go func() {
+ for {
+ select {
+ case event := <-w.Event:
+ w.trigger(event)
+ case <-w.Quit:
+ return
+ }
+ }
+ }()
+}
+
+func (rg *RouteWorkerGroup) Add(key string, rw *routeWorker) {
+ routes := (*rg)[key]
+ if routes == nil {
+ routes = make([]*routeWorker, 0)
+ }
+ routes = append(routes, rw)
+ (*rg)[key] = routes
+}
+
+func (rg *RouteWorkerGroup) Delete(key string, route *routeWorker) {
+ routes := (*rg)[key]
+ result := make([]*routeWorker, 0)
+ for _, r := range routes {
+ if r.Name != route.Name {
+ result = append(result, r)
+ }
+ }
+ (*rg)[key] = result
+}
diff --git a/pkg/seven/state/service_worker.go b/pkg/seven/state/service_worker.go
new file mode 100644
index 0000000..20e173f
--- /dev/null
+++ b/pkg/seven/state/service_worker.go
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
+
+import (
+ "strconv"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/seven/db"
+ "github.com/api7/ingress-controller/pkg/seven/utils"
+)
+
+const ApisixService = "ApisixService"
+
+type serviceWorker struct {
+ *v1.Service
+ Event chan Event
+ Quit chan Quit
+}
+
+// ServiceWorkerGroup for broadcast from upstream to service
+type ServiceWorkerGroup map[string][]*serviceWorker
+
+// start start watch event
+func (w *serviceWorker) start(rwg *RouteWorkerGroup) {
+ w.Event = make(chan Event)
+ go func() {
+ for {
+ select {
+ case event := <-w.Event:
+ w.trigger(event, rwg)
+ case <-w.Quit:
+ return
+ }
+ }
+ }()
+}
+
+// trigger add to queue
+func (w *serviceWorker) trigger(event Event, rwg *RouteWorkerGroup) error {
+ glog.V(2).Infof("1.service trigger from %s, %s", event.Op, event.Kind)
+ defer close(w.Quit)
+ // consumer Event set upstreamID
+ upstream := event.Obj.(*v1.Upstream)
+ glog.V(2).Infof("2.service trigger from %s, %s", event.Op, *upstream.Name)
+
+ w.UpstreamId = upstream.ID
+ // add to queue
+ services := []*v1.Service{w.Service}
+ sqo := &ServiceQueueObj{Services: services, RouteWorkerGroup: *rwg}
+ sqo.AddQueue()
+
+ //op := Update
+ //// padding
+ //currentService, _ := apisix.FindCurrentService(*w.Service.Name)
+ //paddingService(w.Service, currentService)
+ //// diff
+ //hasDiff, err := utils.HasDiff(w.Service, currentService)
+ //// sync
+ //if err != nil {
+ // return err
+ //}
+ //if hasDiff {
+ // if *w.Service.ID == strconv.Itoa(0) {
+ // op = Create
+ // // 1. sync apisix and get id
+ // if serviceResponse, err := apisix.AddService(w.Service, conf.BaseUrl); err != nil {
+ // // todo log error
+ // glog.Info(err.Error())
+ // }else {
+ // tmp := strings.Split(*serviceResponse.Service.Key, "/")
+ // *w.Service.ID = tmp[len(tmp) - 1]
+ // }
+ // // 2. sync memDB
+ // db := &db.ServiceDB{Services: []*v1.Service{w.Service}}
+ // db.Insert()
+ // glog.Infof("create service %s, %s", *w.Name, *w.UpstreamId)
+ // }else {
+ // op = Update
+ // // 1. sync memDB
+ // db := db.ServiceDB{Services: []*v1.Service{w.Service}}
+ // if err := db.UpdateService(); err != nil {
+ // // todo log error
+ // }
+ // // 2. sync apisix
+ // apisix.UpdateService(w.Service, conf.BaseUrl)
+ // glog.Infof("update service %s, %s", *w.Name, *w.UpstreamId)
+ // }
+ //}
+ //// broadcast to route
+ //routeWorkers := (*rwg)[*w.Service.Name]
+ //for _, rw := range routeWorkers{
+ // event := &Event{Kind: ServiceKind, Op: op, Obj: w.Service}
+ // glog.Infof("send event %s, %s, %s", event.Kind, event.Op, *w.Service.Name)
+ // rw.Event <- *event
+ //}
+ return nil
+}
+
+func SolverService(services []*v1.Service, rwg RouteWorkerGroup) error {
+ for _, svc := range services {
+ op := Update
+ // padding
+ currentService, _ := apisix.FindCurrentService(*svc.Group, *svc.Name, *svc.FullName)
+ paddingService(svc, currentService)
+ // diff
+ hasDiff, err := utils.HasDiff(svc, currentService)
+ // sync
+ if err != nil {
+ return err
+ }
+ if hasDiff {
+ if *svc.ID == strconv.Itoa(0) {
+ op = Create
+ // 1. sync apisix and get id
+ if serviceResponse, err := apisix.AddService(svc); err != nil {
+ // todo log error
+ glog.V(2).Info(err.Error())
+ } else {
+ tmp := strings.Split(*serviceResponse.Service.Key, "/")
+ *svc.ID = tmp[len(tmp)-1]
+ }
+ // 2. sync memDB
+ db := &db.ServiceDB{Services: []*v1.Service{svc}}
+ db.Insert()
+ glog.V(2).Infof("create service %s, %s", *svc.Name, *svc.UpstreamId)
+ } else {
+ op = Update
+ needToUpdate := true
+ if currentService.FromKind != nil && *(currentService.FromKind) == ApisixService { // update from ApisixUpstream
+ if svc.FromKind == nil || (svc.FromKind != nil && *(svc.FromKind) != ApisixService) {
+ // currentService > svc
+ // set lb && health check
+ needToUpdate = false
+ }
+ }
+ if needToUpdate {
+ // 1. sync memDB
+ db := db.ServiceDB{Services: []*v1.Service{svc}}
+ if err := db.UpdateService(); err != nil {
+ // todo log error
+ }
+ // 2. sync apisix
+ apisix.UpdateService(svc)
+ glog.V(2).Infof("update service %s, %s", *svc.Name, *svc.UpstreamId)
+ }
+
+ }
+ }
+ // broadcast to route
+ routeWorkers := rwg[*svc.Name]
+ for _, rw := range routeWorkers {
+ event := &Event{Kind: ServiceKind, Op: op, Obj: svc}
+ glog.V(2).Infof("send event %s, %s, %s", event.Kind, event.Op, *svc.Name)
+ rw.Event <- *event
+ }
+ }
+ return nil
+}
+
+func (swg *ServiceWorkerGroup) Add(key string, s *serviceWorker) {
+ sws := (*swg)[key]
+ if sws == nil {
+ sws = make([]*serviceWorker, 0)
+ }
+ sws = append(sws, s)
+ (*swg)[key] = sws
+}
+
+func (swg *ServiceWorkerGroup) Delete(key string, s *serviceWorker) {
+ sws := (*swg)[key]
+ result := make([]*serviceWorker, 0)
+ for _, r := range sws {
+ if r.Name != s.Name {
+ result = append(result, r)
+ }
+ }
+ (*swg)[key] = result
+}
diff --git a/pkg/seven/state/solver.go b/pkg/seven/state/solver.go
new file mode 100644
index 0000000..392680a
--- /dev/null
+++ b/pkg/seven/state/solver.go
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
+
+import (
+ "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+ "github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/seven/db"
+)
+
+var UpstreamQueue chan UpstreamQueueObj
+var ServiceQueue chan ServiceQueueObj
+
+func init() {
+ UpstreamQueue = make(chan UpstreamQueueObj, 500)
+ ServiceQueue = make(chan ServiceQueueObj, 500)
+ go WatchUpstream()
+ go WatchService()
+}
+
+func WatchService() {
+ for {
+ sqo := <-ServiceQueue
+ // solver service
+ SolverService(sqo.Services, sqo.RouteWorkerGroup)
+ }
+}
+
+func WatchUpstream() {
+ for {
+ uqo := <-UpstreamQueue
+ SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup)
+ }
+}
+
+// Solver
+func (s *ApisixCombination) Solver() (bool, error) {
+ // 1.route workers
+ rwg := NewRouteWorkers(s.Routes)
+ // 2.service workers
+ swg := NewServiceWorkers(s.Services, &rwg)
+ //sqo := &ServiceQueueObj{Services: s.Services, RouteWorkerGroup: rwg}
+ //sqo.AddQueue()
+ // 3.upstream workers
+ uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: swg}
+ uqo.AddQueue()
+ return true, nil
+}
+
+// UpstreamQueueObj for upstream queue
+type UpstreamQueueObj struct {
+ Upstreams []*v1.Upstream
+ ServiceWorkerGroup ServiceWorkerGroup
+}
+
+// AddQueue make upstreams in order
+// upstreams is group by CRD
+func (uqo *UpstreamQueueObj) AddQueue() {
+ UpstreamQueue <- *uqo
+}
+
+type ServiceQueueObj struct {
+ Services []*v1.Service
+ RouteWorkerGroup RouteWorkerGroup
+}
+
+// AddQueue make upstreams in order
+// upstreams is group by CRD
+func (sqo *ServiceQueueObj) AddQueue() {
+ ServiceQueue <- *sqo
+}
+
+// Sync remove from apisix
+func (rc *RouteCompare) Sync() error {
+ for _, old := range rc.OldRoutes {
+ needToDel := true
+ for _, nr := range rc.NewRoutes {
+ if old.Name == nr.Name {
+ needToDel = false
+ break
+ }
+ }
+ if needToDel {
+ fullName := *old.Name
+ if *old.Group != "" {
+ fullName = *old.Group + "_" + *old.Name
+ }
+ request := db.RouteRequest{Name: *old.Name, FullName: fullName}
+
+ if route, err := request.FindByName(); err != nil {
+ // log error
+ } else {
+ if err = apisix.DeleteRoute(route); err == nil {
+ db := db.RouteDB{Routes: []*v1.Route{route}}
+ db.DeleteRoute()
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func SyncSsl(ssl *v1.Ssl, method string) error {
+ switch method {
+ case Create:
+ _, err := apisix.AddOrUpdateSsl(ssl)
+ return err
+ case Update:
+ _, err := apisix.AddOrUpdateSsl(ssl)
+ return err
+ case Delete:
+ err := apisix.DeleteSsl(ssl)
+ return err
+ }
+ return nil
+}
diff --git a/pkg/seven/state/sync.go b/pkg/seven/state/sync.go
new file mode 100644
index 0000000..2de068d
--- /dev/null
+++ b/pkg/seven/state/sync.go
@@ -0,0 +1,15 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
diff --git a/pkg/seven/state/upstream_worker.go b/pkg/seven/state/upstream_worker.go
new file mode 100644
index 0000000..3f33ce6
--- /dev/null
+++ b/pkg/seven/state/upstream_worker.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package state
+
+import "github.com/gxthrj/apisix-types/pkg/apis/apisix/v1"
+
+type upstreamWorker struct {
+ *v1.Upstream
+ Event chan Event
+ Quit chan Quit
+}
diff --git a/pkg/ingress/apisix/plugin.go b/pkg/seven/utils/diff.go
similarity index 50%
copy from pkg/ingress/apisix/plugin.go
copy to pkg/seven/utils/diff.go
index 4feb097..10a74ea 100644
--- a/pkg/ingress/apisix/plugin.go
+++ b/pkg/seven/utils/diff.go
@@ -12,40 +12,47 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package apisix
+package utils
import (
- "github.com/gxthrj/seven/apisix"
- "strconv"
+ "encoding/json"
+ "github.com/golang/glog"
+ "github.com/yudai/gojsondiff"
)
-type CorsYaml struct {
- Enable bool `json:"enable,omitempty"`
- AllowOrigin string `json:"allow_origin,omitempty"`
- AllowHeaders string `json:"allow_headers,omitempty"`
- AllowMethods string `json:"allow_methods,omitempty"`
-}
+var (
+ differ = gojsondiff.New()
+)
-func (c *CorsYaml) SetEnable(enable string) {
- if b, err := strconv.ParseBool(enable); err != nil {
- c.Enable = false
+func HasDiff(a, b interface{}) (bool, error) {
+ aJSON, err := json.Marshal(a)
+ if err != nil {
+ return false, err
+ }
+ bJSON, err := json.Marshal(b)
+ if err != nil {
+ return false, err
+ }
+ if d, err := differ.Compare(aJSON, bJSON); err != nil {
+ return false, err
} else {
- c.Enable = b
+ glog.V(2).Info(d.Deltas())
+ return d.Modified(), nil
}
}
-func (c *CorsYaml) SetOrigin(origin string) {
- c.AllowOrigin = origin
-}
-
-func (c *CorsYaml) SetHeaders(headers string) {
- c.AllowHeaders = headers
-}
-func (c *CorsYaml) SetMethods(methods string) {
- c.AllowMethods = methods
-}
-
-func (c *CorsYaml) Build() *apisix.Cors {
- maxAge := int64(3600)
- return apisix.BuildCors(c.Enable, &c.AllowOrigin, &c.AllowHeaders, &c.AllowMethods, &maxAge)
+func Diff(a, b interface{}) (gojsondiff.Diff, error) {
+ aJSON, err := json.Marshal(a)
+ if err != nil {
+ return nil, err
+ }
+ bJSON, err := json.Marshal(b)
+ if err != nil {
+ return nil, err
+ }
+ if d, err := differ.Compare(aJSON, bJSON); err != nil {
+ return nil, err
+ } else {
+ return d, nil
+ }
}
diff --git a/pkg/seven/utils/http.go b/pkg/seven/utils/http.go
new file mode 100644
index 0000000..9b5a76a
--- /dev/null
+++ b/pkg/seven/utils/http.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package utils
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "gopkg.in/resty.v1"
+)
+
+const timeout = 3000
+
+func Post(url string, bytes []byte) ([]byte, error) {
+ r := resty.New().
+ SetTimeout(time.Duration(timeout)*time.Millisecond).
+ R().
+ SetHeader("content-type", "application/json")
+ r.SetBody(bytes)
+ resp, err := r.Post(url)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode() != http.StatusOK && resp.StatusCode() != http.StatusCreated {
+ return nil, fmt.Errorf("status: %d, body: %s", resp.StatusCode(), resp.Body())
+ }
+ return resp.Body(), nil
+}
+
+func Put(url string, bytes []byte) ([]byte, error) {
+ r := resty.New().
+ SetTimeout(time.Duration(timeout)*time.Millisecond).
+ R().
+ SetHeader("content-type", "application/json")
+ r.SetBody(bytes)
+ resp, err := r.Put(url)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode() != http.StatusOK && resp.StatusCode() != http.StatusCreated {
+ return nil, fmt.Errorf("status: %d, body: %s", resp.StatusCode(), resp.Body())
+ }
+ return resp.Body(), nil
+}
+
+func Patch(url string, bytes []byte) ([]byte, error) {
+ r := resty.New().
+ SetTimeout(time.Duration(timeout)*time.Millisecond).
+ R().
+ SetHeader("content-type", "application/json")
+ r.SetBody(bytes)
+ resp, err := r.Patch(url)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode() != http.StatusOK && resp.StatusCode() != http.StatusCreated {
+ return nil, fmt.Errorf("status: %d, body: %s", resp.StatusCode(), resp.Body())
+ }
+ return resp.Body(), nil
+}
+
+func Delete(url string) ([]byte, error) {
+ r := resty.New().
+ SetTimeout(time.Duration(timeout)*time.Millisecond).
+ R().
+ SetHeader("content-type", "application/json")
+ resp, err := r.Delete(url)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode() != http.StatusOK && resp.StatusCode() != http.StatusNotFound {
+ return nil, fmt.Errorf("status: %d, body: %s", resp.StatusCode(), resp.Body())
+ }
+ return resp.Body(), nil
+}
diff --git a/pkg/seven/utils/types.go b/pkg/seven/utils/types.go
new file mode 100644
index 0000000..40863de
--- /dev/null
+++ b/pkg/seven/utils/types.go
@@ -0,0 +1,15 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package utils