You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/11/09 06:42:01 UTC
[GitHub] [dubbo-go] gaoxinge commented on a change in pull request #797: Imp: try to fix too many files open error
gaoxinge commented on a change in pull request #797:
URL: https://github.com/apache/dubbo-go/pull/797#discussion_r519579259
##########
File path: protocol/dubbo/pool.go
##########
@@ -288,117 +287,112 @@ func (c *gettyRPCClient) close() error {
}
type gettyRPCClientPool struct {
- rpcClient *Client
- size int // size of []*gettyRPCClient
- ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
-
- sync.Mutex
- conns []*gettyRPCClient
+ rpcClient *Client
+ maxSize int // maxSize of poolQueue
+ ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
+ activeNumber uint32
+ chInitialized uint32 // set to 1 when field ch is initialized
+ ch chan struct{}
+ closeCh chan struct{}
+ poolQueue *poolDequeue // store *gettyRPCClient
+ pushing uint32
+ sync.RWMutex
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
+ pq := &poolDequeue{
+ vals: make([]eface, size),
+ }
return &gettyRPCClientPool{
rpcClient: rpcClient,
- size: size,
+ maxSize: size,
ttl: int64(ttl.Seconds()),
- conns: make([]*gettyRPCClient, 0, 16),
+ closeCh: make(chan struct{}, 0),
+ poolQueue: pq,
}
}
func (p *gettyRPCClientPool) close() {
p.Lock()
- conns := p.conns
- p.conns = nil
+ connPool := p.poolQueue
+ p.poolQueue = nil
p.Unlock()
- for _, conn := range conns {
- conn.close()
- }
-}
-
-func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
- conn, err := p.get()
- if err == nil && conn == nil {
- // create new conn
- rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr)
- return rpcClientConn, perrors.WithStack(err)
- }
- return conn, perrors.WithStack(err)
-}
-
-func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
- now := time.Now().Unix()
-
- p.Lock()
- defer p.Unlock()
- if p.conns == nil {
- return nil, errClientPoolClosed
- }
-
- for len(p.conns) > 0 {
- conn := p.conns[len(p.conns)-1]
- p.conns = p.conns[:len(p.conns)-1]
-
- if d := now - conn.getActive(); d > p.ttl {
- p.remove(conn)
- go conn.close()
- continue
+ for {
+ conn, ok := connPool.popTail()
+ if ok {
+ c := conn.(*gettyRPCClient)
+ c.close()
+ } else {
+ break
}
- conn.updateActive(now) //update active time
- return conn, nil
}
- return nil, nil
}
-func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
- if conn == nil || conn.getActive() == 0 {
+func (p *gettyRPCClientPool) lazyInit() {
+ // Fast path.
+ if atomic.LoadUint32(&p.chInitialized) == 1 {
return
}
-
+ // Slow path.
p.Lock()
defer p.Unlock()
-
- if p.conns == nil {
- return
- }
-
- // check whether @conn has existed in p.conns or not.
- for i := range p.conns {
- if p.conns[i] == conn {
- return
+ if p.chInitialized == 0 {
+ p.ch = make(chan struct{}, p.maxSize)
+ for i := 0; i < p.maxSize; i++ {
+ p.ch <- struct{}{}
}
+ atomic.StoreUint32(&p.chInitialized, 1)
Review comment:
Because the update of `p.chInitialized` is protected by a lock, we can remove `atomic.LoadUint32` and `atomic.StoreUint32`.
##########
File path: protocol/dubbo/client.go
##########
@@ -267,11 +268,25 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
return errSessionNotExist
}
defer func() {
+ failNumber := 0
if err == nil {
- c.pool.put(conn)
- return
+ for {
+ ok := atomic.CompareAndSwapUint32(&c.pool.pushing, 0, 1)
+ if ok {
+ c.pool.poolQueue.pushHead(conn)
+ c.pool.pushing = 0
+ c.pool.ch <- struct{}{}
+ return
Review comment:
Can these codes be encapsulated in one method, such as `func (p *gettyRPCClientPool) putGettyRpcClient(conn *gettyRPCClient)`?
##########
File path: protocol/dubbo/pool.go
##########
@@ -288,117 +287,112 @@ func (c *gettyRPCClient) close() error {
}
type gettyRPCClientPool struct {
- rpcClient *Client
- size int // size of []*gettyRPCClient
- ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
-
- sync.Mutex
- conns []*gettyRPCClient
+ rpcClient *Client
+ maxSize int // maxSize of poolQueue
+ ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
+ activeNumber uint32
+ chInitialized uint32 // set to 1 when field ch is initialized
+ ch chan struct{}
+ closeCh chan struct{}
+ poolQueue *poolDequeue // store *gettyRPCClient
+ pushing uint32
+ sync.RWMutex
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
+ pq := &poolDequeue{
+ vals: make([]eface, size),
+ }
return &gettyRPCClientPool{
rpcClient: rpcClient,
- size: size,
+ maxSize: size,
ttl: int64(ttl.Seconds()),
- conns: make([]*gettyRPCClient, 0, 16),
+ closeCh: make(chan struct{}, 0),
+ poolQueue: pq,
}
}
func (p *gettyRPCClientPool) close() {
p.Lock()
- conns := p.conns
- p.conns = nil
+ connPool := p.poolQueue
+ p.poolQueue = nil
p.Unlock()
- for _, conn := range conns {
- conn.close()
- }
-}
-
-func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
- conn, err := p.get()
- if err == nil && conn == nil {
- // create new conn
- rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr)
- return rpcClientConn, perrors.WithStack(err)
- }
- return conn, perrors.WithStack(err)
-}
-
-func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
- now := time.Now().Unix()
-
- p.Lock()
- defer p.Unlock()
- if p.conns == nil {
- return nil, errClientPoolClosed
- }
-
- for len(p.conns) > 0 {
- conn := p.conns[len(p.conns)-1]
- p.conns = p.conns[:len(p.conns)-1]
-
- if d := now - conn.getActive(); d > p.ttl {
- p.remove(conn)
- go conn.close()
- continue
+ for {
+ conn, ok := connPool.popTail()
+ if ok {
+ c := conn.(*gettyRPCClient)
+ c.close()
+ } else {
+ break
}
- conn.updateActive(now) //update active time
- return conn, nil
}
- return nil, nil
}
-func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
- if conn == nil || conn.getActive() == 0 {
+func (p *gettyRPCClientPool) lazyInit() {
+ // Fast path.
+ if atomic.LoadUint32(&p.chInitialized) == 1 {
return
}
-
+ // Slow path.
p.Lock()
defer p.Unlock()
-
- if p.conns == nil {
- return
- }
-
- // check whether @conn has existed in p.conns or not.
- for i := range p.conns {
- if p.conns[i] == conn {
- return
+ if p.chInitialized == 0 {
+ p.ch = make(chan struct{}, p.maxSize)
+ for i := 0; i < p.maxSize; i++ {
+ p.ch <- struct{}{}
}
+ atomic.StoreUint32(&p.chInitialized, 1)
Review comment:
Because the update of `c.chInitialized` is protected by a lock, I think we can remove `atomic.LoadUint32` and `atomic.StoreUint32`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org