You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/17 15:21:14 UTC

[GitHub] [pulsar-client-go] WJL3333 opened a new pull request #252: Add checkout by context

WJL3333 opened a new pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252


   <--
   ### Contribution Checklist
     
     - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
       Skip *Issue XYZ* if there is no associated github issue for this pull request.
       Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   Fixes #<xyz>
   
   *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*
   
   Master Issue: #<xyz>
   
   ### Motivation
   
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-736160523


   Will close this issue, the #394 has achieved this feature.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427035590



##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval)
+		}
+	}
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: false,
+	if ctx == nil {
+		callback(nil, msg, ErrNilContextPass)
+	}
+
+	sr := newsendRequest(ctx, msg, callback, false)
+	// may be message will block on acquire semaphore
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: flushImmediately,
+
+	// may be message will block on acquire semaphore
+	sr := newsendRequest(ctx, msg, callback, flushImmediately)
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
-	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-	if !ok {
+	// inject point for mock slow response
+	p.beforeReceiveResponse(response)

Review comment:
       this is for mock bad network in unit test. maybe i can remove it from the code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-629949404


   hi,this pr has some bad test case,i'm working on it. any suggestions?i'm new to unit test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] yarthur1 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
yarthur1 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428483868



##########
File path: pulsar/producer_partition.go
##########
@@ -490,49 +475,77 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	}
 
 	sr := newsendRequest(ctx, msg, callback, false)
