You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/06 14:40:26 UTC
[GitHub] merlimat closed pull request #2527: [go] Ensure
producer/consumer/reader keep a ref of client instance so it won't be
finalized
merlimat closed pull request #2527: [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized
URL: https://github.com/apache/incubator-pulsar/pull/2527
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71a49..c78a58eb87 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
)
type consumer struct {
+ client *client
ptr *C.pulsar_consumer_t
defaultChannel chan ConsumerMessage
}
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
conf := C.pulsar_consumer_configuration_create()
- consumer := &consumer{}
+ consumer := &consumer{client: client}
if options.MessageChannel == nil {
// If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315dbb2..620b64d8b0 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
*/
import "C"
import (
+ "context"
"runtime"
- "unsafe"
"time"
- "context"
+ "unsafe"
)
type createProducerCtx struct {
+ client *client
callback func(producer Producer, err error)
conf *C.pulsar_producer_configuration_t
}
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
if res != C.pulsar_result_Ok {
producerCtx.callback(nil, newError(res, "Failed to create Producer"))
} else {
- p := &producer{ptr: ptr}
+ p := &producer{client: producerCtx.client, ptr: ptr}
runtime.SetFinalizer(p, producerFinalizer)
producerCtx.callback(p, nil)
}
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
defer C.free(unsafe.Pointer(topicName))
C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
- savePointer(createProducerCtx{callback, conf}))
+ savePointer(createProducerCtx{client,callback, conf}))
}
type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
/// Producer
type producer struct {
- ptr *C.pulsar_producer_t
+ client *client
+ ptr *C.pulsar_producer_t
}
func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf840..7336c1a39b 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
)
type reader struct {
+ client *client
ptr *C.pulsar_reader_t
defaultChannel chan ReaderMessage
}
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
return
}
- reader := &reader{}
+ reader := &reader{client: client}
if options.MessageChannel == nil {
// If there is no message listener, set a default channel so that we can have receive to
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services