You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/07/05 07:19:37 UTC
[apisix-go-plugin-runner] branch master updated: fix: we can't implement limit-req now (#17)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git
The following commit(s) were added to refs/heads/master by this push:
new 746d738 fix: we can't implement limit-req now (#17)
746d738 is described below
commit 746d738ebab2d474d3646fad737a51c76e2de159
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Mon Jul 5 15:19:26 2021 +0800
fix: we can't implement limit-req now (#17)
---
cmd/go-runner/main.go | 1 +
cmd/go-runner/plugins/limit_req.go | 77 ---------------------------------
cmd/go-runner/plugins/limit_req_test.go | 70 ------------------------------
internal/plugin/plugin.go | 2 +
internal/server/server.go | 13 +++++-
pkg/http/http.go | 6 +--
pkg/runner/runner.go | 2 +-
7 files changed, 16 insertions(+), 155 deletions(-)
diff --git a/cmd/go-runner/main.go b/cmd/go-runner/main.go
index 109e219..ea2f5c2 100644
--- a/cmd/go-runner/main.go
+++ b/cmd/go-runner/main.go
@@ -26,6 +26,7 @@ import (
"github.com/thediveo/enumflag"
"go.uber.org/zap/zapcore"
+ _ "github.com/apache/apisix-go-plugin-runner/cmd/go-runner/plugins"
"github.com/apache/apisix-go-plugin-runner/pkg/log"
"github.com/apache/apisix-go-plugin-runner/pkg/runner"
)
diff --git a/cmd/go-runner/plugins/limit_req.go b/cmd/go-runner/plugins/limit_req.go
deleted file mode 100644
index 0a8320f..0000000
--- a/cmd/go-runner/plugins/limit_req.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package plugins
-
-import (
- "encoding/json"
- "net/http"
- "time"
-
- "golang.org/x/time/rate"
-
- pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
- "github.com/apache/apisix-go-plugin-runner/pkg/log"
- "github.com/apache/apisix-go-plugin-runner/pkg/plugin"
-)
-
-func init() {
- err := plugin.RegisterPlugin(&LimitReq{})
- if err != nil {
- log.Fatalf("failed to register plugin limit-req: %s", err)
- }
-}
-
-// LimitReq is a demo for a real world plugin
-type LimitReq struct {
-}
-
-type LimitReqConf struct {
- Burst int `json:"burst"`
- Rate float64 `json:"rate"`
-
- limiter *rate.Limiter
-}
-
-func (p *LimitReq) Name() string {
- return "limit-req"
-}
-
-// ParseConf is called when the configuration is changed. And its output is unique per route.
-func (p *LimitReq) ParseConf(in []byte) (interface{}, error) {
- conf := LimitReqConf{}
- err := json.Unmarshal(in, &conf)
- if err != nil {
- return nil, err
- }
-
- limiter := rate.NewLimiter(rate.Limit(conf.Rate), conf.Burst)
- // the conf can be used to store route scope data
- conf.limiter = limiter
- return conf, nil
-}
-
-// Filter is called when a request hits the route
-func (p *LimitReq) Filter(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
- li := conf.(LimitReqConf).limiter
- rs := li.Reserve()
- if !rs.OK() {
- // limit rate exceeded
- log.Infof("limit req rate exceeded")
- // stop filters with this response
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
- time.Sleep(rs.Delay())
-}
diff --git a/cmd/go-runner/plugins/limit_req_test.go b/cmd/go-runner/plugins/limit_req_test.go
deleted file mode 100644
index baab79b..0000000
--- a/cmd/go-runner/plugins/limit_req_test.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package plugins
-
-import (
- "net/http"
- "net/http/httptest"
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestLimitReq(t *testing.T) {
- in := []byte(`{"rate":5,"burst":1}`)
- lr := &LimitReq{}
- conf, err := lr.ParseConf(in)
- assert.Nil(t, err)
-
- start := time.Now()
- n := 6
- var wg sync.WaitGroup
- res := make([]*http.Response, n)
- for i := 0; i < n; i++ {
- wg.Add(1)
- go func(i int) {
- w := httptest.NewRecorder()
- lr.Filter(conf, w, nil)
- resp := w.Result()
- res[i] = resp
- wg.Done()
- }(i)
- }
- wg.Wait()
-
- rejectN := 0
- for _, r := range res {
- if r.StatusCode == 503 {
- rejectN++
- }
- }
- assert.Equal(t, 0, rejectN)
- t.Logf("Start: %v, now: %v", start, time.Now())
- assert.True(t, time.Now().Sub(start) >= 1*time.Second)
-}
-
-func TestLimitReq_YouShouldNotPass(t *testing.T) {
- in := []byte(`{}`)
- lr := &LimitReq{}
- conf, err := lr.ParseConf(in)
- assert.Nil(t, err)
-
- w := httptest.NewRecorder()
- lr.Filter(conf, w, nil)
- resp := w.Result()
- assert.Equal(t, 503, resp.StatusCode)
-}
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index 9b88ca9..ce8a620 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -59,6 +59,8 @@ var (
)
func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error {
+ log.Infof("register plugin %s", name)
+
if name == "" {
return ErrMissingName
}
diff --git a/internal/server/server.go b/internal/server/server.go
index 6e3ca58..5ac6dc6 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -26,6 +26,7 @@ import (
"syscall"
"time"
+ "github.com/ReneKroon/ttlcache/v2"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/apache/apisix-go-plugin-runner/internal/plugin"
@@ -67,12 +68,17 @@ func readErr(n int, err error, required int) bool {
func writeErr(n int, err error) {
if err != nil {
+ // TODO: solve "write: broken pipe" with context
log.Errorf("write: %s", err)
}
}
func generateErrorReport(err error) (ty byte, out []byte) {
- log.Errorf("%s", err)
+ if err == ttlcache.ErrNotFound {
+ log.Warnf("%s", err)
+ } else {
+ log.Errorf("%s", err)
+ }
ty = RPCError
bd := ReportError(err)
@@ -220,13 +226,16 @@ func Run() {
go func() {
for {
+ conn, err := l.Accept()
+
select {
case <-done:
+ // don't report the "use of closed network connection" error when the server
+ // is exiting.
return
default:
}
- conn, err := l.Accept()
if err != nil {
log.Errorf("accept: %s", err)
continue
diff --git a/pkg/http/http.go b/pkg/http/http.go
index cf00132..b5d79e6 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -26,17 +26,13 @@ import (
//
// 1. We need to record any change to the request headers. As the Request.Header
// is not an interface, there is not way to inject our special tracker.
+//
// 2. As the author of fasthttp pointed out, "headers are stored in a map[string][]string.
// So the server must parse all the headers, ...". The official API is suboptimal, which
// is even worse in our case as it is not a real HTTP server.
type Request interface {
// ID returns the request id
ID() uint32
- // ConfToken returns the token represents the configuration of current route.
- // Each route have its unique token, so we can use it to distinguish different
- // route in the same plugin. When the configuration of a route changed, the token
- // will change too.
- ConfToken() uint32
// SrcIP returns the client's IP
SrcIP() net.IP
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index 0405097..e5efa34 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -31,7 +31,7 @@ type RunnerConfig struct {
LogOutput zapcore.WriteSyncer
}
-// Run starts the runner and listen the socket
+// Run starts the runner and listen the socket configured by environment variable "APISIX_LISTEN_ADDRESS"
func Run(cfg RunnerConfig) {
if cfg.LogOutput == nil {
cfg.LogOutput = os.Stdout