+
+	if sr.needCheckTimeoutOrCancel() {
+		p.checkTimeoutQueue.Put(sr)
+	}
+
+	// may be message will block on eventChan publish event
+	checkContextIfBlockOnEventChan := func() (ctxDone bool) {
+		for {
+			select {
+			case p.eventsChan <- sr:
+				return false
+			case <-ctx.Done():
+				log.Debugf("send timeout because for eventChan Block %v", msg)
+				sr.CallBack(nil, msg, ctx.Err())
+				return true
+			}
+		}
+	}
+
 	// may be message will block on acquire semaphore
 	for {
 		select {
 		case p.publishSemaphore <- true:
-			sr.gotSemaphoreAt = time.Now()
-			p.eventsChan <- sr
+			checkContextIfBlockOnEventChan()
 			return
 		case <-ctx.Done():
 			log.Debugf("send timeout because for publishSemaphore %v", msg)
-			callback(nil, msg, ctx.Err())
+			sr.CallBack(nil, msg, ctx.Err())
 			return
-		default:
-			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
 		}
 	}
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+	sr := newsendRequest(ctx, msg, callback, flushImmediately)
+
+	if sr.needCheckTimeoutOrCancel() {
+		p.checkTimeoutQueue.Put(sr)
+	}
+
+	// may be message will block on eventChan publish event
+	checkContextIfBlockOnEventChan := func() (ctxDone bool) {
+		for {
+			select {
+			case p.eventsChan <- sr:
+				return false
+			case <-ctx.Done():
+				log.Debugf("send timeout because for eventChan Block %v", msg)
+				sr.CallBack(nil, msg, ctx.Err())
+				return true

Review comment:
       the same case with SendAsync 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] yarthur1 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
yarthur1 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428483179



##########
File path: pulsar/producer_partition.go
##########
@@ -490,49 +475,77 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	}
 
 	sr := newsendRequest(ctx, msg, callback, false)
+
+	if sr.needCheckTimeoutOrCancel() {
+		p.checkTimeoutQueue.Put(sr)
+	}
+
+	// may be message will block on eventChan publish event
+	checkContextIfBlockOnEventChan := func() (ctxDone bool) {
+		for {
+			select {
+			case p.eventsChan <- sr:
+				return false
+			case <-ctx.Done():
+				log.Debugf("send timeout because for eventChan Block %v", msg)
+				sr.CallBack(nil, msg, ctx.Err())
+				return true

Review comment:
       if go in this ,i think should release publishSemaphore after callback




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r426723771



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -141,6 +164,16 @@ func (bq *blockingQueue) Peek() interface{} {
 	return bq.items[bq.headIdx]
 }
 
+func (bq *blockingQueue) PeekApply(consumer func(item interface{})) bool {

Review comment:
       Is this function used anywhere?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428223019



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -131,6 +139,21 @@ func (bq *blockingQueue) Poll() interface{} {
 	return bq.dequeue()
 }
 
+func (bq *blockingQueue) PollIfSatisfy(condition func(interface{}) bool) (item interface{}, empty bool, satisfy bool) {

Review comment:
       and there is a new queue in partitionProducer. i use it to check if the sendRequest context is Done. the origin logic in pendingQueue is not changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-640973066


   @WJL3333 Can you fix conflict? 
   
   ping @cckellogg @merlimat PTAL again thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428177301



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -131,6 +139,21 @@ func (bq *blockingQueue) Poll() interface{} {
 	return bq.dequeue()
 }
 
+func (bq *blockingQueue) PollIfSatisfy(condition func(interface{}) bool) (item interface{}, empty bool, satisfy bool) {

Review comment:
       I don't think we need to take messages out individually from the queue. That will easily break ordering. 
   
   Also, we don't need to proactively remove any request from the queue. Just check if the context is expired ***before*** queuing and also when trying to write on the socket. That should be all that we need. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428214744



##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval)
+		}
+	}
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: false,
+	if ctx == nil {
+		callback(nil, msg, ErrNilContextPass)
+	}
+
+	sr := newsendRequest(ctx, msg, callback, false)
+	// may be message will block on acquire semaphore
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: flushImmediately,
+
+	// may be message will block on acquire semaphore
+	sr := newsendRequest(ctx, msg, callback, flushImmediately)
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
-	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-	if !ok {
+	// inject point for mock slow response
+	p.beforeReceiveResponse(response)

Review comment:
       removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427034686



##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}

Review comment:
       fill fix it

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}

Review comment:
       will fix it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427034622



##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:

Review comment:
       got it, will fix




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] yarthur1 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
yarthur1 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428423636



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -131,6 +139,21 @@ func (bq *blockingQueue) Poll() interface{} {
 	return bq.dequeue()
 }
 
+func (bq *blockingQueue) PollIfSatisfy(condition func(interface{}) bool) (item interface{}, empty bool, satisfy bool) {

Review comment:
       Based on your comit,I have modified something,can you check it?
   https://github.com/WJL3333/pulsar-client-go/pull/1
   
   > and there is a new queue in partitionProducer. i use it to check if the sendRequest context is Done. the origin logic in pendingQueue is not changed.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427034238



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -173,17 +206,27 @@ func (bq *blockingQueue) Size() int {
 	return bq.size
 }
 
-func (bq *blockingQueue) Iterator() BlockingQueueIterator {
-	bq.mutex.Lock()
-	defer bq.mutex.Unlock()
-
+func (bq *blockingQueue) iterator() BlockingQueueIterator {
 	return &blockingQueueIterator{
 		bq:      bq,
 		readIdx: bq.headIdx,
 		toRead:  bq.size,
 	}
 }
 
+func (bq *blockingQueue) IterateIfNonEmpty(operation IterateFunc) {

Review comment:
       i think BlockingQueue is thread safe.
   but if you run the following code
   
   ```go 
   q := NewBlockingQueue(3)
   q.Put(1)
   q.Put(2)
   q.Put(3)
   iter := q.Iterator()
   q.Poll()
   q.Put(4)
   ```
   we got 4,1,2 here. but the queue should return 2 before 4.
   if some goroutine run concurrently. i think the Iterator is not woking as expected.
   so i add an safe IteratorIfNonEmpty to avoid this
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428214286



##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:

Review comment:
       remove defaults here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] yarthur1 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
yarthur1 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428485255



##########
File path: pulsar/producer_partition.go
##########
@@ -322,9 +331,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 				WithField("sequenceID", sequenceID).
 				WithField("properties", msg.Properties).
 				Error("unable to send single message")

Review comment:
       if fail to add batch builder,should n't relaese publishSemaphore and callback? and why Send individually not retry if add fail?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r426287863



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -34,6 +34,11 @@ type BlockingQueue interface {
 	// Poll dequeue one item, return nil if queue is empty
 	Poll() interface{}
 
+	// Poll dequeue one item if the item meets the condition,
+	// return nil,true,false if queue is empty,
+	// return nil,false,false if not satisfy.
+	PollIfSatisfy(i func(item interface{}) bool) (item interface{}, empty bool, satisfy bool)

Review comment:
       This doesn't seem to be related with send timeout




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-641400060


   ping @cckellogg @merlimat PTAL. i can reply as soon as posible


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428214995



##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet

Review comment:
       remove the dependency from project,instead we have an bool here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427034524



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -141,6 +164,16 @@ func (bq *blockingQueue) Peek() interface{} {
 	return bq.items[bq.headIdx]
 }
 
+func (bq *blockingQueue) PeekApply(consumer func(item interface{})) bool {

Review comment:
       i'll remove it from code.  :-)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] yarthur1 commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
yarthur1 commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-631855578


   @WJL3333 
   Based on your comit,I have modified something,can you check it?
   https://github.com/WJL3333/pulsar-client-go/pull/1


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-629964211


   > hi, this pr has some bad test case, i'm working on it. any suggestions?i'm new to unit test.
   > and i found in some case send timeout is a bit longer then expect. try to solve it too.
   > @merlimat @wolfstudy @jerrypeng
   
   Thanks @WJL3333 contribution, in unit test case, please avoid using `time.sleep()`. Because, in many cases, we have no way to know the specific situation of other people's servers and network resources.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r426723771



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -141,6 +164,16 @@ func (bq *blockingQueue) Peek() interface{} {
 	return bq.items[bq.headIdx]
 }
 
+func (bq *blockingQueue) PeekApply(consumer func(item interface{})) bool {

Review comment:
       It's generally not great to hold a lock when calling a callback this can lead to deadlocks.

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}

Review comment:
       Update the variable name because when I see `wg` I think WaitGroup

##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet
+}
+
+// need Lock() before call this func
+func (pi *pendingItem) setFlushAt(flushAt time.Time) {
+	for _, i := range pi.sendRequests {
+		sr := i.(*sendRequest)
+		sr.Lock()
+		sr.flushAt = flushAt

Review comment:
       Does each sendRequest need a flushAt? Can this just be at the pendingItem level?

##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -173,17 +206,27 @@ func (bq *blockingQueue) Size() int {
 	return bq.size
 }
 
-func (bq *blockingQueue) Iterator() BlockingQueueIterator {
-	bq.mutex.Lock()
-	defer bq.mutex.Unlock()
-
+func (bq *blockingQueue) iterator() BlockingQueueIterator {
 	return &blockingQueueIterator{
 		bq:      bq,
 		readIdx: bq.headIdx,
 		toRead:  bq.size,
 	}
 }
 
+func (bq *blockingQueue) IterateIfNonEmpty(operation IterateFunc) {

Review comment:
       What's wrong with the original Iterator() function?

##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet

Review comment:
       Why is this needed? It looks like they set in one place. Would just a bool flag work?

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval)
+		}
+	}
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: false,
+	if ctx == nil {
+		callback(nil, msg, ErrNilContextPass)
+	}
+
+	sr := newsendRequest(ctx, msg, callback, false)
+	// may be message will block on acquire semaphore
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
-	p.publishSemaphore.Acquire()
-	sr := &sendRequest{
-		ctx:              ctx,
-		msg:              msg,
-		callback:         callback,
-		flushImmediately: flushImmediately,
+
+	// may be message will block on acquire semaphore
+	sr := newsendRequest(ctx, msg, callback, flushImmediately)
+	for {
+		select {
+		case p.publishSemaphore <- true:
+			sr.gotSemaphoreAt = time.Now()
+			p.eventsChan <- sr
+			return
+		case <-ctx.Done():
+			log.Debugf("send timeout because for publishSemaphore %v", msg)
+			callback(nil, msg, ctx.Err())
+			return
+		default:
+			time.Sleep(p.options.SendTimeoutCheckInterval / 4)
+		}
 	}
-	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
-	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-	if !ok {
+	// inject point for mock slow response
+	p.beforeReceiveResponse(response)

Review comment:
       What is the purpose of this line?

##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {
+		request.callback.callback(messageID, msg, err)
+		request.callBackCallAt = time.Now()
+		log.Debugf("gotSemaphoreCost=%v, addToBuilderCost=%v, "+

Review comment:
       You should check if debug is enabled before logging a large debug method.

##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {

Review comment:
       Why does this need a once lock?

##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}
 	}, true)
 
-	wg.Wait()
-	return msgID, err
+	for {
+		select {
+		case <-wg:
+			close(wg)
+			return msgID, err
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:

Review comment:
       Sleep defaults are not needed here and same for the ones below. The select will block until i can read/write to a channel




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 edited a comment on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 edited a comment on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-629949404


   hi, this pr has some bad test case, i'm working on it. any suggestions?i'm new to unit test.
   and i found in some case send timeout is a bit longer then expect. try to solve it too.
   @merlimat @wolfstudy @jerrypeng 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-705920708


   @merlimat Can you double-check this change? It seems that this change introduces a new context object, which may conflict with the context we used before.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-642550145


   Move the milestones to 0.2.0


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428214592



##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {
+		request.callback.callback(messageID, msg, err)
+		request.callBackCallAt = time.Now()
+		log.Debugf("gotSemaphoreCost=%v, addToBuilderCost=%v, "+

Review comment:
       remove the large debug log




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy closed pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy closed pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428215256



##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet
+}
+
+// need Lock() before call this func
+func (pi *pendingItem) setFlushAt(flushAt time.Time) {
+	for _, i := range pi.sendRequests {
+		sr := i.(*sendRequest)
+		sr.Lock()
+		sr.flushAt = flushAt

Review comment:
       remove the whole func, because no debug log now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-631875982


   @merlimat @cckellogg  @jerrypeng  @wolfstudy i think the code works fine now. could you take a look.some logic rewrited.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#issuecomment-705920708


   @merlimat Can you double-check this change? It seems that this change introduces a new context object, which may conflict with the context we used before.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427035180



##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {

Review comment:
       i just want to call the callback once. whatever how many times you call CallBack.
   
   if the context has Done. won't change the origin logic. and just call CallBack as before.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428220361



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -131,6 +139,21 @@ func (bq *blockingQueue) Poll() interface{} {
 	return bq.dequeue()
 }
 
+func (bq *blockingQueue) PollIfSatisfy(condition func(interface{}) bool) (item interface{}, empty bool, satisfy bool) {

Review comment:
       yeah, but i seen some use case that we just check with peek and poll after that. this may be not thread safe. if we check and another goroutine poll at the same time. 
   
   and the implementation now is check the context before queuing and also before to write the socket. 
   
   i used to just check the context at those points, but the Send and the SendAsync callback cost much longer time then expected. so there is an external goroutine to check to if the context is Done. and i add some other points  check the context. and we got the expect callback return time near sendtimeout




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427035337



##########
File path: pulsar/producer_partition.go
##########
@@ -493,11 +637,100 @@ func (p *partitionProducer) Close() {
 	wg.Wait()
 }
 
+func (p *partitionProducer) beforeReceiveResponse(receipt *pb.CommandSendReceipt) {
+	if p.options.beforeReceiveResponseCallback != nil {
+		p.options.beforeReceiveResponseCallback(receipt)
+	}
+}
+
 type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
-	callback         func(MessageID, *ProducerMessage, error)
+	callback         *onceCallback
 	flushImmediately bool
+
+	sync.Mutex
+	callbackCalled bool
+
+	createAt               time.Time
+	gotSemaphoreAt         time.Time
+	addToBuilderAt         time.Time
+	flushAt                time.Time
+	lastTimeCheckCtxDoneAt time.Time
+	receiveResponseAt      time.Time
+	callBackCallAt         time.Time
+}
+
+type onceCallback struct {
+	once     sync.Once
+	callback func(MessageID, *ProducerMessage, error)
+}
+
+func (request *sendRequest) CallBack(messageID MessageID, msg *ProducerMessage, err error) {
+	request.callback.once.Do(func() {
+		request.callback.callback(messageID, msg, err)
+		request.callBackCallAt = time.Now()
+		log.Debugf("gotSemaphoreCost=%v, addToBuilderCost=%v, "+

Review comment:
       ok, i'll fix it
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] yarthur1 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
yarthur1 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428484520



##########
File path: pulsar/producer_partition.go
##########
@@ -490,49 +475,77 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	}

Review comment:
       if ctx == nil,  return after callback




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428213897



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -141,6 +164,16 @@ func (bq *blockingQueue) Peek() interface{} {
 	return bq.items[bq.headIdx]
 }
 
+func (bq *blockingQueue) PeekApply(consumer func(item interface{})) bool {

Review comment:
       removed
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427036000



##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet
+}
+
+// need Lock() before call this func
+func (pi *pendingItem) setFlushAt(flushAt time.Time) {
+	for _, i := range pi.sendRequests {
+		sr := i.(*sendRequest)
+		sr.Lock()
+		sr.flushAt = flushAt

Review comment:
       this is just for statistic log in sendRequest Callback. need this var to print debug log.
   maybe i can remove it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428220361



##########
File path: pulsar/internal/blocking_queue.go
##########
@@ -131,6 +139,21 @@ func (bq *blockingQueue) Poll() interface{} {
 	return bq.dequeue()
 }
 
+func (bq *blockingQueue) PollIfSatisfy(condition func(interface{}) bool) (item interface{}, empty bool, satisfy bool) {

Review comment:
       yeah, but i seen some use case that we just check with peek and poll after that. this may be not thread safe. if we check and another goroutine poll at the same time. 
   
   and the implementation now is check the context before queuing and also before to write the socket. 
   
   i used to just check the context at those points, but the Send and the SendAsync callback cost much longer time then expected. so there is an external goroutine to check to if the context is Done. and i add some other points  check the context. and we got the expected callback return time near sendtimeout




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r428214034



##########
File path: pulsar/producer_partition.go
##########
@@ -343,94 +444,136 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
-	sendReq := &sendRequest{
-		msg: nil,
-		callback: func(id MessageID, message *ProducerMessage, e error) {
+	sendReq := newsendRequest(context.Background(),
+		nil,
+		func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
 		},
-	}
+		false)
 
 	pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	if ctx == nil {
+		return nil, ErrNilContextPass
+	}
+	wg := make(chan struct{}, 1)
 
 	var err error
 	var msgID MessageID
 
 	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		msgID = ID
-		wg.Done()
+		wg <- struct{}{}

Review comment:
       rename to chanWaitGroup




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] WJL3333 commented on a change in pull request #252: [Issue 242][pulsar-client-go] feature: add send timeout

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #252:
URL: https://github.com/apache/pulsar-client-go/pull/252#discussion_r427036551



##########
File path: pulsar/producer_partition.go
##########
@@ -299,12 +333,57 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalCheckSendTimeout() {
+	// check items in pendingQueue until one not all expire
+	p.pendingQueue.IterateIfNonEmpty(func(item interface{}) bool {
+		pi := item.(*pendingItem)
+		pi.Lock()
+		defer pi.Unlock()
+		return pi.checkRequestsContextDone()
+	})
+}
+
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
-	sequenceID   uint64
-	sendRequests []interface{}
-	completed    bool
+	batchData      []byte
+	sequenceID     uint64
+	sendRequests   []interface{}
+	completed      bool
+	requestCtxDone *bitset.BitSet

Review comment:
       this is for check if all the sendRequests in pendingItem has been done.
   maybe partial or all of it. a bool can't judge partial done case. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org