Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/18 12:46:45 UTC

[GitHub] [pulsar-client-go] wuYin opened a new pull request #394: Support sendTimeout

wuYin opened a new pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394


   ### Motivation
   
   Support `sendTimeout` and `blockIfQueueFull` for producer.
   
   ### Modifications
   
   - Add 2 configurations into `ProducerOptions`
   - Add a `sendTimeout` timer goroutine to check for and fail timed-out pending messages.
   - Check `BlockIfQueueFull` before acquiring the publish semaphore while sending messages.
   - Add `TestSendTimeout` to verify that `sendTimeout` takes effect while the backlog on the topic is exceeded.
   
   ### Others
   client-go and client-java have different ways to disable `sendTimeout`
   - client-java: set `sendTimeout` to 0 explicitly, to override default 30s configuration.
   - client-go: set `sendTimeout` to -1, to bypass unset detection.
   
   ### Documentation
   [configure-producer](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can have different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send`, and check every message before it is dequeued, or something similar.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can have different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send`, and check it for every message.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   
   Personally, I think there are 2 ways to implement `sendTimeout`:
   1. **message level**
       - Purpose: precisely control the expiration time of every message, so that different messages can have different expirations, as `consumer.Receive()` does.
       - Implementation: set a timeout context in `Send`, and check it for every message.
   
   2. **producer level**
       - Purpose: a send timeout per producer instance is sufficient for most situations, as in client-java.
       - Implementation: set a timer that periodically fails the pending queue.
       - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527370463



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full.
+	// Default is false.
+	BlockIfQueueFull bool

Review comment:
       Thanks for the review.
   I intended to keep the same default behavior as [client-java](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer), but this does change the default behavior of the current client-go.
   I replaced `NonBlockIfQueueFull` with `BlockIfQueueFull`, just like `DisableBatching`, which enables batching by default.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r533033581



##########
File path: pulsar/producer_partition.go
##########
@@ -430,26 +432,74 @@ type pendingItem struct {
 	sync.Mutex
 	batchData    internal.Buffer
 	sequenceID   uint64
-	sentAt       int64
+	sentAt       time.Time
 	sendRequests []interface{}
 	completed    bool
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
+	if p.options.SendTimeout > 0 {

Review comment:
       Yes, a negative value disables it, typically -1.
   https://github.com/apache/pulsar-client-go/blob/master/pulsar/producer.go#L81







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` will be stuck, for example while the producer is reconnecting to the broker. There are 2 ways to check:
   - Check in an independent goroutine: `sendTimeout` is still effective, since pending messages are periodically checked and failed.
   - Check before flushing: `sendTimeout` is not effective in this case.
   
   If there is no need to check message timeouts precisely, the above case can be ignored.
   This is reasonable: since `sendTimeout` is now implemented at the **producer level**, it does not need to be that precise, and this also avoids the cost of the lock.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   I'm going to try a context instead of a timer.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527370463



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full.
+	// Default is false.
+	BlockIfQueueFull bool

Review comment:
       Thanks for the review.
   I intended to keep the same default behavior as [client-java](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer), but this does change the default behavior of the current client-go.
   How about renaming `BlockIfQueueFull` to `NonBlockIfQueueFull`, just like `DisableBatching`, which leaves batching enabled by default?
   I think the reason for doing this is that we cannot tell whether the user set `false` explicitly or left the default `false`.
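
The zero-value problem behind this naming question can be illustrated with a small sketch. Both option types below are hypothetical, and "block when the queue is full" is assumed to be the default that should be preserved. A negatively named flag makes the zero value mean "keep the default", while a pointer field is a common alternative that can tell "unset" apart from an explicit `false`.

```go
package main

import "fmt"

// negatedOptions names the flag after the non-default behaviour, so the
// zero value (false) keeps the assumed default: block when the queue is full.
type negatedOptions struct {
	NonBlockIfQueueFull bool
}

func (o negatedOptions) shouldBlock() bool { return !o.NonBlockIfQueueFull }

// pointerOptions is an alternative: nil means "not set by the user".
type pointerOptions struct {
	BlockIfQueueFull *bool
}

func (o pointerOptions) shouldBlock() bool {
	if o.BlockIfQueueFull == nil {
		return true // unset: fall back to the assumed default (block)
	}
	return *o.BlockIfQueueFull
}

func main() {
	no := false
	fmt.Println(negatedOptions{}.shouldBlock())                          // true
	fmt.Println(negatedOptions{NonBlockIfQueueFull: true}.shouldBlock()) // false
	fmt.Println(pointerOptions{}.shouldBlock())                          // true
	fmt.Println(pointerOptions{BlockIfQueueFull: &no}.shouldBlock())     // false
}
```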







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` will be stuck, for example while the producer is reconnecting to the broker. There are 2 ways to check:
   - Check in an independent goroutine: `sendTimeout` is still effective, since pending messages are periodically checked and failed.
   - Check before flush: `sendTimeout` is not effective in this case.
   
   If there is no need to check message timeouts precisely, the above case can be ignored.
   This is reasonable: since `sendTimeout` is now implemented at the **producer level**, it does not need to be that precise, and this also avoids the cost of the lock.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can have different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send`, and check every message before it is dequeued, or something similar.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can have different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send`, and check every message before it is dequeued, or something similar.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528032340



##########
File path: pulsar/producer_partition.go
##########
@@ -553,9 +613,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// lock the pending item while sending the requests
 	pi.Lock()
 	defer pi.Unlock()
-	if pi.sentAt > 0 {

Review comment:
       @cckellogg 
   `sentAt` is always set before the item is put into the pending queue, so I removed this predicate,
   just like for [sendRequest.publishTime](https://github.com/apache/pulsar-client-go/pull/394/files#diff-8adb09af2175be5751840343e14df2220a2154ca4bc8de21157ed112be25b6b9L565).
   







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528032340



##########
File path: pulsar/producer_partition.go
##########
@@ -553,9 +613,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// lock the pending item while sending the requests
 	pi.Lock()
 	defer pi.Unlock()
-	if pi.sentAt > 0 {

Review comment:
       @cckellogg 
   I used `sentAt` to check whether messages have timed out.
   `sentAt` is always set before the item is put into the pending queue, so I removed this predicate,
   just like for [sendRequest.publishTime](https://github.com/apache/pulsar-client-go/blob/master/pulsar/producer_partition.go#L565).
   







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` will be stuck, for example while the producer is reconnecting to the broker. There are 2 ways to check:
   - Check in an independent goroutine: `sendTimeout` is still effective, since pending messages are periodically checked and failed.
   - Check before flushing: `sendTimeout` is not effective in this case.
   
   If there is no need to check message timeouts precisely, the above case can be ignored.
   This is reasonable: since `sendTimeout` is now implemented at the **producer level**, it does not need to be that precise, and this also avoids the cost of the pending queue lock.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527370463



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full.
+	// Default is false.
+	BlockIfQueueFull bool

Review comment:
       Thanks for the review.
   I intended to keep the same default behavior as [client-java](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer), but this does change the default behavior of the current client-go.
   I'll rename `BlockIfQueueFull` to `NonBlockIfQueueFull`, just like `DisableBatching`, which leaves batching enabled by default.
   I think the reason for doing this is that we cannot tell whether the user set `false` explicitly or left the default `false`.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527370463



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full.
+	// Default is false.
+	BlockIfQueueFull bool

Review comment:
       Thanks for the review.
   I intended to keep the same default behavior as [client-java](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer), but this does change the default behavior of the current client-go.
   Shall I rename `BlockIfQueueFull` to `NonBlockIfQueueFull`, just like `DisableBatching`, which leaves batching enabled by default?
   I think the reason for doing this is that we cannot tell whether the user set `false` explicitly or left the default `false`.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528032340



##########
File path: pulsar/producer_partition.go
##########
@@ -553,9 +613,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// lock the pending item while sending the requests
 	pi.Lock()
 	defer pi.Unlock()
-	if pi.sentAt > 0 {

Review comment:
       @cckellogg 
   `sentAt` is always set before the item is put into the pending queue, so I removed this predicate.
   [sendRequest.publishTime](https://github.com/apache/pulsar-client-go/blob/master/pulsar/producer_partition.go#L565) is handled the same way.
   







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can set different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send` and check every message before dequeuing it, or something similar.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the expiration time of every message, so that different messages can have different expirations, as `consumer.Receive()` does.
   - Implementation: set a timeout context in `Send` and check it for every message.
   
   **producer level**
   - Purpose: a send timeout per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528032340



##########
File path: pulsar/producer_partition.go
##########
@@ -553,9 +613,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// lock the pending item while sending the requests
 	pi.Lock()
 	defer pi.Unlock()
-	if pi.sentAt > 0 {

Review comment:
       @cckellogg 
   I use `sentAt` to check whether messages have timed out.
   `sentAt` is always set before the item is put into the pending queue, so I removed this predicate.
   [sendRequest.publishTime](https://github.com/apache/pulsar-client-go/blob/master/pulsar/producer_partition.go#L565) is handled the same way.
   







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527370463



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full.
+	// Default is false.
+	BlockIfQueueFull bool

Review comment:
       Thanks for the review.
   
   I intended to keep the same default behavior as [client-java](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer), but this does change the default behavior of the current client-go.
   
   How about renaming `BlockIfQueueFull` to `NonBlockIfQueueFull`, just as `DisableBatching` keeps batching enabled by default?
   
   The reason is that we cannot tell whether the user set `false` explicitly or left the field at its default `false`.
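
The zero-value argument above can be illustrated with a small Go sketch. It is not code from the PR: the `options` type and the `blocksWhenFull` helper are hypothetical stand-ins, and only the field name `NonBlockIfQueueFull` comes from the comment. Because an unset `bool` is `false`, naming the field after the opt-out keeps blocking as the default.

```go
package main

import "fmt"

// options is a hypothetical stand-in for ProducerOptions.
type options struct {
	// NonBlockIfQueueFull is false when the field is left unset, so an
	// unset field means "block when the queue is full".
	NonBlockIfQueueFull bool
}

// blocksWhenFull reports whether a send should wait for queue space.
func blocksWhenFull(o options) bool {
	return !o.NonBlockIfQueueFull
}

func main() {
	// Unset options: the zero value keeps the blocking default.
	fmt.Println(blocksWhenFull(options{}))
	// Explicit opt-out: sends fail fast instead of blocking.
	fmt.Println(blocksWhenFull(options{NonBlockIfQueueFull: true}))
}
```

With a positively named `BlockIfQueueFull` field, the same zero value would silently switch the default to non-blocking, which is the behavior change discussed in this thread.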







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   1. **message level**
       - Purpose: precisely control the expiration time of every message, so that different messages can have different expirations, as `consumer.Receive()` does.
       - Implementation: set a timeout context in `Send` and check it for every message.
   
   2. **producer level**
       - Purpose: a send timeout per producer instance is sufficient for most situations, as in client-java.
       - Implementation: set a timer that periodically fails the pending queue.
       - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] merlimat merged pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394


   





[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can set different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send` and check every message before dequeuing it, or something similar.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528009532



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// NonBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full.
+	// Default is false, if set to true then Send and SendAsync return error when queue is full.
+	NonBlockIfQueueFull bool

Review comment:
       Thanks for the review.
   I have now renamed `NonBlockIfQueueFull` to `DisableBlockIfQueueFull`.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` gets stuck, for example while the producer is reconnecting to the broker. The two approaches then behave differently:
   - Check in an independent goroutine: `sendTimeout` stays effective; pending messages are checked and failed periodically.
   - Check before flushing: `sendTimeout` has no effect in this case.
   
   If message timeouts do not need to be checked precisely, the case above can be ignored.
   That is reasonable: `sendTimeout` is now implemented at the **producer level**, so it does not need to be that precise, and this also avoids the cost of the lock.







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527941803



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Instead of having a separate goroutine check for timeouts, can the message expiry check be performed here?
   
   I don't think the timeout check needs to be exact. We just need to expire old messages at some point. This could simplify the code and remove the need for another lock. We can just call `failTimeoutMessages` here and expire messages if needed.
    

##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// NonBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full.
+	// Default is false, if set to true then Send and SendAsync return error when queue is full.
+	NonBlockIfQueueFull bool

Review comment:
       `DisableBlockIfQueueFull` is clearer to me.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` gets stuck, for example while the producer is reconnecting to the broker. The two approaches then behave differently:
   - Check in an independent goroutine: `sendTimeout` stays effective; pending messages are checked and failed periodically.
   - Check before flush: `sendTimeout` has no effect in this case.
   
   If message timeouts do not need to be checked precisely, the case above can be ignored.
   That is reasonable: `sendTimeout` is now implemented at the **producer level**, so it does not need to be that precise, and this also avoids the cost of the lock.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528009532



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// NonBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full.
+	// Default is false, if set to true then Send and SendAsync return error when queue is full.
+	NonBlockIfQueueFull bool

Review comment:
       Thanks for the review.
   I have now renamed `NonBlockIfQueueFull` to `DisableBlockIfQueueFull`.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `eventLoop` gets stuck, for example while the producer is reconnecting to the broker. The two approaches then behave differently:
   - Check in an independent goroutine: `sendTimeout` stays effective; pending messages are checked and failed periodically.
   - Check before flush: `sendTimeout` has no effect in this case.
   
   If message timeouts do not need to be checked precisely, the case above can be ignored.
   That is reasonable: `sendTimeout` is now implemented at the **producer level**, so it does not need to be that precise, and this also avoids the cost of the lock.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   Personally, I think there are 2 ways to implement `sendTimeout`:
   
   **message level**
   - Purpose: precisely control the `sendTimeout` of each message, so that different messages can set different timeouts, as `consumer.Receive()` does.
   - Implementation: set a timeout context for `Send` and check every message before dequeuing it.
   
   **producer level**
   - Purpose: a `sendTimeout` per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timer that periodically fails the pending queue.
   - Trade-off: possibly a big change, since we may need to remove the context parameter from the `Send` method.
   
   I will try the context approach if it is better. Looking forward to your reply, thanks.







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   I'll try a context instead of a timer.







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r532795246



##########
File path: pulsar/producer_partition.go
##########
@@ -430,26 +432,74 @@ type pendingItem struct {
 	sync.Mutex
 	batchData    internal.Buffer
 	sequenceID   uint64
-	sentAt       int64
+	sentAt       time.Time
 	sendRequests []interface{}
 	completed    bool
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
+	if p.options.SendTimeout > 0 {

Review comment:
       How can this be disabled? Setting it to -1?







[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527268799



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Shouldn't this be taken from the `ctx` passed in by the application at the moment of `Send()`?

##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full.
+	// Default is false.
+	BlockIfQueueFull bool

Review comment:
       The logic should be negated here. The default should be to block if queue full.
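
To illustrate the negated logic, here is a minimal sketch in which the zero value of the flag keeps the blocking behaviour and only an explicit opt-out fails fast. It assumes a buffered channel as a stand-in for the publish semaphore; `sendQueue`, `errQueueFull`, and the `disableBlock` parameter are hypothetical names, not the PR's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error value used only in this sketch.
var errQueueFull = errors.New("producer send queue is full")

// sendQueue is a hypothetical stand-in for the publish semaphore:
// each element in the channel represents one pending message slot.
type sendQueue struct {
	slots chan struct{}
}

func newSendQueue(capacity int) *sendQueue {
	return &sendQueue{slots: make(chan struct{}, capacity)}
}

// acquire reserves one slot. With disableBlock=false (the zero value) it
// waits until a slot is free; with disableBlock=true it returns errQueueFull
// immediately when the queue is full.
func (q *sendQueue) acquire(disableBlock bool) error {
	if !disableBlock {
		q.slots <- struct{}{}
		return nil
	}
	select {
	case q.slots <- struct{}{}:
		return nil
	default:
		return errQueueFull
	}
}

// release frees one slot, for example after a message is acknowledged.
func (q *sendQueue) release() { <-q.slots }

func main() {
	q := newSendQueue(1)
	fmt.Println(q.acquire(true)) // slot available
	fmt.Println(q.acquire(true)) // queue full, fails fast
	q.release()
	fmt.Println(q.acquire(false)) // blocking mode, slot available again
}
```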







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` gets stuck, for example while the producer is reconnecting to the broker. In that case:
   - Check in an independent goroutine: `sendTimeout` is still effective; pending messages are checked and failed periodically.
   - Check before flushing: `sendTimeout` is not effective.
   
   If message timeouts do not need to be checked precisely, the case above can be ignored.
   That is reasonable: since `sendTimeout` is now implemented at the **producer level**, it does not need to be that precise, and it also avoids the cost of the pending queue lock.
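
The "check before flushing" alternative could look roughly like the sketch below. It assumes the pending queue is ordered oldest first and is only touched from the event loop, so no lock is needed. `splitExpired`, `epoch`, and `demoTimeout` are hypothetical names, and `pendingItem` is simplified from the struct in the diff. Unlike the timer loop in the diff, which drains the whole queue once the head item has timed out, this version fails only the items that are actually older than the timeout.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout and epoch are fixed values used only to keep the example deterministic.
const demoTimeout = 30 * time.Second

var epoch = time.Unix(0, 0)

// pendingItem is a simplified, hypothetical version of the struct in the diff.
type pendingItem struct {
	sequenceID uint64
	createdAt  time.Time
}

// splitExpired assumes queue is ordered oldest first. It returns the leading
// items whose age has reached the timeout and the items that are still valid.
func splitExpired(queue []pendingItem, now time.Time, timeout time.Duration) (expired, remaining []pendingItem) {
	i := 0
	for i < len(queue) && now.Sub(queue[i].createdAt) >= timeout {
		i++
	}
	return queue[:i], queue[i:]
}

func main() {
	queue := []pendingItem{
		{sequenceID: 1, createdAt: epoch},
		{sequenceID: 2, createdAt: epoch.Add(20 * time.Second)},
	}
	// Called from the flush path: fail expired items, keep the rest pending.
	expired, remaining := splitExpired(queue, epoch.Add(demoTimeout), demoTimeout)
	fmt.Println(len(expired), len(remaining))
}
```

As noted above, this check runs only when the event loop runs, so it does nothing while the loop is stuck on a reconnect.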







[GitHub] [pulsar-client-go] wuYin commented on a change in pull request #394: Support sendTimeout

Posted by GitBox <gi...@apache.org>.
wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527372119



##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration

Review comment:
       Thanks for the review.
   I see that [PR#252](https://github.com/apache/pulsar-client-go/pull/252) implements `sendTimeout` with a context, but its progress has stalled.
   
   Personally, I think there are 2 ways to implement `sendTimeout`:
   **message level**
   - Purpose: precisely control the expiration time of every message, so that different messages can have different expirations, as `consumer.Receive()` does.
   - Implementation: set a timeout context in Send, and check it for every message.
   
   **producer level**
   - Purpose: a send timeout per producer instance is sufficient for most situations, as in client-java.
   - Implementation: set a timeout timer to periodically fail the pending queue.
   - Trade-off: optionally a big change, since we may need to remove the context parameter from the Send method.
   
   I will try the context way if it is better. Looking forward to your reply, thanks.
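
A rough sketch of how the two levels might be combined: the per-message wait honours whatever `ctx` it is given, and a producer-level default is applied by wrapping the call in `context.WithTimeout`. `waitForAck`, `sendWithTimeout`, and `demoTimeout` are hypothetical names. `errSendTimeout` appears in the diff, but its definition here is assumed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// demoTimeout is a short timeout used only to keep the example fast.
const demoTimeout = 50 * time.Millisecond

// errSendTimeout is named after the identifier in the diff; the message text is assumed.
var errSendTimeout = errors.New("message send timeout")

// waitForAck is the message-level part: it returns nil when the broker
// acknowledgement arrives, or an error when ctx ends first.
func waitForAck(ctx context.Context, ack <-chan struct{}) error {
	select {
	case <-ack:
		return nil
	case <-ctx.Done():
		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
			return errSendTimeout
		}
		return ctx.Err()
	}
}

// sendWithTimeout is the producer-level part: it applies one timeout to
// every message by deriving a deadline-bound context.
func sendWithTimeout(timeout time.Duration, ack <-chan struct{}) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitForAck(ctx, ack)
}

func main() {
	acked := make(chan struct{})
	close(acked) // simulates an acknowledgement that has already arrived
	fmt.Println(sendWithTimeout(demoTimeout, acked))

	never := make(chan struct{}) // simulates a broker that never acknowledges
	fmt.Println(sendWithTimeout(demoTimeout, never))
}
```

With this shape the `ctx` parameter of Send could stay in place: an application that wants a per-message deadline passes its own context, and the producer-level option only supplies a default.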



