You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/18 16:00:35 UTC

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

cckellogg commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r426723771



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -141,6 +164,16 @@ func (bq *blockingQueue) Peek() interface{} {
 	return bq.items[bq.headIdx]
 }
 
+func (bq *blockingQueue) PeekApply(consumer func(item interface{})) bool {

Review comment:
       It's generally not great to hold a lock when calling a callback this can lead to deadlocks.

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}

Review comment:
       Update the variable name because when I see `wg` I think WaitGroup

##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet
+}
+
+// need Lock() before call this func
+func (pi *pendingItem) setFlushAt(flushAt time.Time) {
+	for _, i := range pi.sendRequests {
+		sr := i.(*sendRequest)
+		sr.Lock()
+		sr.flushAt = flushAt

Review comment:
       Does each sendRequest need a flushAt? Can this just be at the pendingItem level?

##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -173,17 +206,27 @@ func (bq *blockingQueue) Size() int {
 	return bq.size
 }
 
-func (bq *blockingQueue) Iterator() BlockingQueueIterator {
-	bq.mutex.Lock()
-	defer bq.mutex.Unlock()
-
+func (bq *blockingQueue) iterator() BlockingQueueIterator {
 	return &blockingQueueIterator{
 		bq:      bq,
 		readIdx: bq.headIdx,
 		toRead:  bq.size,
 	}
 }
 
+func (bq *blockingQueue) IterateIfNonEmpty(operation IterateFunc) {

Review comment:
       What's wrong with the original Iterator() function?

##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet

Review comment:
       Why is this needed? It looks like they set in one place. Would just a bool flag work?

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval)
+		}
+	}
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: false,
+	if ctx == nil {
+		callback(nil, msg, ErrNilContextPass)
+	}
+
+	sr := newsendRequest(ctx, msg, callback, false)
+	// may be message will block on acquire semaphore
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: flushImmediately,
+
+	// may be message will block on acquire semaphore
+	sr := newsendRequest(ctx, msg, callback, flushImmediately)
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
-	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-	if !ok {
+	// inject point for mock slow response
+	p.beforeReceiveResponse(response)

Review comment:
       What is the purpose of this line?

##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {
+		request.callback.callback(messageID, msg, err)
+		request.callBackCallAt = time.Now()
+		log.Debugf("gotSemaphoreCost=%v, addToBuilderCost=%v, "+

Review comment:
       You should check if debug is enabled before logging a large debug method.

##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {

Review comment:
       Why does this need a once lock?

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:

Review comment:
       Sleep defaults are not needed here and same for the ones below. The select will block until i can read/write to a channel




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org