You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2023/02/20 14:27:54 UTC
[servicecomb-service-center] branch mod created (now 0d3cbb23)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a change to branch mod
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
at 0d3cbb23 [fix] task queue will block when subscriber busy
This branch includes the following new commits:
new 0d3cbb23 [fix] task queue will block when subscriber busy
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[servicecomb-service-center] 01/01: [fix] task queue will block when subscriber busy
Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch mod
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
commit 0d3cbb2362c2aab4025296b37b23d8f5d66ce491
Author: little-cui <su...@qq.com>
AuthorDate: Mon Feb 20 22:27:29 2023 +0800
[fix] task queue will block when subscriber busy
---
pkg/queue/taskqueue.go | 39 +++++++++++++++++++++++++++++++++------
pkg/queue/taskqueue_test.go | 14 +++++++++++++-
2 files changed, 46 insertions(+), 7 deletions(-)
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 1947cf7d..48eb65a7 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -19,8 +19,11 @@ package queue
import (
"context"
+ "fmt"
+ "sync"
"github.com/apache/servicecomb-service-center/pkg/goutil"
+ "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/go-chassis/foundation/gopool"
)
@@ -42,8 +45,10 @@ type Task struct {
type TaskQueue struct {
Workers []Worker
- taskCh chan Task
- goroutine *gopool.Pool
+ taskChLock sync.RWMutex
+ taskChSize int
+ taskCh chan Task
+ goroutine *gopool.Pool
}
// AddWorker is the method to add Worker
@@ -53,7 +58,24 @@ func (q *TaskQueue) AddWorker(w Worker) {
// Add is the method to add task in queue, one task will be handled by all workers
func (q *TaskQueue) Add(t Task) {
- q.taskCh <- t
+ q.taskChLock.RLock()
+ select {
+ case q.taskCh <- t:
+ q.taskChLock.RUnlock()
+ default:
+ q.taskChLock.RUnlock()
+ q.resetTaskCh()
+ }
+}
+
+func (q *TaskQueue) resetTaskCh() {
+ q.taskChLock.Lock()
+ defer q.taskChLock.Unlock()
+
+ log.Warn(fmt.Sprintf("taskCh[%d] is full, reset taskCh", q.taskChSize))
+ close(q.taskCh)
+ q.taskCh = make(chan Task, q.taskChSize)
+ q.Run()
}
func (q *TaskQueue) dispatch(ctx context.Context, w Worker, obj interface{}) {
@@ -82,7 +104,11 @@ func (q *TaskQueue) Run() {
select {
case <-ctx.Done():
return
- case task := <-q.taskCh:
+ case task, ok := <-q.taskCh:
+ if !ok {
+ log.Warn("taskCh is closed")
+ return
+ }
q.Do(ctx, task)
}
}
@@ -99,7 +125,8 @@ func NewTaskQueue(size int) *TaskQueue {
size = eventQueueSize
}
return &TaskQueue{
- taskCh: make(chan Task, size),
- goroutine: goutil.New(gopool.Configure()),
+ taskChSize: size,
+ taskCh: make(chan Task, size),
+ goroutine: goutil.New(gopool.Configure().Workers(1)),
}
}
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 681ba649..0df7d885 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -20,6 +20,7 @@ package queue
import (
"context"
"testing"
+ "time"
)
type mockWorker struct {
@@ -32,7 +33,6 @@ func (h *mockWorker) Handle(ctx context.Context, obj interface{}) {
func TestNewEventQueue(t *testing.T) {
h := &mockWorker{make(chan interface{}, 1)}
-
q := NewTaskQueue(0)
q.AddWorker(h)
@@ -59,3 +59,15 @@ func TestNewEventQueue(t *testing.T) {
q.Stop()
q.Add(Task{Payload: 3})
}
+
+func TestTaskQueue_Add(t *testing.T) {
+ h := &mockWorker{make(chan interface{}, 10000)}
+ q := NewTaskQueue(5)
+ q.AddWorker(h)
+ q.Run()
+ time.Sleep(100 * time.Millisecond)
+ for i := 0; i < 10000; i++ {
+ go q.Add(Task{Payload: 1})
+ }
+ q.Stop()
+}