You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:34:04 UTC
[pulsar-client-go] 27/38: Fixed releasing of semaphore for each
send request
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit b329bbff4d8a86c12c3b673d8ee7c612439ec6ae
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 4 15:45:02 2019 -0700
Fixed releasing of semaphore for each send request
---
pulsar/impl_partition_producer.go | 2 +-
pulsar/producer_test.go | 53 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 7b32fdc..5f3f671 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -313,11 +313,11 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
p.pendingQueue.Poll()
- p.publishSemaphore.Release()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
if sr.callback != nil {
+ p.publishSemaphore.Release()
sr.callback(nil, sr.msg, nil)
}
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index fa197b4..f7455ce 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2,8 +2,12 @@ package pulsar
import (
"context"
+ log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
+ "pulsar-client-go-native/pulsar/impl/util"
+ "sync"
"testing"
+ "time"
)
func TestSimpleProducer(t *testing.T) {
@@ -34,6 +38,53 @@ func TestSimpleProducer(t *testing.T) {
assert.NoError(t, err)
}
+func TestProducerAsyncSend(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceUrl,
+ })
+ assert.NoError(t, err)
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ BatchingMaxPublishDelay: 1 * time.Second,
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ wg := sync.WaitGroup{}
+ wg.Add(10)
+ errors := util.NewBlockingQueue(10)
+
+ for i := 0; i < 10; i++ {
+ producer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ }, func(id MessageID, message *ProducerMessage, e error) {
+ if e != nil {
+ log.WithError(e).Error("Failed to publish")
+ errors.Put(e)
+ } else {
+ log.Info("Published message ", id)
+ }
+ wg.Done()
+ })
+
+ assert.NoError(t, err)
+ }
+
+ producer.Flush()
+
+ wg.Wait()
+
+ assert.Equal(t, 0, errors.Size())
+
+ err = producer.Close()
+ assert.NoError(t, err)
+
+ err = client.Close()
+ assert.NoError(t, err)
+}
+
func TestProducerCompression(t *testing.T) {
type testProvider struct {
@@ -108,4 +159,4 @@ func TestProducerLastSequenceID(t *testing.T) {
err = client.Close()
assert.NoError(t, err)
-}
\ No newline at end of file
+}