You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/20 20:03:06 UTC

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #394: Support sendTimeout

cckellogg commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527941803



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 
+	p.queueLock.Lock()
 	p.pendingQueue.Put(&pendingItem{
+		createdAt:    time.Now(),
 		batchData:    batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
+	p.queueLock.Unlock()
+
 	p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+	t := time.NewTimer(sendTimeout)
+	var state int32
+	for range t.C {
+		// since Closing/Closed state could not be reopen, load and compare is safe
+		state = atomic.LoadInt32(&p.state)
+		if state == producerClosing || state == producerClosed {
+			t.Stop()
+			return
+		}
+
+		p.queueLock.Lock()
+		item := p.pendingQueue.Peek()
+		if item == nil {
+			t.Reset(p.options.SendTimeout)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		pi := item.(*pendingItem)
+		diff := p.options.SendTimeout - time.Since(pi.createdAt)
+		if diff > 0 {
+			// pending messages not timeout yet
+			t.Reset(diff)
+			p.queueLock.Unlock()
+			continue
+		}
+
+		p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+		for p.pendingQueue.Size() > 0 {
+			pi = p.pendingQueue.Poll().(*pendingItem)
+			pi.Lock()
+			for _, i := range pi.sendRequests {
+				sr := i.(*sendRequest)
+				if sr.msg != nil {
+					size := len(sr.msg.Payload)
+					p.publishSemaphore.Release()
+					messagesPending.Dec()
+					bytesPending.Sub(float64(size))
+					publishErrors.Inc()
+					p.log.WithError(errSendTimeout).
+						WithField("size", size).
+						WithField("properties", sr.msg.Properties)
+				}
+				if sr.callback != nil {
+					sr.callback(nil, sr.msg, errSendTimeout)
+				}
+			}
+			buffersPool.Put(pi.batchData)
+			pi.Unlock()
+		}
+		t.Reset(p.options.SendTimeout)
+		p.queueLock.Unlock()
+	}
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
+	p.queueLock.Lock()

Review comment:
       Instead of having a separate goroutine checking for timeouts, can the message expiry check be performed here?
   
   I don't think the timeout check needs to be exact; we just need to expire old messages at some point. This could simplify the code and remove the need for another lock. We can just call the `failTimeoutMessages` method here and expire messages if needed.
    

##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
+	// SendTimeout set the timeout for a message that not be acknowledged by server since sent.
+	// Send and SendAsync returns an error after timeout.
+	// Default is 30 seconds, -1 to disable.
+	SendTimeout time.Duration
+
+	// NonBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full.
+	// Default is false, if set to true then Send and SendAsync return error when queue is full.
+	NonBlockIfQueueFull bool

Review comment:
       `DisableBlockIfQueueFull` — this name is clearer to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org