You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/01/17 14:13:22 UTC

[dubbo-getty] branch improve/delte-handleLoop created (now 675313b)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a change to branch improve/delte-handleLoop
in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git.


      at 675313b  delete session.handleLoop

This branch includes the following new commits:

     new 675313b  delete session.handleLoop

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[dubbo-getty] 01/01: delete session.handleLoop

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch improve/delte-handleLoop
in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git

commit 675313bd355ecdea0a354696e0d84af1f9555768
Author: AlexStocks <al...@foxmail.com>
AuthorDate: Sun Jan 17 22:13:08 2021 +0800

    delete session.handleLoop
---
 client.go  |  15 +++++----
 go.mod     |   2 +-
 go.sum     |  10 ++++--
 server.go  |   3 +-
 session.go | 106 ++++++++++++++++++++++++-------------------------------------
 5 files changed, 60 insertions(+), 76 deletions(-)

diff --git a/client.go b/client.go
index a11c73b..d7e86fd 100644
--- a/client.go
+++ b/client.go
@@ -22,6 +22,7 @@ import (
 	"crypto/x509"
 	"encoding/pem"
 	"fmt"
+	gxtime "github.com/dubbogo/gost/time"
 	"io/ioutil"
 	"net"
 	"strings"
@@ -170,7 +171,7 @@ func (c *client) dialTCP() Session {
 		}
 
 		log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
-		<-wheel.After(connectInterval)
+		<-gxtime.After(connectInterval)
 	}
 }
 
@@ -202,7 +203,7 @@ func (c *client) dialUDP() Session {
 		}
 		if err != nil {
 			log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
-			<-wheel.After(connectInterval)
+			<-gxtime.After(connectInterval)
 			continue
 		}
 
@@ -211,7 +212,7 @@ func (c *client) dialUDP() Session {
 		if length, err = conn.Write(connectPingPackage[:]); err != nil {
 			conn.Close()
 			log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
-			<-wheel.After(connectInterval)
+			<-gxtime.After(connectInterval)
 			continue
 		}
 		conn.SetReadDeadline(time.Now().Add(1e9))
@@ -222,7 +223,7 @@ func (c *client) dialUDP() Session {
 		if err != nil {
 			log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
 			conn.Close()
-			<-wheel.After(connectInterval)
+			<-gxtime.After(connectInterval)
 			continue
 		}
 		//if err == nil {
@@ -260,7 +261,7 @@ func (c *client) dialWS() Session {
 		}
 
 		log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
-		<-wheel.After(connectInterval)
+		<-gxtime.After(connectInterval)
 	}
 }
 
@@ -338,7 +339,7 @@ func (c *client) dialWSS() Session {
 		}
 
 		log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
-		<-wheel.After(connectInterval)
+		<-gxtime.After(connectInterval)
 	}
 }
 
@@ -445,7 +446,7 @@ func (c *client) reConnect() {
 		if maxTimes < times {
 			times = maxTimes
 		}
-		<-wheel.After(time.Duration(int64(times) * int64(interval)))
+		<-gxtime.After(time.Duration(int64(times) * int64(interval)))
 	}
 }
 
diff --git a/go.mod b/go.mod
index 6dd2917..baf4991 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty
 go 1.14
 
 require (
-	github.com/dubbogo/gost v1.10.1
+	github.com/dubbogo/gost v1.10.4
 	github.com/golang/snappy v0.0.1
 	github.com/gorilla/websocket v1.4.2
 	github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 175c5ae..c5ead10 100644
--- a/go.sum
+++ b/go.sum
@@ -5,8 +5,8 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY=
-github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
+github.com/dubbogo/gost v1.10.4 h1:z9Kw3tgLc9cDmA3gu0hgzjr/slsprZNzssK4zXFqo8s=
+github.com/dubbogo/gost v1.10.4/go.mod h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4=
 github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
 github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
 github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
@@ -15,7 +15,9 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
 github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
 github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
+github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
 github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
@@ -23,7 +25,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw=
 github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -40,6 +44,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
 go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
 go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
diff --git a/server.go b/server.go
index 9f5e586..aa3bb6d 100644
--- a/server.go
+++ b/server.go
@@ -22,6 +22,7 @@ import (
 	"crypto/tls"
 	"crypto/x509"
 	"fmt"
+	gxtime "github.com/dubbogo/gost/time"
 	"io/ioutil"
 	"net"
 	"net/http"
@@ -268,7 +269,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
 				return
 			}
 			if delay != 0 {
-				<-wheel.After(delay)
+				<-gxtime.After(delay)
 			}
 			client, err = s.accept(newSession)
 			if err != nil {
diff --git a/session.go b/session.go
index e9176d1..4f88b27 100644
--- a/session.go
+++ b/session.go
@@ -59,17 +59,12 @@ const (
 /////////////////////////////////////////
 
 var (
-	wheel *gxtime.Wheel
+	defaultTimerWheel *gxtime.TimerWheel
 )
 
 func init() {
-	span := 100e6 // 100ms
-	buckets := MaxWheelTimeSpan / span
-	wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel longest span is 15 minute
-}
-
-func GetTimeWheel() *gxtime.Wheel {
-	return wheel
+	gxtime.InitDefaultTimerWheel()
+	defaultTimerWheel = gxtime.GetDefaultTimerWheel()
 }
 
 // getty base session
@@ -102,8 +97,6 @@ type session struct {
 
 	// goroutines sync
 	grNum int32
-	// read goroutines done signal
-	rDone chan struct{}
 	lock  sync.RWMutex
 }
 
@@ -122,7 +115,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
 		done:  make(chan struct{}),
 		wait:  pendingDuration,
 		attrs: gxcontext.NewValuesContext(context.Background()),
-		rDone: make(chan struct{}),
 	}
 
 	ss.Connection.setSession(ss)
@@ -164,7 +156,6 @@ func (s *session) Reset() {
 		period: period,
 		wait:   pendingDuration,
 		attrs:  gxcontext.NewValuesContext(context.Background()),
-		rDone:  make(chan struct{}),
 	}
 }
 
@@ -460,6 +451,34 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
 	return nil
 }
 
