You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/07/05 07:19:37 UTC

[apisix-go-plugin-runner] branch master updated: fix: we can't implement limit-req now (#17)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new 746d738  fix: we can't implement limit-req now (#17)
746d738 is described below

commit 746d738ebab2d474d3646fad737a51c76e2de159
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Mon Jul 5 15:19:26 2021 +0800

    fix: we can't implement limit-req now (#17)
---
 cmd/go-runner/main.go                   |  1 +
 cmd/go-runner/plugins/limit_req.go      | 77 ---------------------------------
 cmd/go-runner/plugins/limit_req_test.go | 70 ------------------------------
 internal/plugin/plugin.go               |  2 +
 internal/server/server.go               | 13 +++++-
 pkg/http/http.go                        |  6 +--
 pkg/runner/runner.go                    |  2 +-
 7 files changed, 16 insertions(+), 155 deletions(-)

diff --git a/cmd/go-runner/main.go b/cmd/go-runner/main.go
index 109e219..ea2f5c2 100644
--- a/cmd/go-runner/main.go
+++ b/cmd/go-runner/main.go
@@ -26,6 +26,7 @@ import (
 	"github.com/thediveo/enumflag"
 	"go.uber.org/zap/zapcore"
 
+	_ "github.com/apache/apisix-go-plugin-runner/cmd/go-runner/plugins"
 	"github.com/apache/apisix-go-plugin-runner/pkg/log"
 	"github.com/apache/apisix-go-plugin-runner/pkg/runner"
 )
diff --git a/cmd/go-runner/plugins/limit_req.go b/cmd/go-runner/plugins/limit_req.go
deleted file mode 100644
index 0a8320f..0000000
--- a/cmd/go-runner/plugins/limit_req.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package plugins
-
-import (
-	"encoding/json"
-	"net/http"
-	"time"
-
-	"golang.org/x/time/rate"
-
-	pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
-	"github.com/apache/apisix-go-plugin-runner/pkg/log"
-	"github.com/apache/apisix-go-plugin-runner/pkg/plugin"
-)
-
-func init() {
-	err := plugin.RegisterPlugin(&LimitReq{})
-	if err != nil {
-		log.Fatalf("failed to register plugin limit-req: %s", err)
-	}
-}
-
-// LimitReq is a demo for a real world plugin
-type LimitReq struct {
-}
-
-type LimitReqConf struct {
-	Burst int     `json:"burst"`
-	Rate  float64 `json:"rate"`
-
-	limiter *rate.Limiter
-}
-
-func (p *LimitReq) Name() string {
-	return "limit-req"
-}
-
-// ParseConf is called when the configuration is changed. And its output is unique per route.
-func (p *LimitReq) ParseConf(in []byte) (interface{}, error) {
-	conf := LimitReqConf{}
-	err := json.Unmarshal(in, &conf)
-	if err != nil {
-		return nil, err
-	}
-
-	limiter := rate.NewLimiter(rate.Limit(conf.Rate), conf.Burst)
-	// the conf can be used to store route scope data
-	conf.limiter = limiter
-	return conf, nil
-}
-
-// Filter is called when a request hits the route
-func (p *LimitReq) Filter(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
-	li := conf.(LimitReqConf).limiter
-	rs := li.Reserve()
-	if !rs.OK() {
-		// limit rate exceeded
-		log.Infof("limit req rate exceeded")
-		// stop filters with this response
-		w.WriteHeader(http.StatusServiceUnavailable)
-		return
-	}
-	time.Sleep(rs.Delay())
-}
diff --git a/cmd/go-runner/plugins/limit_req_test.go b/cmd/go-runner/plugins/limit_req_test.go
deleted file mode 100644
index baab79b..0000000
--- a/cmd/go-runner/plugins/limit_req_test.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package plugins
-
-import (
-	"net/http"
-	"net/http/httptest"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestLimitReq(t *testing.T) {
-	in := []byte(`{"rate":5,"burst":1}`)
-	lr := &LimitReq{}
-	conf, err := lr.ParseConf(in)
-	assert.Nil(t, err)
-
-	start := time.Now()
-	n := 6
-	var wg sync.WaitGroup
-	res := make([]*http.Response, n)
-	for i := 0; i < n; i++ {
-		wg.Add(1)
-		go func(i int) {
-			w := httptest.NewRecorder()
-			lr.Filter(conf, w, nil)
-			resp := w.Result()
-			res[i] = resp
-			wg.Done()
-		}(i)
-	}
-	wg.Wait()
-
-	rejectN := 0
-	for _, r := range res {
-		if r.StatusCode == 503 {
-			rejectN++
-		}
-	}
-	assert.Equal(t, 0, rejectN)
-	t.Logf("Start: %v, now: %v", start, time.Now())
-	assert.True(t, time.Now().Sub(start) >= 1*time.Second)
-}
-
-func TestLimitReq_YouShouldNotPass(t *testing.T) {
-	in := []byte(`{}`)
-	lr := &LimitReq{}
-	conf, err := lr.ParseConf(in)
-	assert.Nil(t, err)
-
-	w := httptest.NewRecorder()
-	lr.Filter(conf, w, nil)
-	resp := w.Result()
-	assert.Equal(t, 503, resp.StatusCode)
-}
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index 9b88ca9..ce8a620 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -59,6 +59,8 @@ var (
 )
 
 func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error {
+	log.Infof("register plugin %s", name)
+
 	if name == "" {
 		return ErrMissingName
 	}
diff --git a/internal/server/server.go b/internal/server/server.go
index 6e3ca58..5ac6dc6 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -26,6 +26,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/ReneKroon/ttlcache/v2"
 	flatbuffers "github.com/google/flatbuffers/go"
 
 	"github.com/apache/apisix-go-plugin-runner/internal/plugin"
@@ -67,12 +68,17 @@ func readErr(n int, err error, required int) bool {
 
 func writeErr(n int, err error) {
 	if err != nil {
+		// TODO: solve "write: broken pipe" with context
 		log.Errorf("write: %s", err)
 	}
 }
 
 func generateErrorReport(err error) (ty byte, out []byte) {
-	log.Errorf("%s", err)
+	if err == ttlcache.ErrNotFound {
+		log.Warnf("%s", err)
+	} else {
+		log.Errorf("%s", err)
+	}
 
 	ty = RPCError
 	bd := ReportError(err)
@@ -220,13 +226,16 @@ func Run() {
 
 	go func() {
 		for {
+			conn, err := l.Accept()
+
 			select {
 			case <-done:
+				// don't report the "use of closed network connection" error when the server
+				// is exiting.
 				return
 			default:
 			}
 
-			conn, err := l.Accept()
 			if err != nil {
 				log.Errorf("accept: %s", err)
 				continue
diff --git a/pkg/http/http.go b/pkg/http/http.go
index cf00132..b5d79e6 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -26,17 +26,13 @@ import (
 //
 // 1. We need to record any change to the request headers. As the Request.Header
 // is not an interface, there is not way to inject our special tracker.
+//
 // 2. As the author of fasthttp pointed out, "headers are stored in a map[string][]string.
 // So the server must parse all the headers, ...". The official API is suboptimal, which
 // is even worse in our case as it is not a real HTTP server.
 type Request interface {
 	// ID returns the request id
 	ID() uint32
-	// ConfToken returns the token represents the configuration of current route.
-	// Each route have its unique token, so we can use it to distinguish different
-	// route in the same plugin. When the configuration of a route changed, the token
-	// will change too.
-	ConfToken() uint32
 
 	// SrcIP returns the client's IP
 	SrcIP() net.IP
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index 0405097..e5efa34 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -31,7 +31,7 @@ type RunnerConfig struct {
 	LogOutput zapcore.WriteSyncer
 }
 
-// Run starts the runner and listen the socket
+// Run starts the runner and listens on the socket configured by the environment variable "APISIX_LISTEN_ADDRESS"
 func Run(cfg RunnerConfig) {
 	if cfg.LogOutput == nil {
 		cfg.LogOutput = os.Stdout