You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/11/09 06:42:01 UTC

[GitHub] [dubbo-go] gaoxinge commented on a change in pull request #797: Imp: try to fix too many files open error

gaoxinge commented on a change in pull request #797:
URL: https://github.com/apache/dubbo-go/pull/797#discussion_r519579259



##########
File path: protocol/dubbo/pool.go
##########
@@ -288,117 +287,112 @@ func (c *gettyRPCClient) close() error {
 }
 
 type gettyRPCClientPool struct {
-	rpcClient *Client
-	size      int   // size of []*gettyRPCClient
-	ttl       int64 // ttl of every gettyRPCClient, it is checked when getConn
-
-	sync.Mutex
-	conns []*gettyRPCClient
+	rpcClient     *Client
+	maxSize       int   // maxSize of poolQueue
+	ttl           int64 // ttl of every gettyRPCClient, it is checked when getConn
+	activeNumber  uint32
+	chInitialized uint32 // set to 1 when field ch is initialized
+	ch            chan struct{}
+	closeCh       chan struct{}
+	poolQueue     *poolDequeue // store *gettyRPCClient
+	pushing       uint32
+	sync.RWMutex
 }
 
 func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
+	pq := &poolDequeue{
+		vals: make([]eface, size),
+	}
 	return &gettyRPCClientPool{
 		rpcClient: rpcClient,
-		size:      size,
+		maxSize:   size,
 		ttl:       int64(ttl.Seconds()),
-		conns:     make([]*gettyRPCClient, 0, 16),
+		closeCh:   make(chan struct{}, 0),
+		poolQueue: pq,
 	}
 }
 
 func (p *gettyRPCClientPool) close() {
 	p.Lock()
-	conns := p.conns
-	p.conns = nil
+	connPool := p.poolQueue
+	p.poolQueue = nil
 	p.Unlock()
-	for _, conn := range conns {
-		conn.close()
-	}
-}
-
-func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
-	conn, err := p.get()
-	if err == nil && conn == nil {
-		// create new conn
-		rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr)
-		return rpcClientConn, perrors.WithStack(err)
-	}
-	return conn, perrors.WithStack(err)
-}
-
-func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
-	now := time.Now().Unix()
-
-	p.Lock()
-	defer p.Unlock()
-	if p.conns == nil {
-		return nil, errClientPoolClosed
-	}
-
-	for len(p.conns) > 0 {
-		conn := p.conns[len(p.conns)-1]
-		p.conns = p.conns[:len(p.conns)-1]
-
-		if d := now - conn.getActive(); d > p.ttl {
-			p.remove(conn)
-			go conn.close()
-			continue
+	for {
+		conn, ok := connPool.popTail()
+		if ok {
+			c := conn.(*gettyRPCClient)
+			c.close()
+		} else {
+			break
 		}
-		conn.updateActive(now) //update active time
-		return conn, nil
 	}
-	return nil, nil
 }
 
-func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
-	if conn == nil || conn.getActive() == 0 {
+func (p *gettyRPCClientPool) lazyInit() {
+	// Fast path.
+	if atomic.LoadUint32(&p.chInitialized) == 1 {
 		return
 	}
-
+	// Slow path.
 	p.Lock()
 	defer p.Unlock()
-
-	if p.conns == nil {
-		return
-	}
-
-	// check whether @conn has existed in p.conns or not.
-	for i := range p.conns {
-		if p.conns[i] == conn {
-			return
+	if p.chInitialized == 0 {
+		p.ch = make(chan struct{}, p.maxSize)
+		for i := 0; i < p.maxSize; i++ {
+			p.ch <- struct{}{}
 		}
+		atomic.StoreUint32(&p.chInitialized, 1)

Review comment:
       Because the update of `p.chInitialized` is protected by a lock, we can remove `atomic.LoadUint32` and `atomic.StoreUint32`.

##########
File path: protocol/dubbo/client.go
##########
@@ -267,11 +268,25 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
 		return errSessionNotExist
 	}
 	defer func() {
+		failNumber := 0
 		if err == nil {
-			c.pool.put(conn)
-			return
+			for {
+				ok := atomic.CompareAndSwapUint32(&c.pool.pushing, 0, 1)
+				if ok {
+					c.pool.poolQueue.pushHead(conn)
+					c.pool.pushing = 0
+					c.pool.ch <- struct{}{}
+					return

Review comment:
       Can this code be encapsulated in a single method, such as `func (p *gettyRPCClientPool) putGettyRpcClient(conn *gettyRPCClient)`?

##########
File path: protocol/dubbo/pool.go
##########
@@ -288,117 +287,112 @@ func (c *gettyRPCClient) close() error {
 }
 
 type gettyRPCClientPool struct {
-	rpcClient *Client
-	size      int   // size of []*gettyRPCClient
-	ttl       int64 // ttl of every gettyRPCClient, it is checked when getConn
-
-	sync.Mutex
-	conns []*gettyRPCClient
+	rpcClient     *Client
+	maxSize       int   // maxSize of poolQueue
+	ttl           int64 // ttl of every gettyRPCClient, it is checked when getConn
+	activeNumber  uint32
+	chInitialized uint32 // set to 1 when field ch is initialized
+	ch            chan struct{}
+	closeCh       chan struct{}
+	poolQueue     *poolDequeue // store *gettyRPCClient
+	pushing       uint32
+	sync.RWMutex
 }
 
 func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
+	pq := &poolDequeue{
+		vals: make([]eface, size),
+	}
 	return &gettyRPCClientPool{
 		rpcClient: rpcClient,
-		size:      size,
+		maxSize:   size,
 		ttl:       int64(ttl.Seconds()),
-		conns:     make([]*gettyRPCClient, 0, 16),
+		closeCh:   make(chan struct{}, 0),
+		poolQueue: pq,
 	}
 }
 
 func (p *gettyRPCClientPool) close() {
 	p.Lock()
-	conns := p.conns
-	p.conns = nil
+	connPool := p.poolQueue
+	p.poolQueue = nil
 	p.Unlock()
-	for _, conn := range conns {
-		conn.close()
-	}
-}
-
-func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
-	conn, err := p.get()
-	if err == nil && conn == nil {
-		// create new conn
-		rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr)
-		return rpcClientConn, perrors.WithStack(err)
-	}
-	return conn, perrors.WithStack(err)
-}
-
-func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
-	now := time.Now().Unix()
-
-	p.Lock()
-	defer p.Unlock()
-	if p.conns == nil {
-		return nil, errClientPoolClosed
-	}
-
-	for len(p.conns) > 0 {
-		conn := p.conns[len(p.conns)-1]
-		p.conns = p.conns[:len(p.conns)-1]
-
-		if d := now - conn.getActive(); d > p.ttl {
-			p.remove(conn)
-			go conn.close()
-			continue
+	for {
+		conn, ok := connPool.popTail()
+		if ok {
+			c := conn.(*gettyRPCClient)
+			c.close()
+		} else {
+			break
 		}
-		conn.updateActive(now) //update active time
-		return conn, nil
 	}
-	return nil, nil
 }
 
-func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
-	if conn == nil || conn.getActive() == 0 {
+func (p *gettyRPCClientPool) lazyInit() {
+	// Fast path.
+	if atomic.LoadUint32(&p.chInitialized) == 1 {
 		return
 	}
-
+	// Slow path.
 	p.Lock()
 	defer p.Unlock()
-
-	if p.conns == nil {
-		return
-	}
-
-	// check whether @conn has existed in p.conns or not.
-	for i := range p.conns {
-		if p.conns[i] == conn {
-			return
+	if p.chInitialized == 0 {
+		p.ch = make(chan struct{}, p.maxSize)
+		for i := 0; i < p.maxSize; i++ {
+			p.ch <- struct{}{}
 		}
+		atomic.StoreUint32(&p.chInitialized, 1)

Review comment:
       Because the update of `p.chInitialized` is protected by a lock, I think we can remove `atomic.LoadUint32` and `atomic.StoreUint32`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org