You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2023/02/20 14:27:54 UTC

[servicecomb-service-center] branch mod created (now 0d3cbb23)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a change to branch mod
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


      at 0d3cbb23 [fix] task queue will block when subscriber busy

This branch includes the following new commits:

     new 0d3cbb23 [fix] task queue will block when subscriber busy

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[servicecomb-service-center] 01/01: [fix] task queue will block when subscriber busy

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch mod
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git

commit 0d3cbb2362c2aab4025296b37b23d8f5d66ce491
Author: little-cui <su...@qq.com>
AuthorDate: Mon Feb 20 22:27:29 2023 +0800

    [fix] task queue will block when subscriber busy
---
 pkg/queue/taskqueue.go      | 39 +++++++++++++++++++++++++++++++++------
 pkg/queue/taskqueue_test.go | 14 +++++++++++++-
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 1947cf7d..48eb65a7 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -19,8 +19,11 @@ package queue
 
 import (
 	"context"
+	"fmt"
+	"sync"
 
 	"github.com/apache/servicecomb-service-center/pkg/goutil"
+	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/go-chassis/foundation/gopool"
 )
 
@@ -42,8 +45,10 @@ type Task struct {
 type TaskQueue struct {
 	Workers []Worker
 
-	taskCh    chan Task
-	goroutine *gopool.Pool
+	taskChLock sync.RWMutex
+	taskChSize int
+	taskCh     chan Task
+	goroutine  *gopool.Pool
 }
 
 // AddWorker is the method to add Worker
@@ -53,7 +58,24 @@ func (q *TaskQueue) AddWorker(w Worker) {
 
 // Add is the method to add task in queue, one task will be handled by all workers
 func (q *TaskQueue) Add(t Task) {
-	q.taskCh <- t
+	q.taskChLock.RLock()
+	select {
+	case q.taskCh <- t:
+		q.taskChLock.RUnlock()
+	default:
+		q.taskChLock.RUnlock()
+		q.resetTaskCh()
+	}
+}
+
+func (q *TaskQueue) resetTaskCh() {
+	q.taskChLock.Lock()
+	defer q.taskChLock.Unlock()
+
+	log.Warn(fmt.Sprintf("taskCh[%d] is full, reset taskCh", q.taskChSize))
+	close(q.taskCh)
+	q.taskCh = make(chan Task, q.taskChSize)
+	q.Run()
 }
 
 func (q *TaskQueue) dispatch(ctx context.Context, w Worker, obj interface{}) {
@@ -82,7 +104,11 @@ func (q *TaskQueue) Run() {
 			select {
 			case <-ctx.Done():
 				return
-			case task := <-q.taskCh:
+			case task, ok := <-q.taskCh:
+				if !ok {
+					log.Warn("taskCh is closed")
+					return
+				}
 				q.Do(ctx, task)
 			}
 		}
@@ -99,7 +125,8 @@ func NewTaskQueue(size int) *TaskQueue {
 		size = eventQueueSize
 	}
 	return &TaskQueue{
-		taskCh:    make(chan Task, size),
-		goroutine: goutil.New(gopool.Configure()),
+		taskChSize: size,
+		taskCh:     make(chan Task, size),
+		goroutine:  goutil.New(gopool.Configure().Workers(1)),
 	}
 }
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 681ba649..0df7d885 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -20,6 +20,7 @@ package queue
 import (
 	"context"
 	"testing"
+	"time"
 )
 
 type mockWorker struct {
@@ -32,7 +33,6 @@ func (h *mockWorker) Handle(ctx context.Context, obj interface{}) {
 
 func TestNewEventQueue(t *testing.T) {
 	h := &mockWorker{make(chan interface{}, 1)}
-
 	q := NewTaskQueue(0)
 	q.AddWorker(h)
 
@@ -59,3 +59,15 @@ func TestNewEventQueue(t *testing.T) {
 	q.Stop()
 	q.Add(Task{Payload: 3})
 }
+
+func TestTaskQueue_Add(t *testing.T) {
+	h := &mockWorker{make(chan interface{}, 10000)}
+	q := NewTaskQueue(5)
+	q.AddWorker(h)
+	q.Run()
+	time.Sleep(100 * time.Millisecond)
+	for i := 0; i < 10000; i++ {
+		go q.Add(Task{Payload: 1})
+	}
+	q.Stop()
+}