You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:34:04 UTC

[pulsar-client-go] 27/38: Fixed releasing of semaphore for each send request

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit b329bbff4d8a86c12c3b673d8ee7c612439ec6ae
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 4 15:45:02 2019 -0700

    Fixed releasing of semaphore for each send request
---
 pulsar/impl_partition_producer.go |  2 +-
 pulsar/producer_test.go           | 53 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 7b32fdc..5f3f671 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -313,11 +313,11 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
 	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
 	p.pendingQueue.Poll()
-	p.publishSemaphore.Release()
 	for _, i := range pi.sendRequests {
 		sr := i.(*sendRequest)
 		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
 		if sr.callback != nil {
+			p.publishSemaphore.Release()
 			sr.callback(nil, sr.msg, nil)
 		}
 	}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index fa197b4..f7455ce 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2,8 +2,12 @@ package pulsar
 
 import (
 	"context"
+	log "github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
+	"pulsar-client-go-native/pulsar/impl/util"
+	"sync"
 	"testing"
+	"time"
 )
 
 func TestSimpleProducer(t *testing.T) {
@@ -34,6 +38,53 @@ func TestSimpleProducer(t *testing.T) {
 	assert.NoError(t, err)
 }
 
+func TestProducerAsyncSend(t *testing.T) { // Publishes 10 messages with SendAsync and asserts that no send callback reported an error.
+	client, err := NewClient(ClientOptions{
+		URL: serviceUrl,
+	})
+	assert.NoError(t, err)
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   newTopicName(),
+		BatchingMaxPublishDelay: 1 * time.Second, // NOTE(review): presumably long enough that the sends stay batched until Flush below; confirm
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+
+	wg := sync.WaitGroup{}
+	wg.Add(10) // one Done per SendAsync callback issued by the loop below
+	errors := util.NewBlockingQueue(10) // receives every error handed to a send callback; argument is presumably the capacity
+
+	for i := 0; i < 10; i++ {
+		producer.SendAsync(context.Background(), &ProducerMessage{
+			Payload: []byte("hello"),
+		}, func(id MessageID, message *ProducerMessage, e error) {
+			if e != nil {
+				log.WithError(e).Error("Failed to publish")
+				errors.Put(e) // recorded here, checked once after wg.Wait
+			} else {
+				log.Info("Published message ", id)
+			}
+			wg.Done() // reached on both the success and the failure path
+		})
+
+		assert.NoError(t, err) // NOTE(review): err was last assigned by CreateProducer; the SendAsync call above does not assign to it
+	}
+
+	producer.Flush()
+
+	wg.Wait() // block until all 10 callbacks have called Done
+
+	assert.Equal(t, 0, errors.Size()) // no callback may have queued an error
+
+	err = producer.Close()
+	assert.NoError(t, err)
+
+	err = client.Close()
+	assert.NoError(t, err)
+}
+
 func TestProducerCompression(t *testing.T) {
 
 	type testProvider struct {
@@ -108,4 +159,4 @@ func TestProducerLastSequenceID(t *testing.T) {
 
 	err = client.Close()
 	assert.NoError(t, err)
-}
\ No newline at end of file
+}