You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2023/02/20 14:27:55 UTC

[servicecomb-service-center] 01/01: [fix] task queue will block when subscriber busy

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch mod
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git

commit 0d3cbb2362c2aab4025296b37b23d8f5d66ce491
Author: little-cui <su...@qq.com>
AuthorDate: Mon Feb 20 22:27:29 2023 +0800

    [fix] task queue will block when subscriber busy
---
 pkg/queue/taskqueue.go      | 39 +++++++++++++++++++++++++++++++++------
 pkg/queue/taskqueue_test.go | 14 +++++++++++++-
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 1947cf7d..48eb65a7 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -19,8 +19,11 @@ package queue
 
 import (
 	"context"
+	"fmt"
+	"sync"
 
 	"github.com/apache/servicecomb-service-center/pkg/goutil"
+	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/go-chassis/foundation/gopool"
 )
 
@@ -42,8 +45,10 @@ type Task struct {
 type TaskQueue struct {
 	Workers []Worker
 
-	taskCh    chan Task
-	goroutine *gopool.Pool
+	taskChLock sync.RWMutex // read-locked around the send in Add, write-locked while resetTaskCh replaces taskCh
+	taskChSize int          // buffer capacity used whenever taskCh is (re)created
+	taskCh     chan Task
+	goroutine  *gopool.Pool
 }
 
 // AddWorker is the method to add Worker
@@ -53,7 +58,24 @@ func (q *TaskQueue) AddWorker(w Worker) {
 
 // Add is the method to add task in queue, one task will be handled by all workers
 func (q *TaskQueue) Add(t Task) {
-	q.taskCh <- t
+	q.taskChLock.RLock() // read lock: resetTaskCh replaces q.taskCh under the write lock
+	select {
+	case q.taskCh <- t:
+		q.taskChLock.RUnlock()
+	default: // the send would block (buffer full), so t is not enqueued
+		q.taskChLock.RUnlock() // release first: resetTaskCh acquires the write lock
+		q.resetTaskCh() // NOTE(review): t is never re-sent after the reset, so this task is dropped
+	}
+}
+
+func (q *TaskQueue) resetTaskCh() {
+	q.taskChLock.Lock() // write lock: excludes the RLock-guarded send in Add
+	defer q.taskChLock.Unlock()
+
+	log.Warn(fmt.Sprintf("taskCh[%d] is full, reset taskCh", q.taskChSize))
+	close(q.taskCh) // a receiver still on the old channel sees ok == false once it is drained
+	q.taskCh = make(chan Task, q.taskChSize) // NOTE(review): nothing copies tasks still buffered in the old channel
+	q.Run() // called while the write lock is still held (Unlock is deferred)
 }
 
 func (q *TaskQueue) dispatch(ctx context.Context, w Worker, obj interface{}) {
@@ -82,7 +104,11 @@ func (q *TaskQueue) Run() {
 			select {
 			case <-ctx.Done():
 				return
-			case task := <-q.taskCh:
+			case task, ok := <-q.taskCh:
+				if !ok {
+					log.Warn("taskCh is closed")
+					return
+				}
 				q.Do(ctx, task)
 			}
 		}
@@ -99,7 +125,8 @@ func NewTaskQueue(size int) *TaskQueue {
 		size = eventQueueSize
 	}
 	return &TaskQueue{
-		taskCh:    make(chan Task, size),
-		goroutine: goutil.New(gopool.Configure()),
+		taskChSize: size,
+		taskCh:     make(chan Task, size),
+		goroutine:  goutil.New(gopool.Configure().Workers(1)),
 	}
 }
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 681ba649..0df7d885 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -20,6 +20,7 @@ package queue
 import (
 	"context"
 	"testing"
+	"time"
 )
 
 type mockWorker struct {
@@ -32,7 +33,6 @@ func (h *mockWorker) Handle(ctx context.Context, obj interface{}) {
 
 func TestNewEventQueue(t *testing.T) {
 	h := &mockWorker{make(chan interface{}, 1)}
-
 	q := NewTaskQueue(0)
 	q.AddWorker(h)
 
@@ -59,3 +59,15 @@ func TestNewEventQueue(t *testing.T) {
 	q.Stop()
 	q.Add(Task{Payload: 3})
 }
+
+func TestTaskQueue_Add(t *testing.T) {
+	h := &mockWorker{make(chan interface{}, 10000)}
+	q := NewTaskQueue(5) // queue size far below the 10000 concurrent Adds issued below
+	q.AddWorker(h)
+	q.Run()
+	time.Sleep(100 * time.Millisecond) // presumably lets the Run goroutine start first — TODO confirm
+	for i := 0; i < 10000; i++ {
+		go q.Add(Task{Payload: 1}) // fire-and-forget: these goroutines are not awaited
+	}
+	q.Stop() // NOTE(review): no assertions are made, and Stop may run while Adds are still in flight
+}