You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/06 14:40:26 UTC

[GitHub] merlimat closed pull request #2527: [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized

merlimat closed pull request #2527: [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized
URL: https://github.com/apache/incubator-pulsar/pull/2527
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71a49..c78a58eb87 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+	client         *client
 	ptr            *C.pulsar_consumer_t
 	defaultChannel chan ConsumerMessage
 }
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 
 	conf := C.pulsar_consumer_configuration_create()
 
-	consumer := &consumer{}
+	consumer := &consumer{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315dbb2..620b64d8b0 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
 */
 import "C"
 import (
+	"context"
 	"runtime"
-	"unsafe"
 	"time"
-	"context"
+	"unsafe"
 )
 
 type createProducerCtx struct {
+	client   *client
 	callback func(producer Producer, err error)
 	conf     *C.pulsar_producer_configuration_t
 }
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
 	if res != C.pulsar_result_Ok {
 		producerCtx.callback(nil, newError(res, "Failed to create Producer"))
 	} else {
-		p := &producer{ptr: ptr}
+		p := &producer{client: producerCtx.client, ptr: ptr}
 		runtime.SetFinalizer(p, producerFinalizer)
 		producerCtx.callback(p, nil)
 	}
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 	defer C.free(unsafe.Pointer(topicName))
 
 	C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-		savePointer(createProducerCtx{callback, conf}))
+		savePointer(createProducerCtx{client,callback, conf}))
 }
 
 type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
 /// Producer
 
 type producer struct {
-	ptr *C.pulsar_producer_t
+	client *client
+	ptr    *C.pulsar_producer_t
 }
 
 func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf840..7336c1a39b 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+	client         *client
 	ptr            *C.pulsar_reader_t
 	defaultChannel chan ReaderMessage
 }
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 		return
 	}
 
-	reader := &reader{}
+	reader := &reader{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services