You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/04/29 13:33:26 UTC

[pulsar-client-go] branch master updated: [Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. (#762)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee6330  [Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. (#762)
5ee6330 is described below

commit 5ee63303d43e8ee05f2c957a32c5526f29f54a36
Author: samuelhewitt <13...@users.noreply.github.com>
AuthorDate: Fri Apr 29 09:33:21 2022 -0400

    [Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. (#762)
    
    * Release semaphore and execute callback when message fails to encode.
    Add tests for producer schema encode.
    
    * Use well-defined error code.
    
    Co-authored-by: shewitt <sh...@fanatics.com>
---
 pulsar/error.go              |  4 ++++
 pulsar/producer_partition.go |  2 ++
 pulsar/producer_test.go      | 44 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 50 insertions(+)

diff --git a/pulsar/error.go b/pulsar/error.go
index f433bfc..ead5cf9 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -101,6 +101,8 @@ const (
 	SeekFailed
 	// ProducerClosed means producer already been closed
 	ProducerClosed
+	// SchemaFailure means the payload could not be encoded using the Schema
+	SchemaFailure
 )
 
 // Error implement error interface, composed of two parts: msg and result.
@@ -205,6 +207,8 @@ func getResultStr(r Result) string {
 		return "SeekFailed"
 	case ProducerClosed:
 		return "ProducerClosed"
+	case SchemaFailure:
+		return "SchemaFailure"
 	default:
 		return fmt.Sprintf("Result(%d)", r)
 	}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index bc775e9..43ae68f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -419,6 +419,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		var schemaPayload []byte
 		schemaPayload, err = p.options.Schema.Encode(msg.Value)
 		if err != nil {
+			p.publishSemaphore.Release()
+			request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
 			p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
 			return
 		}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3c526bb..541c1fe 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -924,6 +924,50 @@ func TestMaxMessageSize(t *testing.T) {
 	}
 }
 
+func TestFailedSchemaEncode(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  topic,
+		Schema: NewAvroSchema("{\"type\":\"string\"}", nil),
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		// producer.Send should return an error because the message is an Int64 but the schema is String
+		mid, err := producer.Send(ctx, &ProducerMessage{
+			Value: int64(1),
+		})
+		assert.NotNil(t, err)
+		assert.Nil(t, mid)
+		wg.Done()
+	}()
+
+	wg.Add(1)
+	// producer.SendAsync should pass an error to the callback because the message is an Int64 but the schema is String
+	producer.SendAsync(ctx, &ProducerMessage{
+		Value: int64(1),
+	}, func(messageID MessageID, producerMessage *ProducerMessage, err error) {
+		assert.NotNil(t, err)
+		assert.Nil(t, messageID)
+		wg.Done()
+	})
+	wg.Wait()
+}
+
 func TestSendTimeout(t *testing.T) {
 	quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
 	quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`