You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2023/02/20 14:27:55 UTC
[servicecomb-service-center] 01/01: [fix] task queue will block when subscriber busy
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch mod
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
commit 0d3cbb2362c2aab4025296b37b23d8f5d66ce491
Author: little-cui <su...@qq.com>
AuthorDate: Mon Feb 20 22:27:29 2023 +0800
[fix] task queue will block when subscriber busy
---
pkg/queue/taskqueue.go | 39 +++++++++++++++++++++++++++++++++------
pkg/queue/taskqueue_test.go | 14 +++++++++++++-
2 files changed, 46 insertions(+), 7 deletions(-)
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 1947cf7d..48eb65a7 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -19,8 +19,11 @@ package queue
import (
"context"
+ "fmt"
+ "sync"
"github.com/apache/servicecomb-service-center/pkg/goutil"
+ "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/go-chassis/foundation/gopool"
)
@@ -42,8 +45,10 @@ type Task struct {
type TaskQueue struct {
Workers []Worker
- taskCh chan Task
- goroutine *gopool.Pool
+ taskChLock sync.RWMutex
+ taskChSize int
+ taskCh chan Task
+ goroutine *gopool.Pool
}
// AddWorker is the method to add Worker
@@ -53,7 +58,24 @@ func (q *TaskQueue) AddWorker(w Worker) {
// Add is the method to add task in queue, one task will be handled by all workers
func (q *TaskQueue) Add(t Task) {
- q.taskCh <- t
+ q.taskChLock.RLock()
+ select {
+ case q.taskCh <- t:
+ q.taskChLock.RUnlock()
+ default:
+ q.taskChLock.RUnlock()
+ q.resetTaskCh()
+ }
+}
+
+func (q *TaskQueue) resetTaskCh() {
+ q.taskChLock.Lock()
+ defer q.taskChLock.Unlock()
+
+ log.Warn(fmt.Sprintf("taskCh[%d] is full, reset taskCh", q.taskChSize))
+ close(q.taskCh)
+ q.taskCh = make(chan Task, q.taskChSize)
+ q.Run()
}
func (q *TaskQueue) dispatch(ctx context.Context, w Worker, obj interface{}) {
@@ -82,7 +104,11 @@ func (q *TaskQueue) Run() {
select {
case <-ctx.Done():
return
- case task := <-q.taskCh:
+ case task, ok := <-q.taskCh:
+ if !ok {
+ log.Warn("taskCh is closed")
+ return
+ }
q.Do(ctx, task)
}
}
@@ -99,7 +125,8 @@ func NewTaskQueue(size int) *TaskQueue {
size = eventQueueSize
}
return &TaskQueue{
- taskCh: make(chan Task, size),
- goroutine: goutil.New(gopool.Configure()),
+ taskChSize: size,
+ taskCh: make(chan Task, size),
+ goroutine: goutil.New(gopool.Configure().Workers(1)),
}
}
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 681ba649..0df7d885 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -20,6 +20,7 @@ package queue
import (
"context"
"testing"
+ "time"
)
type mockWorker struct {
@@ -32,7 +33,6 @@ func (h *mockWorker) Handle(ctx context.Context, obj interface{}) {
func TestNewEventQueue(t *testing.T) {
h := &mockWorker{make(chan interface{}, 1)}
-
q := NewTaskQueue(0)
q.AddWorker(h)
@@ -59,3 +59,15 @@ func TestNewEventQueue(t *testing.T) {
q.Stop()
q.Add(Task{Payload: 3})
}
+
+func TestTaskQueue_Add(t *testing.T) {
+ h := &mockWorker{make(chan interface{}, 10000)}
+ q := NewTaskQueue(5)
+ q.AddWorker(h)
+ q.Run()
+ time.Sleep(100 * time.Millisecond)
+ for i := 0; i < 10000; i++ {
+ go q.Add(Task{Payload: 1})
+ }
+ q.Stop()
+}