You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/04/29 13:33:26 UTC
[pulsar-client-go] branch master updated: [Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. (#762)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee6330 [Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. (#762)
5ee6330 is described below
commit 5ee63303d43e8ee05f2c957a32c5526f29f54a36
Author: samuelhewitt <13...@users.noreply.github.com>
AuthorDate: Fri Apr 29 09:33:21 2022 -0400
[Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. (#762)
* Release semaphore and execute callback when message fails to encode.
Add tests for producer schema encode.
* Use well-defined error code.
Co-authored-by: shewitt <sh...@fanatics.com>
---
pulsar/error.go | 4 ++++
pulsar/producer_partition.go | 2 ++
pulsar/producer_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 50 insertions(+)
diff --git a/pulsar/error.go b/pulsar/error.go
index f433bfc..ead5cf9 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -101,6 +101,8 @@ const (
SeekFailed
// ProducerClosed means producer already been closed
ProducerClosed
+ // SchemaFailure means the payload could not be encoded using the Schema
+ SchemaFailure
)
// Error implement error interface, composed of two parts: msg and result.
@@ -205,6 +207,8 @@ func getResultStr(r Result) string {
return "SeekFailed"
case ProducerClosed:
return "ProducerClosed"
+ case SchemaFailure:
+ return "SchemaFailure"
default:
return fmt.Sprintf("Result(%d)", r)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index bc775e9..43ae68f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -419,6 +419,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
var schemaPayload []byte
schemaPayload, err = p.options.Schema.Encode(msg.Value)
if err != nil {
+ p.publishSemaphore.Release()
+ request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3c526bb..541c1fe 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -924,6 +924,50 @@ func TestMaxMessageSize(t *testing.T) {
}
}
+func TestFailedSchemaEncode(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: NewAvroSchema("{\"type\":\"string\"}", nil),
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ // Send should return an error because the message value is an int64 but the schema is string
+ mid, err := producer.Send(ctx, &ProducerMessage{
+ Value: int64(1),
+ })
+ assert.NotNil(t, err)
+ assert.Nil(t, mid)
+ wg.Done()
+ }()
+
+ wg.Add(1)
+ // SendAsync should invoke the callback with an error because the message value is an int64 but the schema is string
+ producer.SendAsync(ctx, &ProducerMessage{
+ Value: int64(1),
+ }, func(messageID MessageID, producerMessage *ProducerMessage, err error) {
+ assert.NotNil(t, err)
+ assert.Nil(t, messageID)
+ wg.Done()
+ })
+ wg.Wait()
+}
+
func TestSendTimeout(t *testing.T) {
quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`