+func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
+	ss, _ := arg.(*session)
+	if ss != nil && ss.IsClosed() {
+		return ErrSessionClosed
+	}
+
+	f := func() {
+		wsConn, wsFlag := ss.Connection.(*gettyWSConn)
+		if wsFlag {
+			err := wsConn.writePing()
+			if err != nil {
+				log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
+			}
+		}
+
+		ss.listener.OnCron(ss)
+	}
+
+	// if enable task pool, run @f asynchronously.
+	if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
+		taskPool.AddTaskAlways(f)
+		return nil
+	}
+
+	f()
+	return nil
+}
+
 // func (s *session) RunEventLoop() {
 func (s *session) run() {
 	if s.Connection == nil || s.listener == nil || s.writer == nil {
@@ -477,56 +496,12 @@ func (s *session) run() {
 		return
 	}
 
-	// start read/write gr
-	atomic.AddInt32(&(s.grNum), 2)
-	go s.handleLoop()
-	go s.handlePackage()
-}
-
-func (s *session) handleLoop() {
-	var (
-		wsFlag  bool
-		wsConn  *gettyWSConn
-		counter gxtime.CountWatch
-	)
-
-	defer func() {
-		if r := recover(); r != nil {
-			const size = 64 << 10
-			rBuf := make([]byte, size)
-			rBuf = rBuf[:runtime.Stack(rBuf, false)]
-			log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
-		}
-
-		grNum := atomic.AddInt32(&(s.grNum), -1)
-		s.listener.OnClose(s)
-		log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
-		s.gc()
-	}()
-
-	wsConn, wsFlag = s.Connection.(*gettyWSConn)
-LOOP:
-	for {
-		select {
-		case <-s.done:
-			// this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr.
-			<-s.rDone
-			counter.Start()
-			if counter.Count() > s.wait.Nanoseconds() {
-				log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
-				break LOOP
-			}
-
-		case <-wheel.After(s.period):
-			if wsFlag {
-				err := wsConn.writePing()
-				if err != nil {
-					log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
-				}
-			}
-			s.listener.OnCron(s)
-		}
+	atomic.AddInt32(&(s.grNum), 1)
+	if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil {
+		panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel", s.Stat()))
 	}
+	// start read gr
+	go s.handlePackage()
 }
 
 func (s *session) addTask(pkg interface{}) {
@@ -553,8 +528,6 @@ func (s *session) handlePackage() {
 			rBuf = rBuf[:runtime.Stack(rBuf, false)]
 			log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
 		}
-
-		close(s.rDone)
 		grNum := atomic.AddInt32(&(s.grNum), -1)
 		log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
 		s.stop()
@@ -564,6 +537,9 @@ func (s *session) handlePackage() {
 				s.listener.OnError(s, err)
 			}
 		}
+
+		s.listener.OnClose(s)
+		s.gc()
 	}()
 
 	if _, ok := s.Connection.(*gettyTCPConn); ok {
@@ -702,8 +678,8 @@ func (s *session) handleUDPPackage() error {
 	if int(s.maxMsgLen<<1) < bufLen {
 		maxBufLen = int(s.maxMsgLen << 1)
 	}
-	bufp = gxbytes.GetBytes(maxBufLen)
-	defer gxbytes.PutBytes(bufp)
+	bufp = gxbytes.AcquireBytes(maxBufLen)
+	defer gxbytes.ReleaseBytes(bufp)
 	buf = *bufp
 	for {
 		if s.IsClosed() {