You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/01/17 14:13:22 UTC
[dubbo-getty] branch improve/delte-handleLoop created (now 675313b)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a change to branch improve/delte-handleLoop
in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git.
at 675313b delete session.handleLoop
This branch includes the following new commits:
new 675313b delete session.handleLoop
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[dubbo-getty] 01/01: delete session.handleLoop
Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch improve/delte-handleLoop
in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git
commit 675313bd355ecdea0a354696e0d84af1f9555768
Author: AlexStocks <al...@foxmail.com>
AuthorDate: Sun Jan 17 22:13:08 2021 +0800
delete session.handleLoop
---
client.go | 15 +++++----
go.mod | 2 +-
go.sum | 10 ++++--
server.go | 3 +-
session.go | 106 ++++++++++++++++++++++++-------------------------------------
5 files changed, 60 insertions(+), 76 deletions(-)
diff --git a/client.go b/client.go
index a11c73b..d7e86fd 100644
--- a/client.go
+++ b/client.go
@@ -22,6 +22,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
+ gxtime "github.com/dubbogo/gost/time"
"io/ioutil"
"net"
"strings"
@@ -170,7 +171,7 @@ func (c *client) dialTCP() Session {
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
- <-wheel.After(connectInterval)
+ <-gxtime.After(connectInterval)
}
}
@@ -202,7 +203,7 @@ func (c *client) dialUDP() Session {
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
- <-wheel.After(connectInterval)
+ <-gxtime.After(connectInterval)
continue
}
@@ -211,7 +212,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
- <-wheel.After(connectInterval)
+ <-gxtime.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
@@ -222,7 +223,7 @@ func (c *client) dialUDP() Session {
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
- <-wheel.After(connectInterval)
+ <-gxtime.After(connectInterval)
continue
}
//if err == nil {
@@ -260,7 +261,7 @@ func (c *client) dialWS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
- <-wheel.After(connectInterval)
+ <-gxtime.After(connectInterval)
}
}
@@ -338,7 +339,7 @@ func (c *client) dialWSS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
- <-wheel.After(connectInterval)
+ <-gxtime.After(connectInterval)
}
}
@@ -445,7 +446,7 @@ func (c *client) reConnect() {
if maxTimes < times {
times = maxTimes
}
- <-wheel.After(time.Duration(int64(times) * int64(interval)))
+ <-gxtime.After(time.Duration(int64(times) * int64(interval)))
}
}
diff --git a/go.mod b/go.mod
index 6dd2917..baf4991 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty
go 1.14
require (
- github.com/dubbogo/gost v1.10.1
+ github.com/dubbogo/gost v1.10.4
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 175c5ae..c5ead10 100644
--- a/go.sum
+++ b/go.sum
@@ -5,8 +5,8 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY=
-github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
+github.com/dubbogo/gost v1.10.4 h1:z9Kw3tgLc9cDmA3gu0hgzjr/slsprZNzssK4zXFqo8s=
+github.com/dubbogo/gost v1.10.4/go.mod h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
@@ -15,7 +15,9 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
+github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
@@ -23,7 +25,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -40,6 +44,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
diff --git a/server.go b/server.go
index 9f5e586..aa3bb6d 100644
--- a/server.go
+++ b/server.go
@@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
+ gxtime "github.com/dubbogo/gost/time"
"io/ioutil"
"net"
"net/http"
@@ -268,7 +269,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
return
}
if delay != 0 {
- <-wheel.After(delay)
+ <-gxtime.After(delay)
}
client, err = s.accept(newSession)
if err != nil {
diff --git a/session.go b/session.go
index e9176d1..4f88b27 100644
--- a/session.go
+++ b/session.go
@@ -59,17 +59,12 @@ const (
/////////////////////////////////////////
var (
- wheel *gxtime.Wheel
+ defaultTimerWheel *gxtime.TimerWheel
)
func init() {
- span := 100e6 // 100ms
- buckets := MaxWheelTimeSpan / span
- wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel longest span is 15 minute
-}
-
-func GetTimeWheel() *gxtime.Wheel {
- return wheel
+ gxtime.InitDefaultTimerWheel()
+ defaultTimerWheel = gxtime.GetDefaultTimerWheel()
}
// getty base session
@@ -102,8 +97,6 @@ type session struct {
// goroutines sync
grNum int32
- // read goroutines done signal
- rDone chan struct{}
lock sync.RWMutex
}
@@ -122,7 +115,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
done: make(chan struct{}),
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
- rDone: make(chan struct{}),
}
ss.Connection.setSession(ss)
@@ -164,7 +156,6 @@ func (s *session) Reset() {
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
- rDone: make(chan struct{}),
}
}
@@ -460,6 +451,34 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return nil
}
+func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
+ ss, _ := arg.(*session)
+ if ss != nil && ss.IsClosed() {
+ return ErrSessionClosed
+ }
+
+ f := func() {
+ wsConn, wsFlag := ss.Connection.(*gettyWSConn)
+ if wsFlag {
+ err := wsConn.writePing()
+ if err != nil {
+ log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
+ }
+ }
+
+ ss.listener.OnCron(ss)
+ }
+
+ // if enable task pool, run @f asynchronously.
+ if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
+ taskPool.AddTaskAlways(f)
+ return nil
+ }
+
+ f()
+ return nil
+}
+
// func (s *session) RunEventLoop() {
func (s *session) run() {
if s.Connection == nil || s.listener == nil || s.writer == nil {
@@ -477,56 +496,12 @@ func (s *session) run() {
return
}
- // start read/write gr
- atomic.AddInt32(&(s.grNum), 2)
- go s.handleLoop()
- go s.handlePackage()
-}
-
-func (s *session) handleLoop() {
- var (
- wsFlag bool
- wsConn *gettyWSConn
- counter gxtime.CountWatch
- )
-
- defer func() {
- if r := recover(); r != nil {
- const size = 64 << 10
- rBuf := make([]byte, size)
- rBuf = rBuf[:runtime.Stack(rBuf, false)]
- log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
- }
-
- grNum := atomic.AddInt32(&(s.grNum), -1)
- s.listener.OnClose(s)
- log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
- s.gc()
- }()
-
- wsConn, wsFlag = s.Connection.(*gettyWSConn)
-LOOP:
- for {
- select {
- case <-s.done:
- // this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr.
- <-s.rDone
- counter.Start()
- if counter.Count() > s.wait.Nanoseconds() {
- log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
- break LOOP
- }
-
- case <-wheel.After(s.period):
- if wsFlag {
- err := wsConn.writePing()
- if err != nil {
- log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
- }
- }
- s.listener.OnCron(s)
- }
+ atomic.AddInt32(&(s.grNum), 1)
+ if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil {
+ panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel", s.Stat()))
}
+ // start read gr
+ go s.handlePackage()
}
func (s *session) addTask(pkg interface{}) {
@@ -553,8 +528,6 @@ func (s *session) handlePackage() {
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
-
- close(s.rDone)
grNum := atomic.AddInt32(&(s.grNum), -1)
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
@@ -564,6 +537,9 @@ func (s *session) handlePackage() {
s.listener.OnError(s, err)
}
}
+
+ s.listener.OnClose(s)
+ s.gc()
}()
if _, ok := s.Connection.(*gettyTCPConn); ok {
@@ -702,8 +678,8 @@ func (s *session) handleUDPPackage() error {
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
- bufp = gxbytes.GetBytes(maxBufLen)
- defer gxbytes.PutBytes(bufp)
+ bufp = gxbytes.AcquireBytes(maxBufLen)
+ defer gxbytes.ReleaseBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {