You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2022/06/10 03:14:58 UTC
[dubbo-getty] 01/01: Fix: remove gost timer to fix high CPU usage problem
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch remove-timer
in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git
commit 1bfc340ef6dc93e78c15b04f3df5d4b815408921
Author: LaurenceLiZhixin <38...@qq.com>
AuthorDate: Fri Jun 10 11:14:47 2022 +0800
Fix: remove gost timer to fix high CPU usage problem
---
client.go | 16 +++++++---------
server.go | 4 +---
session.go | 17 +++++++----------
3 files changed, 15 insertions(+), 22 deletions(-)
diff --git a/client.go b/client.go
index bc700f2..7d24507 100644
--- a/client.go
+++ b/client.go
@@ -34,8 +34,6 @@ import (
"github.com/dubbogo/gost/bytes"
"github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
- gxtime "github.com/dubbogo/gost/time"
-
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
@@ -171,7 +169,7 @@ func (c *client) dialTCP() Session {
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
- <-gxtime.After(connectInterval)
+ <-time.After(connectInterval)
}
}
@@ -202,7 +200,7 @@ func (c *client) dialUDP() Session {
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
- <-gxtime.After(connectInterval)
+ <-time.After(connectInterval)
continue
}
@@ -211,7 +209,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
- <-gxtime.After(connectInterval)
+ <-time.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
@@ -222,7 +220,7 @@ func (c *client) dialUDP() Session {
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
- <-gxtime.After(connectInterval)
+ <-time.After(connectInterval)
continue
}
return newUDPSession(conn, c)
@@ -258,7 +256,7 @@ func (c *client) dialWS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
- <-gxtime.After(connectInterval)
+ <-time.After(connectInterval)
}
}
@@ -336,7 +334,7 @@ func (c *client) dialWSS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
- <-gxtime.After(connectInterval)
+ <-time.After(connectInterval)
}
}
@@ -443,7 +441,7 @@ func (c *client) reConnect() {
if maxTimes < times {
times = maxTimes
}
- <-gxtime.After(time.Duration(int64(times) * int64(interval)))
+ <-time.After(time.Duration(int64(times) * int64(interval)))
}
}
diff --git a/server.go b/server.go
index 82b5362..48c56da 100644
--- a/server.go
+++ b/server.go
@@ -33,8 +33,6 @@ import (
import (
gxnet "github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
- gxtime "github.com/dubbogo/gost/time"
-
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
@@ -292,7 +290,7 @@ func (s *server) runTCPEventLoop(newSession NewSessionCallback) {
return
}
if delay != 0 {
- <-gxtime.After(delay)
+ <-time.After(delay)
}
client, err = s.accept(newSession)
if err != nil {
diff --git a/session.go b/session.go
index 5bfa4da..f9a897e 100644
--- a/session.go
+++ b/session.go
@@ -31,8 +31,6 @@ import (
import (
gxbytes "github.com/dubbogo/gost/bytes"
gxcontext "github.com/dubbogo/gost/context"
- gxtime "github.com/dubbogo/gost/time"
-
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
@@ -57,11 +55,7 @@ const (
outputFormat = "session %s, Read Bytes: %d, Write Bytes: %d, Read Pkgs: %d, Write Pkgs: %d"
)
-var defaultTimerWheel *gxtime.TimerWheel
-
func init() {
- gxtime.InitDefaultTimerWheel()
- defaultTimerWheel = gxtime.GetDefaultTimerWheel()
}
// Session wrap connection between the server and the client
@@ -507,7 +501,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {
return wlg, nil
}
-func heartbeat(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
+func heartbeat(arg interface{}) error {
ss, _ := arg.(*session)
if ss == nil || ss.IsClosed() {
return ErrSessionClosed
@@ -551,9 +545,12 @@ func (s *session) run() {
return
}
- if _, err := defaultTimerWheel.AddTimer(heartbeat, gxtime.TimerLoop, s.period, s); err != nil {
- panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel err:%v", s.Stat(), err))
- }
+ go func() {
+ for {
+ <-time.After(s.period)
+ heartbeat(s)
+ }
+ }()
s.grNum.Add(1)
// start read gr