You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/10/22 05:28:46 UTC

[GitHub] [dubbo-go] wenxuwan commented on a change in pull request #797: Imp: try to fix too many files open error

wenxuwan commented on a change in pull request #797:
URL: https://github.com/apache/dubbo-go/pull/797#discussion_r509888369



##########
File path: protocol/dubbo/pool.go
##########
@@ -288,117 +287,113 @@ func (c *gettyRPCClient) close() error {
 }
 
 type gettyRPCClientPool struct {
-	rpcClient *Client
-	size      int   // size of []*gettyRPCClient
-	ttl       int64 // ttl of every gettyRPCClient, it is checked when getConn
-
-	sync.Mutex
-	conns []*gettyRPCClient
+	rpcClient     *Client
+	maxSize       int   // maxSize of poolQueue
+	ttl           int64 // ttl of every gettyRPCClient, it is checked when getConn
+	activeNumber  uint32
+	chInitialized uint32 // set to 1 when field ch is initialized
+	ch            chan struct{}
+	closeCh       chan struct{}
+	poolQueue     *poolDequeue // store *gettyRPCClient
+	pushing       uint32
+	sync.RWMutex
 }
 
 func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
+	pq := &poolDequeue{
+		vals: make([]eface, size),
+	}
 	return &gettyRPCClientPool{
 		rpcClient: rpcClient,
-		size:      size,
+		maxSize:   size,
 		ttl:       int64(ttl.Seconds()),
-		conns:     make([]*gettyRPCClient, 0, 16),
+		closeCh:   make(chan struct{}, 0),
+		poolQueue: pq,
 	}
 }
 
 func (p *gettyRPCClientPool) close() {
 	p.Lock()
-	conns := p.conns
-	p.conns = nil
+	connPool := p.poolQueue
+	p.poolQueue = nil
 	p.Unlock()
-	for _, conn := range conns {
-		conn.close()
-	}
-}
-
-func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
-	conn, err := p.get()
-	if err == nil && conn == nil {
-		// create new conn
-		rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr)
-		return rpcClientConn, perrors.WithStack(err)
-	}
-	return conn, perrors.WithStack(err)
-}
-
-func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
-	now := time.Now().Unix()
-
-	p.Lock()
-	defer p.Unlock()
-	if p.conns == nil {
-		return nil, errClientPoolClosed
-	}
-
-	for len(p.conns) > 0 {
-		conn := p.conns[len(p.conns)-1]
-		p.conns = p.conns[:len(p.conns)-1]
-
-		if d := now - conn.getActive(); d > p.ttl {
-			p.remove(conn)
-			go conn.close()
-			continue
+	for {
+		conn, ok := connPool.popTail()
+		if ok {
+			c := conn.(*gettyRPCClient)
+			c.close()
+		} else {
+			break
 		}
-		conn.updateActive(now) //update active time
-		return conn, nil
 	}
-	return nil, nil
 }
 
-func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
-	if conn == nil || conn.getActive() == 0 {
+func (p *gettyRPCClientPool) lazyInit() {
+	// Fast path.
+	if atomic.LoadUint32(&p.chInitialized) == 1 {
 		return
 	}
-
+	// Slow path.
 	p.Lock()
-	defer p.Unlock()
-
-	if p.conns == nil {
-		return
-	}
-
-	// check whether @conn has existed in p.conns or not.
-	for i := range p.conns {
-		if p.conns[i] == conn {
-			return
+	if p.chInitialized == 0 {
+		p.ch = make(chan struct{}, p.maxSize)
+		for i := 0; i < p.maxSize; i++ {
+			p.ch <- struct{}{}
 		}
+		atomic.StoreUint32(&p.chInitialized, 1)
 	}
+	p.Unlock()
+}
 
-	if len(p.conns) >= p.size {
-		// delete @conn from client pool
-		// p.remove(conn)
-		conn.close()
-		return
+func (p *gettyRPCClientPool) waitVacantConn() error {
+	p.lazyInit()
+	select {
+	case <-p.ch:
+		// Additionally check that close chan hasn't expired while we were waiting,
+		// because `select` picks a random `case` if several of them are "ready".
+		select {
+		case <-p.closeCh:
+			return errClientPoolClosed
+		default:
+		}
+	case <-p.closeCh:
+		return errClientPoolClosed
 	}
-	p.conns = append(p.conns, conn)
+	return nil
 }
 
-func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
-	if conn == nil || conn.getActive() == 0 {
-		return
+func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
+	err := p.waitVacantConn()
+	if err != nil {
+		return nil, err
 	}
+	conn, err := p.getConnFromPoll()
+	if err == nil && conn == nil {
+		rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr)
+		return rpcClientConn, perrors.WithStack(err)
 
-	if p.conns == nil {
-		return
 	}
+	return conn, perrors.WithStack(err)
+}
 
-	if len(p.conns) > 0 {
-		for idx, c := range p.conns {
-			if conn == c {
-				p.conns = append(p.conns[:idx], p.conns[idx+1:]...)
-				break
+func (p *gettyRPCClientPool) getConnFromPoll() (*gettyRPCClient, error) {
+	now := time.Now().Unix()
+	if p.poolQueue == nil {
+		return nil, errClientPoolClosed
+	}
+	for {
+		value, ok := p.poolQueue.popTail()
+		if ok {
+			conn := value.(*gettyRPCClient)
+			if d := now - conn.getActive(); d > p.ttl {
+				go conn.close()
+				continue
 			}
+			conn.updateActive(now)
+			return conn, nil
+		} else {
+			break
 		}
 	}
-}
-
-func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) {
-	p.Lock()
-	defer p.Unlock()
-
-	p.remove(conn)
+	return nil, nil

Review comment:
       Failing to get a connection here means the pool is empty or all of its connections have expired, so just return nil and let the caller create a new connection.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org